
Commit

Mas i410 looptoclose (#420)
* Stop waiting the full SHUTDOWN_PAUSE

Previously, if there was a snapshot outstanding at shutdown time, there was a wait of the full SHUTDOWN_PAUSE to give the snapshot time to close down.

This causes an issue in kv_index_tictactree when a rebuild completes while an exchange is still in flight: the aae_controller becomes blocked for the full shutdown pause whilst it waits for the replaced key store to be closed.

This change loops within the shutdown pause, so that once the snapshot supporting the exchange has been closed, the paused bookie can close more quickly (unblocking the controller); a standalone sketch of the loop is included after these notes.

Without this fix, there are intermittent issues in kv_index_tictactree's mockvnode_SUITE tests.

* Address test reliability

Be a bit clearer about waiting whole, rounded-up seconds. The test was previously failing intermittently on QR4 (but QR5, taken 1s later, was always OK).

* Update iterator_SUITE.erl
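
For illustration, the shape of the new shutdown behaviour in leveled_inker and leveled_penciller (see the diffs below) is a capped polling loop. This is a minimal standalone sketch rather than the actual gen_server code; the module name, function name and CountFun argument are invented for the example, while the pause and loop-count values match the defines added in this commit.

-module(shutdown_loop_sketch).
-export([await_snapshot_release/1]).

-define(SHUTDOWN_PAUSE, 10000).
-define(SHUTDOWN_LOOPS, 10).

%% Poll CountFun (a fun returning the number of outstanding snapshots),
%% sleeping one tenth of the shutdown pause between checks, and give up
%% after ?SHUTDOWN_LOOPS attempts - so shutdown can proceed as soon as
%% the snapshots are released, rather than always waiting the full pause.
await_snapshot_release(CountFun) ->
    await_snapshot_release(CountFun, ?SHUTDOWN_LOOPS).

await_snapshot_release(_CountFun, 0) ->
    timeout;
await_snapshot_release(CountFun, LoopsLeft) ->
    case CountFun() of
        0 ->
            ok;
        _N ->
            timer:sleep(?SHUTDOWN_PAUSE div ?SHUTDOWN_LOOPS),
            await_snapshot_release(CountFun, LoopsLeft - 1)
    end.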
martinsumner authored Nov 9, 2023
1 parent 3ab3679 commit f735c19
Showing 4 changed files with 87 additions and 26 deletions.
31 changes: 31 additions & 0 deletions src/leveled_bookie.erl
@@ -2612,6 +2612,37 @@ generate_multiple_objects(Count, KeyNumber, ObjL) ->
         ObjL ++ [{Key, Value, IndexSpec}]).
 
 
+shutdown_test_() ->
+    {timeout, 10, fun shutdown_tester/0}.
+
+shutdown_tester() ->
+    RootPath = reset_filestructure(),
+    {ok, Bookie1} = book_start([{root_path, RootPath}]),
+    lists:foreach(
+        fun({K, V, S}) ->
+            ok = book_put(Bookie1, <<"Bucket">>, K, V, S, ?STD_TAG)
+        end,
+        generate_multiple_objects(5000, 1)),
+    {ok, SnpPCL1, SnpJrnl1} =
+        leveled_bookie:book_snapshot(Bookie1, store, undefined, true),
+
+    TestPid = self(),
+    spawn(
+        fun() ->
+            ok = leveled_bookie:book_close(Bookie1),
+            TestPid ! ok
+        end),
+
+    timer:sleep(2000),
+    ok = leveled_penciller:pcl_close(SnpPCL1),
+    ok = leveled_inker:ink_close(SnpJrnl1),
+    SW = os:timestamp(),
+    receive ok -> ok end,
+    WaitForShutDown = timer:now_diff(os:timestamp(), SW) div 1000,
+    ?assert(WaitForShutDown =< (1000 + 1)),
+    _ = reset_filestructure().
+
+
 ttl_test() ->
     RootPath = reset_filestructure(),
     {ok, Bookie1} = book_start([{root_path, RootPath}]),
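
To run the new test locally, one possible invocation (assuming the usual EUnit setup, with the test code above compiled in under TEST) is from an Erlang shell:

%% Runs all EUnit tests in the module, including shutdown_test_/0 above.
eunit:test(leveled_bookie, [verbose]).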
25 changes: 18 additions & 7 deletions src/leveled_inker.erl
@@ -134,6 +134,7 @@
 -define(JOURNAL_FILEX, "cdb").
 -define(PENDING_FILEX, "pnd").
 -define(TEST_KC, {[], infinity}).
+-define(SHUTDOWN_LOOPS, 10).
 -define(SHUTDOWN_PAUSE, 10000).
     % How long to wait for snapshots to be released on shutdown
     % before forcing closure of snapshots
@@ -155,7 +156,8 @@
         compression_method = native :: lz4|native|none,
         compress_on_receipt = false :: boolean(),
         snap_timeout :: pos_integer() | undefined, % in seconds
-        source_inker :: pid() | undefined}).
+        source_inker :: pid() | undefined,
+        shutdown_loops = ?SHUTDOWN_LOOPS :: non_neg_integer()}).
 
 
 -type inker_options() :: #inker_options{}.
@@ -798,16 +800,25 @@ handle_cast({remove_logs, ForcedLogs}, State) ->
 handle_cast({maybe_defer_shutdown, ShutdownType, From}, State) ->
     case length(State#state.registered_snapshots) of
         0 ->
-            ok;
+            gen_server:cast(self(), {complete_shutdown, ShutdownType, From}),
+            {noreply, State};
         N ->
             % Whilst this process sleeps, then any remaining snapshots may
             % release and have their release messages queued before the
             % complete_shutdown cast is sent
-            leveled_log:log(i0026, [N]),
-            timer:sleep(?SHUTDOWN_PAUSE)
-    end,
-    gen_server:cast(self(), {complete_shutdown, ShutdownType, From}),
-    {noreply, State};
+            case State#state.shutdown_loops of
+                LoopCount when LoopCount > 0 ->
+                    leveled_log:log(i0026, [N]),
+                    timer:sleep(?SHUTDOWN_PAUSE div ?SHUTDOWN_LOOPS),
+                    gen_server:cast(
+                        self(), {maybe_defer_shutdown, ShutdownType, From}),
+                    {noreply, State#state{shutdown_loops = LoopCount - 1}};
+                0 ->
+                    gen_server:cast(
+                        self(), {complete_shutdown, ShutdownType, From}),
+                    {noreply, State}
+            end
+    end;
 handle_cast({complete_shutdown, ShutdownType, From}, State) ->
     lists:foreach(
         fun(SnapPid) -> ok = ink_snapclose(SnapPid) end,
27 changes: 20 additions & 7 deletions src/leveled_penciller.erl
@@ -221,6 +221,7 @@
 -define(ITERATOR_SCANWIDTH, 4).
 -define(TIMING_SAMPLECOUNTDOWN, 10000).
 -define(TIMING_SAMPLESIZE, 100).
+-define(SHUTDOWN_LOOPS, 10).
 -define(SHUTDOWN_PAUSE, 10000).
     % How long to wait for snapshots to be released on shutdown
     % before forcing closure of snapshots
@@ -270,7 +271,10 @@
 
     monitor = {no_monitor, 0} :: leveled_monitor:monitor(),
 
-    sst_options = #sst_options{} :: sst_options()}).
+    sst_options = #sst_options{} :: sst_options(),
+
+    shutdown_loops = ?SHUTDOWN_LOOPS :: non_neg_integer()
+    }).
 
 
 -type penciller_options() :: #penciller_options{}.
@@ -1171,16 +1175,25 @@ handle_cast({remove_logs, ForcedLogs}, State) ->
 handle_cast({maybe_defer_shutdown, ShutdownType, From}, State) ->
     case length(leveled_pmanifest:snapshot_pids(State#state.manifest)) of
         0 ->
-            ok;
+            gen_server:cast(self(), {complete_shutdown, ShutdownType, From}),
+            {noreply, State};
         N ->
             % Whilst this process sleeps, then any remaining snapshots may
             % release and have their release messages queued before the
             % complete_shutdown cast is sent
-            leveled_log:log(p0042, [N]),
-            timer:sleep(?SHUTDOWN_PAUSE)
-    end,
-    gen_server:cast(self(), {complete_shutdown, ShutdownType, From}),
-    {noreply, State};
+            case State#state.shutdown_loops of
+                LoopCount when LoopCount > 0 ->
+                    leveled_log:log(p0042, [N]),
+                    timer:sleep(?SHUTDOWN_PAUSE div ?SHUTDOWN_LOOPS),
+                    gen_server:cast(
+                        self(), {maybe_defer_shutdown, ShutdownType, From}),
+                    {noreply, State#state{shutdown_loops = LoopCount - 1}};
+                0 ->
+                    gen_server:cast(
+                        self(), {complete_shutdown, ShutdownType, From}),
+                    {noreply, State}
+            end
+    end;
 handle_cast({complete_shutdown, ShutdownType, From}, State) ->
     lists:foreach(
         fun(Snap) -> ok = pcl_snapclose(Snap) end,
30 changes: 18 additions & 12 deletions test/end_to_end/iterator_SUITE.erl
@@ -47,8 +47,11 @@ expiring_indexes(_Config) ->
 
     SW1 = os:timestamp(),
     IBKL1 = testutil:stdload_expiring(Bookie1, KeyCount, Future),
+    timer:sleep(1000),
+        % Wait a second after last key so that none loaded in the last second
     LoadTime = timer:now_diff(os:timestamp(), SW1)/1000000,
     io:format("Load of ~w std objects in ~w seconds~n", [KeyCount, LoadTime]),
+    SW2 = os:timestamp(),
 
     FilterFun = fun({I, _B, _K}) -> lists:member(I, [5, 6, 7, 8]) end,
     LoadedEntriesInRange = lists:sort(lists:filter(FilterFun, IBKL1)),
@@ -101,6 +104,7 @@ expiring_indexes(_Config) ->
     % this time index value of 6
     testutil:stdload_object(
         Bookie1, B0, K0, 5, <<"value">>, leveled_util:integer_now() + 10),
+    timer:sleep(1000),
     {async, Folder2} = IndexFold(),
         leveled_bookie:book_indexfold(Bookie1,
             B0,
@@ -121,23 +125,25 @@
 
     FoldTime = timer:now_diff(os:timestamp(), SW1)/1000000 - LoadTime,
     io:format("Query returned ~w entries in ~w seconds - 3 queries + 10s wait~n",
-        [length(QR1), FoldTime]),
-    true = (LoadTime + FoldTime) < Future,
-    SleepTime = round((Future - (LoadTime + FoldTime)) * 1000),
-    io:format("Sleeping ~w s for all to expire~n", [SleepTime/1000]),
-    timer:sleep(SleepTime + 1000), % add a second
+        [length(QR3), FoldTime]),
+
+    SleepTime =
+        (Future - (timer:now_diff(os:timestamp(), SW2) div (1000 * 1000))) + 1,
+
+    io:format("Sleeping ~w s for all to expire~n", [SleepTime]),
+    timer:sleep(SleepTime * 1000),
 
     % Index entries should now have expired
     {async, Folder4} = IndexFold(),
     QR4 = Folder4(),
-    io:format("Unexpired indexes of length ~w~n", [length(QR4)]),
-    lists:foreach(
-        fun(I) ->
-            io:format("Unexpired index ~p~n", [I])
-        end,
-        QR4
-    ),
+    io:format("QR4 Unexpired indexes of length ~w~n", [length(QR4)]),
+    timer:sleep(1000),
+    {async, Folder5} = IndexFold(),
+    QR5 = Folder5(),
+    io:format("QR5 Unexpired indexes of length ~w~n", [length(QR5)]),
+
     true = QR4 == [],
+    true = QR5 == [],
 
     ok = leveled_bookie:book_close(Bookie1),
     testutil:reset_filestructure().
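
As a minimal standalone restatement of the sleep arithmetic above (the module and function names here are hypothetical, not part of the suite):

-module(expiry_wait_sketch).
-export([seconds_until_expired/2]).

%% Given the timestamp taken when loading finished (T0) and the expiry
%% horizon Future in seconds, return a whole number of seconds to sleep,
%% with a one-second margin - mirroring how SW2 and SleepTime are used in
%% the updated expiring_indexes test.
seconds_until_expired(T0, Future) ->
    ElapsedS = timer:now_diff(os:timestamp(), T0) div (1000 * 1000),
    max(0, (Future - ElapsedS) + 1).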
