diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 738de1af..6a7dbeaf 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -2612,6 +2612,37 @@ generate_multiple_objects(Count, KeyNumber, ObjL) -> ObjL ++ [{Key, Value, IndexSpec}]). +shutdown_test_() -> + {timeout, 10, fun shutdown_tester/0}. + +shutdown_tester() -> + RootPath = reset_filestructure(), + {ok, Bookie1} = book_start([{root_path, RootPath}]), + lists:foreach( + fun({K, V, S}) -> + ok = book_put(Bookie1, <<"Bucket">>, K, V, S, ?STD_TAG) + end, + generate_multiple_objects(5000, 1)), + {ok, SnpPCL1, SnpJrnl1} = + leveled_bookie:book_snapshot(Bookie1, store, undefined, true), + + TestPid = self(), + spawn( + fun() -> + ok = leveled_bookie:book_close(Bookie1), + TestPid ! ok + end), + + timer:sleep(2000), + ok = leveled_penciller:pcl_close(SnpPCL1), + ok = leveled_inker:ink_close(SnpJrnl1), + SW = os:timestamp(), + receive ok -> ok end, + WaitForShutDown = timer:now_diff(SW, os:timestamp()) div 1000, + ?assert(WaitForShutDown =< (1000 + 1)), + _ = reset_filestructure(). + + ttl_test() -> RootPath = reset_filestructure(), {ok, Bookie1} = book_start([{root_path, RootPath}]), diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index f516c650..2b4cc8d4 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -134,6 +134,7 @@ -define(JOURNAL_FILEX, "cdb"). -define(PENDING_FILEX, "pnd"). -define(TEST_KC, {[], infinity}). +-define(SHUTDOWN_LOOPS, 10). -define(SHUTDOWN_PAUSE, 10000). % How long to wait for snapshots to be released on shutdown % before forcing closure of snapshots @@ -155,7 +156,8 @@ compression_method = native :: lz4|native|none, compress_on_receipt = false :: boolean(), snap_timeout :: pos_integer() | undefined, % in seconds - source_inker :: pid() | undefined}). + source_inker :: pid() | undefined, + shutdown_loops = ?SHUTDOWN_LOOPS :: non_neg_integer()}). -type inker_options() :: #inker_options{}. @@ -798,16 +800,25 @@ handle_cast({remove_logs, ForcedLogs}, State) -> handle_cast({maybe_defer_shutdown, ShutdownType, From}, State) -> case length(State#state.registered_snapshots) of 0 -> - ok; + gen_server:cast(self(), {complete_shutdown, ShutdownType, From}), + {noreply, State}; N -> % Whilst this process sleeps, then any remaining snapshots may % release and have their release messages queued before the % complete_shutdown cast is sent - leveled_log:log(i0026, [N]), - timer:sleep(?SHUTDOWN_PAUSE) - end, - gen_server:cast(self(), {complete_shutdown, ShutdownType, From}), - {noreply, State}; + case State#state.shutdown_loops of + LoopCount when LoopCount > 0 -> + leveled_log:log(i0026, [N]), + timer:sleep(?SHUTDOWN_PAUSE div ?SHUTDOWN_LOOPS), + gen_server:cast( + self(), {maybe_defer_shutdown, ShutdownType, From}), + {noreply, State#state{shutdown_loops = LoopCount - 1}}; + 0 -> + gen_server:cast( + self(), {complete_shutdown, ShutdownType, From}), + {noreply, State} + end + end; handle_cast({complete_shutdown, ShutdownType, From}, State) -> lists:foreach( fun(SnapPid) -> ok = ink_snapclose(SnapPid) end, diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 3a78260a..164e3361 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -221,6 +221,7 @@ -define(ITERATOR_SCANWIDTH, 4). -define(TIMING_SAMPLECOUNTDOWN, 10000). -define(TIMING_SAMPLESIZE, 100). +-define(SHUTDOWN_LOOPS, 10). -define(SHUTDOWN_PAUSE, 10000). % How long to wait for snapshots to be released on shutdown % before forcing closure of snapshots @@ -270,7 +271,10 @@ monitor = {no_monitor, 0} :: leveled_monitor:monitor(), - sst_options = #sst_options{} :: sst_options()}). + sst_options = #sst_options{} :: sst_options(), + + shutdown_loops = ?SHUTDOWN_LOOPS :: non_neg_integer() + }). -type penciller_options() :: #penciller_options{}. @@ -1171,16 +1175,25 @@ handle_cast({remove_logs, ForcedLogs}, State) -> handle_cast({maybe_defer_shutdown, ShutdownType, From}, State) -> case length(leveled_pmanifest:snapshot_pids(State#state.manifest)) of 0 -> - ok; + gen_server:cast(self(), {complete_shutdown, ShutdownType, From}), + {noreply, State}; N -> % Whilst this process sleeps, then any remaining snapshots may % release and have their release messages queued before the % complete_shutdown cast is sent - leveled_log:log(p0042, [N]), - timer:sleep(?SHUTDOWN_PAUSE) - end, - gen_server:cast(self(), {complete_shutdown, ShutdownType, From}), - {noreply, State}; + case State#state.shutdown_loops of + LoopCount when LoopCount > 0 -> + leveled_log:log(p0042, [N]), + timer:sleep(?SHUTDOWN_PAUSE div ?SHUTDOWN_LOOPS), + gen_server:cast( + self(), {maybe_defer_shutdown, ShutdownType, From}), + {noreply, State#state{shutdown_loops = LoopCount - 1}}; + 0 -> + gen_server:cast( + self(), {complete_shutdown, ShutdownType, From}), + {noreply, State} + end + end; handle_cast({complete_shutdown, ShutdownType, From}, State) -> lists:foreach( fun(Snap) -> ok = pcl_snapclose(Snap) end, diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl index c4bf475c..bd41b2d2 100644 --- a/test/end_to_end/iterator_SUITE.erl +++ b/test/end_to_end/iterator_SUITE.erl @@ -47,8 +47,11 @@ expiring_indexes(_Config) -> SW1 = os:timestamp(), IBKL1 = testutil:stdload_expiring(Bookie1, KeyCount, Future), + timer:sleep(1000), + % Wait a second after last key so that none loaded in the last second LoadTime = timer:now_diff(os:timestamp(), SW1)/1000000, io:format("Load of ~w std objects in ~w seconds~n", [KeyCount, LoadTime]), + SW2 = os:timestamp(), FilterFun = fun({I, _B, _K}) -> lists:member(I, [5, 6, 7, 8]) end, LoadedEntriesInRange = lists:sort(lists:filter(FilterFun, IBKL1)), @@ -101,6 +104,7 @@ expiring_indexes(_Config) -> % this time index value of 6 testutil:stdload_object( Bookie1, B0, K0, 5, <<"value">>, leveled_util:integer_now() + 10), + timer:sleep(1000), {async, Folder2} = IndexFold(), leveled_bookie:book_indexfold(Bookie1, B0, @@ -121,23 +125,25 @@ expiring_indexes(_Config) -> FoldTime = timer:now_diff(os:timestamp(), SW1)/1000000 - LoadTime, io:format("Query returned ~w entries in ~w seconds - 3 queries + 10s wait~n", - [length(QR1), FoldTime]), - true = (LoadTime + FoldTime) < Future, - SleepTime = round((Future - (LoadTime + FoldTime)) * 1000), - io:format("Sleeping ~w s for all to expire~n", [SleepTime/1000]), - timer:sleep(SleepTime + 1000), % add a second + [length(QR3), FoldTime]), + + SleepTime = + (Future - (timer:now_diff(os:timestamp(), SW2) div (1000 * 1000))) + 1, + + io:format("Sleeping ~w s for all to expire~n", [SleepTime]), + timer:sleep(SleepTime * 1000), % Index entries should now have expired {async, Folder4} = IndexFold(), QR4 = Folder4(), - io:format("Unexpired indexes of length ~w~n", [length(QR4)]), - lists:foreach( - fun(I) -> - io:format("Unexpired index ~p~n", [I]) - end, - QR4 - ), + io:format("QR4 Unexpired indexes of length ~w~n", [length(QR4)]), + timer:sleep(1000), + {async, Folder5} = IndexFold(), + QR5 = Folder5(), + io:format("QR5 Unexpired indexes of length ~w~n", [length(QR5)]), + true = QR4 == [], + true = QR5 == [], ok = leveled_bookie:book_close(Bookie1), testutil:reset_filestructure().