Skip to content

Commit

Permalink
Close in stages - waiting for releases (#411)
Browse files Browse the repository at this point in the history
* Close in stages - waiting for releases

Have a consistent approach to closing the inker and the penciller - so that the close can be interrupted by releasing of snapshots.  Then any unreleased snapshots are closed before shutdown - with a 10s pause to give queries a short opportunity to finish.

This should address some issues, primarily seen (but very rarely) in tests, whereby post-rebuild destruction of parallel AAE keystores causes aae_folds to crash.

The primary benefit is that an attempt to release a snapshot that has in fact already finished no longer causes a crash of the database on normal stop.  This was primarily an issue when shutdown is delayed by an ongoing journal compaction job.

* Boost default test budget for EQC

* Update test to use correct type

* Update following review

Avoid filtering out exited PIDs when closing snapshots by catching the exit exception when the Pid is down
  • Loading branch information
martinsumner authored Oct 3, 2023
1 parent 4142914 commit 1a66349
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 88 deletions.
120 changes: 88 additions & 32 deletions src/leveled_inker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@
-define(JOURNAL_FILEX, "cdb").
-define(PENDING_FILEX, "pnd").
-define(TEST_KC, {[], infinity}).
-define(SHUTDOWN_PAUSE, 10000).
% How long to wait for snapshots to be released on shutdown
% before forcing closure of snapshots
% 10s may not be long enough for all snapshots, but avoids crashes of
% short-lived queries racing with the shutdown

-record(state, {manifest = [] :: list(),
manifest_sqn = 0 :: integer(),
Expand Down Expand Up @@ -282,6 +287,18 @@ ink_confirmdelete(Pid, ManSQN, CDBpid) ->
ink_close(Pid) ->
% Synchronous close request to the inker.  The call uses an infinity timeout,
% so the caller blocks until the inker replies - and for a non-snapshot inker
% that reply may be deferred while it waits for registered snapshots to be
% released before completing the shutdown.
gen_server:call(Pid, close, infinity).

-spec ink_snapclose(pid()) -> ok.
%% @doc
%% Close a snapshot as part of shutting down the inker.  The snapshot may
%% already have exited by the time the close is requested; the noproc exit
%% raised by the call in that case is caught and treated as a successful
%% close.  Any other exit reason is not caught.
ink_snapclose(SnapPid) ->
    try ink_close(SnapPid) of
        CloseResult ->
            CloseResult
    catch
        exit:{noproc, _} ->
            ok
    end.

-spec ink_doom(pid()) -> {ok, [{string(), string(), string(), string()}]}.
%% @doc
%% Test function used to close a file, and return all file paths (potentially
Expand Down Expand Up @@ -666,33 +683,23 @@ handle_call({check_sqn, LedgerSQN}, _From, State) ->
end;
handle_call(get_journalsqn, _From, State) ->
{reply, {ok, State#state.journal_sqn}, State};
handle_call(close, _From, State) ->
case State#state.is_snapshot of
true ->
ok = ink_releasesnapshot(State#state.source_inker, self());
false ->
leveled_log:log(i0005, [close]),
leveled_log:log(
i0006, [State#state.journal_sqn, State#state.manifest_sqn]),
ok = leveled_iclerk:clerk_stop(State#state.clerk),
shutdown_snapshots(State#state.registered_snapshots),
shutdown_manifest(State#state.manifest)
end,
handle_call(
        close,
        _From,
        #state{is_snapshot = true, source_inker = SourceInker} = State) ->
    % A snapshot asks its source inker to release it, and then stops normally
    % having replied ok to the caller
    ok = ink_releasesnapshot(SourceInker, self()),
    {stop, normal, ok, State};
handle_call(doom, _From, State) ->
FPs = [filepath(State#state.root_path, journal_dir),
filepath(State#state.root_path, manifest_dir),
filepath(State#state.root_path, journal_compact_dir),
filepath(State#state.root_path, journal_waste_dir)],
leveled_log:log(i0018, []),

leveled_log:log(i0005, [doom]),
handle_call(ShutdownType, From, State)
when ShutdownType == close; ShutdownType == doom ->
case ShutdownType of
doom ->
leveled_log:log(i0018, []);
_ ->
ok
end,
leveled_log:log(i0005, [ShutdownType]),
leveled_log:log(
i0006, [State#state.journal_sqn, State#state.manifest_sqn]),
ok = leveled_iclerk:clerk_stop(State#state.clerk),
shutdown_snapshots(State#state.registered_snapshots),
shutdown_manifest(State#state.manifest),
{stop, normal, {ok, FPs}, State}.
gen_server:cast(self(), {maybe_defer_shutdown, ShutdownType, From}),
{noreply, State}.


handle_cast({clerk_complete, ManifestSnippet, FilesToDelete}, State) ->
Expand Down Expand Up @@ -778,8 +785,39 @@ handle_cast({remove_logs, ForcedLogs}, State) ->
ok = leveled_log:remove_forcedlogs(ForcedLogs),
CDBopts = State#state.cdb_options,
CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
{noreply, State#state{cdb_options = CDBopts0}}.

{noreply, State#state{cdb_options = CDBopts0}};
handle_cast({maybe_defer_shutdown, ShutdownType, From}, State) ->
    % First stage of a close/doom: when snapshots are still registered, pause
    % before moving on to complete_shutdown.
    case State#state.registered_snapshots of
        [] ->
            ok;
        Registered ->
            % Whilst this process sleeps, any remaining snapshots may release
            % and so have their release messages queued ahead of the
            % complete_shutdown cast which is only sent after the sleep
            leveled_log:log(i0026, [length(Registered)]),
            timer:sleep(?SHUTDOWN_PAUSE)
    end,
    gen_server:cast(self(), {complete_shutdown, ShutdownType, From}),
    {noreply, State};
handle_cast({complete_shutdown, ShutdownType, From}, State) ->
    % Final stage of a close/doom: close whatever snapshots are still
    % registered, shut down the manifest, then send the deferred reply to the
    % original caller and stop.
    SnapPids =
        [element(1, Snapshot) || Snapshot <- State#state.registered_snapshots],
    % ink_snapclose/1 is used so that a snapshot which has already exited does
    % not crash the shutdown
    lists:foreach(fun(SnapPid) -> ok = ink_snapclose(SnapPid) end, SnapPids),
    shutdown_manifest(State#state.manifest),
    Reply =
        case ShutdownType of
            doom ->
                % A doom request is answered with the paths used by the inker
                RootPath = State#state.root_path,
                DirTypes =
                    [journal_dir,
                        manifest_dir,
                        journal_compact_dir,
                        journal_waste_dir],
                {ok, [filepath(RootPath, DirType) || DirType <- DirTypes]};
            close ->
                ok
        end,
    gen_server:reply(From, Reply),
    {stop, normal, State}.

%% handle the bookie stopping and stop this snapshot
handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info},
Expand All @@ -801,6 +839,7 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%============================================================================


-spec start_from_file(inker_options()) -> {ok, ink_state()}.
%% @doc
%% Start an Inker from the state on disk (i.e. not a snapshot).
Expand Down Expand Up @@ -866,13 +905,6 @@ start_from_file(InkOpts) ->
clerk = Clerk}}.


-spec shutdown_snapshots(list(registered_snapshot())) -> ok.
%% @doc
%% Shutdown any snapshots before closing the store.  Each entry is a
%% {SnapshotPid, Timestamp, SQN} tuple, of which only the pid is used.
%% The close is made via ink_snapclose/1 rather than ink_close/1, so that a
%% snapshot which has already exited (and so would cause the call to exit with
%% noproc) does not crash the process that is shutting down.
shutdown_snapshots(Snapshots) ->
    lists:foreach(
        fun({Snap, _TS, _SQN}) -> ok = ink_snapclose(Snap) end,
        Snapshots).

-spec shutdown_manifest(leveled_imanifest:manifest()) -> ok.
%% @doc
%% Shutdown all files in the manifest
Expand Down Expand Up @@ -1607,4 +1639,28 @@ loop() ->
ok
end.

%% EUnit generator wrapping close_no_crash_tester/0 with a 60 second timeout.
%% Closing an inker that still has a registered snapshot sleeps for
%% ?SHUTDOWN_PAUSE (10s) before completing the shutdown, so the test needs
%% more time than a plain *_test function would be given (EUnit's default
%% per-test timeout is assumed here to be 5s - confirm against EUnit docs).
close_no_crash_test_() ->
{timeout, 60, fun close_no_crash_tester/0}.

%% Start an inker with one registered snapshot, kill the snapshot without it
%% being released, and confirm the inker still closes cleanly (returning ok)
%% despite the snapshot pid no longer being alive.
close_no_crash_tester() ->
    JournalPath = "test/test_area/journal",
    build_dummy_journal(),
    InkOpts =
        #inker_options{
            root_path = JournalPath,
            cdb_options = #cdb_options{max_size = 300000, binary_mode = true},
            compression_method = native,
            compress_on_receipt = true},
    {ok, Ink} = ink_start(InkOpts),

    {ok, Snapshot} =
        ink_snapstart(
            #inker_options{
                start_snapshot = true,
                bookies_pid = self(),
                source_inker = Ink}),

    % The snapshot dies while still registered with the inker
    exit(Snapshot, kill),
    ok = ink_close(Ink),
    clean_testdir(JournalPath).

-endif.
4 changes: 4 additions & 0 deletions src/leveled_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@
{info, <<"Archiving filename ~s as unused at startup">>},
p0041 =>
{info, <<"Penciller manifest switched from SQN ~w to ~w">>},
p0042 =>
{info, <<"Deferring shutdown due to snapshot_count=~w">>},
pc001 =>
{info, <<"Penciller's clerk ~w started with owner ~w">>},
pc005 =>
Expand Down Expand Up @@ -244,6 +246,8 @@
{info, <<"Prompted roll at NewSQN=~w">>},
i0025 =>
{warn, <<"Journal SQN of ~w is below Ledger SQN of ~w anti-entropy will be required">>},
i0026 =>
{info, <<"Deferring shutdown due to snapshot_count=~w">>},
ic001 =>
{info, <<"Closed for reason ~w so maybe leaving garbage">>},
ic002 =>
Expand Down
102 changes: 85 additions & 17 deletions src/leveled_penciller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@
-define(TIMING_SAMPLECOUNTDOWN, 10000).
-define(TIMING_SAMPLESIZE, 100).
-define(OPEN_LASTMOD_RANGE, {0, infinity}).
-define(SHUTDOWN_PAUSE, 10000).
% How long to wait for snapshots to be released on shutdown
% before forcing closure of snapshots
% 10s may not be long enough for all snapshots, but avoids crashes of
% short-lived queries racing with the shutdown

-record(state, {manifest ::
leveled_pmanifest:manifest() | undefined | redacted,
Expand Down Expand Up @@ -566,15 +571,27 @@ pcl_persistedsqn(Pid) ->
%% @doc
%% Close the penciller neatly, trying to persist to disk anything in the memory
pcl_close(Pid) ->
gen_server:call(Pid, close, 60000).
gen_server:call(Pid, close, infinity).

-spec pcl_snapclose(pid()) -> ok.
%% @doc
%% Close a snapshot as part of shutting down the penciller.  The snapshot may
%% already have exited by the time the close is requested; the noproc exit
%% raised by the call in that case is caught and treated as a successful
%% close.  Any other exit reason is not caught.
pcl_snapclose(SnapPid) ->
    try pcl_close(SnapPid) of
        CloseResult ->
            CloseResult
    catch
        exit:{noproc, _} ->
            ok
    end.

-spec pcl_doom(pid()) -> {ok, list()}.
%% @doc
%% Close the penciller neatly, trying to persist to disk anything in the memory
%% Return a list of filepaths from where files exist for this penciller (should
%% the calling process wish to erase the store).
pcl_doom(Pid) ->
gen_server:call(Pid, doom, 60000).
gen_server:call(Pid, doom, infinity).

-spec pcl_checkbloomtest(pid(), tuple()) -> boolean().
%% @doc
Expand Down Expand Up @@ -906,7 +923,7 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LongRunning},
handle_call(
        close,
        _From,
        #state{is_snapshot = true, source_penciller = SourcePcl} = State) ->
    % A snapshot asks its source penciller to release it, and then stops
    % normally having replied ok to the caller
    ok = pcl_releasesnapshot(SourcePcl, self()),
    {stop, normal, ok, State};
handle_call(close, _From, State) ->
handle_call(close, From, State) ->
% Level 0 files lie outside of the manifest, and so if there is no L0
% file present it is safe to write the current contents of memory. If
% there is a L0 file present - then the memory can be dropped (it is
Expand Down Expand Up @@ -935,17 +952,13 @@ handle_call(close, _From, State) ->
false ->
leveled_log:log(p0010, [State#state.levelzero_size])
end,
shutdown_manifest(State#state.manifest, State#state.levelzero_constructor),
{stop, normal, ok, State};
handle_call(doom, _From, State) ->
gen_server:cast(self(), {maybe_defer_shutdown, close, From}),
{noreply, State};
handle_call(doom, From, State) ->
leveled_log:log(p0030, []),
ok = leveled_pclerk:clerk_close(State#state.clerk),

shutdown_manifest(State#state.manifest, State#state.levelzero_constructor),

ManifestFP = State#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/",
FilesFP = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/",
{stop, normal, {ok, [ManifestFP, FilesFP]}, State};
gen_server:cast(self(), {maybe_defer_shutdown, doom, From}),
{noreply, State};
handle_call({checkbloom_fortest, Key, Hash}, _From, State) ->
Manifest = State#state.manifest,
FoldFun =
Expand Down Expand Up @@ -995,8 +1008,8 @@ handle_cast({manifest_change, Manifest}, State) ->
work_ongoing=false}}
end;
handle_cast({release_snapshot, Snapshot}, State) ->
Manifest0 = leveled_pmanifest:release_snapshot(State#state.manifest,
Snapshot),
Manifest0 =
leveled_pmanifest:release_snapshot(State#state.manifest, Snapshot),
leveled_log:log(p0003, [Snapshot]),
{noreply, State#state{manifest=Manifest0}};
handle_cast({confirm_delete, PDFN, FilePid}, State=#state{is_snapshot=Snap})
Expand Down Expand Up @@ -1156,7 +1169,34 @@ handle_cast({remove_logs, ForcedLogs}, State) ->
ok = leveled_log:remove_forcedlogs(ForcedLogs),
SSTopts = State#state.sst_options,
SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()},
{noreply, State#state{sst_options = SSTopts0}}.
{noreply, State#state{sst_options = SSTopts0}};
handle_cast({maybe_defer_shutdown, ShutdownType, From}, State) ->
    % First stage of a close/doom: when the manifest still lists snapshot
    % pids, pause before moving on to complete_shutdown.
    case leveled_pmanifest:snapshot_pids(State#state.manifest) of
        [] ->
            ok;
        SnapPids ->
            % Whilst this process sleeps, any remaining snapshots may release
            % and so have their release messages queued ahead of the
            % complete_shutdown cast which is only sent after the sleep
            leveled_log:log(p0042, [length(SnapPids)]),
            timer:sleep(?SHUTDOWN_PAUSE)
    end,
    gen_server:cast(self(), {complete_shutdown, ShutdownType, From}),
    {noreply, State};
handle_cast({complete_shutdown, ShutdownType, From}, State) ->
    % Final stage of a close/doom: close whatever snapshots the manifest
    % still lists, shut down the manifest, then send the deferred reply to
    % the original caller and stop.
    SnapPids = leveled_pmanifest:snapshot_pids(State#state.manifest),
    % pcl_snapclose/1 is used so that a snapshot which has already exited does
    % not crash the shutdown
    lists:foreach(fun(SnapPid) -> ok = pcl_snapclose(SnapPid) end, SnapPids),
    shutdown_manifest(State#state.manifest, State#state.levelzero_constructor),
    Reply =
        case ShutdownType of
            doom ->
                % A doom request is answered with the manifest and files
                % folder paths under the root path
                RootPath = State#state.root_path,
                {ok,
                    [RootPath ++ "/" ++ ?MANIFEST_FP ++ "/",
                        RootPath ++ "/" ++ ?FILES_FP ++ "/"]};
            close ->
                ok
        end,
    gen_server:reply(From, Reply),
    {stop, normal, State}.


%% handle the bookie stopping and stop this snapshot
Expand Down Expand Up @@ -1195,8 +1235,8 @@ sst_rootpath(RootPath) ->
FP.

sst_filename(ManSQN, Level, Count) ->
lists:flatten(io_lib:format("./~w_~w_~w" ++ ?SST_FILEX,
[ManSQN, Level, Count])).
lists:flatten(
io_lib:format("./~w_~w_~w" ++ ?SST_FILEX, [ManSQN, Level, Count])).


%%%============================================================================
Expand Down Expand Up @@ -2009,6 +2049,34 @@ format_status_test() ->
?assertMatch(redacted, ST#state.levelzero_astree),
clean_testdir(RootPath).

%% EUnit generator wrapping close_no_crash_tester/0 with a 60 second timeout.
%% Closing a penciller whose manifest still lists a snapshot sleeps for
%% ?SHUTDOWN_PAUSE (10s) before completing the shutdown, so the test needs
%% more time than a plain *_test function would be given (EUnit's default
%% per-test timeout is assumed here to be 5s - confirm against EUnit docs).
close_no_crash_test_() ->
{timeout, 60, fun close_no_crash_tester/0}.

%% Start a penciller with one registered snapshot, kill the snapshot without
%% it being released, and confirm the penciller still closes cleanly
%% (returning ok) despite the snapshot pid no longer being alive.
close_no_crash_tester() ->
    LedgerPath = "test/test_area/ledger_close",
    clean_testdir(LedgerPath),
    PclOpts =
        #penciller_options{
            root_path = LedgerPath,
            max_inmemory_tablesize = 1000,
            sst_options = #sst_options{}},
    {ok, Penciller} = pcl_start(PclOpts),
    SnapOpts =
        #penciller_options{
            start_snapshot = true,
            snapshot_query = undefined,
            bookies_mem = {empty_cache, empty_index, 1, 1},
            source_penciller = Penciller,
            snapshot_longrunning = true,
            bookies_pid = self()},
    {ok, Snapshot} = pcl_snapstart(SnapOpts),
    % The snapshot dies while still registered with the penciller
    exit(Snapshot, kill),
    ok = pcl_close(Penciller),
    clean_testdir(LedgerPath).


simple_server_test() ->
RootPath = "test/test_area/ledger",
clean_testdir(RootPath),
Expand Down
Loading

0 comments on commit 1a66349

Please sign in to comment.