Skip to content

Commit

Permalink
Mas d31 i413 (#415)
Browse files Browse the repository at this point in the history
* Allow snapshots to be reused in queries

Allow for a full bookie snapshot to be re-used for multiple queries, not just KV fetches.

* Reduce log noise

The internal dummy tag is expected so should not prompt a log on reload

* Snapshot should have same status of active db

wrt head_only and head_lookup

* Allow logging to specified on snapshots

* Shutdown snapshot bookie is primary goes down

Inker and Penciller already will shut down based on `erlang:monitor/2`

* Review feedback

Formatting and code readability fixes
  • Loading branch information
martinsumner authored Nov 8, 2023
1 parent 9e80492 commit d544db5
Show file tree
Hide file tree
Showing 9 changed files with 289 additions and 183 deletions.
108 changes: 83 additions & 25 deletions src/leveled_bookie.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@
book_logsettings/1,
book_loglevel/2,
book_addlogs/2,
book_removelogs/2]).
book_removelogs/2,
book_headstatus/1
]).

%% folding API
-export([
Expand Down Expand Up @@ -155,6 +157,7 @@
head_only = false :: boolean(),
head_lookup = true :: boolean(),
ink_checking = ?MAX_KEYCHECK_FREQUENCY :: integer(),
bookie_monref :: reference() | undefined,
monitor = {no_monitor, 0} :: leveled_monitor:monitor()}).


Expand Down Expand Up @@ -1134,6 +1137,12 @@ book_addlogs(Pid, ForcedLogs) ->
book_removelogs(Pid, ForcedLogs) ->
gen_server:cast(Pid, {remove_logs, ForcedLogs}).

-spec book_headstatus(pid()) -> {boolean(), boolean()}.
%% @doc
%% Return booleans to state the bookie is in head_only mode, and supporting
%% lookups
book_headstatus(Pid) ->
gen_server:call(Pid, head_status, infinity).

%%%============================================================================
%%% gen_server callbacks
Expand Down Expand Up @@ -1235,10 +1244,17 @@ init([Opts]) ->
{Bookie, undefined} ->
{ok, Penciller, Inker} =
book_snapshot(Bookie, store, undefined, true),
BookieMonitor = erlang:monitor(process, Bookie),
NewETS = ets:new(mem, [ordered_set]),
{HeadOnly, Lookup} = leveled_bookie:book_headstatus(Bookie),
leveled_log:log(b0002, [Inker, Penciller]),
{ok,
#state{penciller = Penciller,
inker = Inker,
ledger_cache = #ledger_cache{mem = NewETS},
head_only = HeadOnly,
head_lookup = Lookup,
bookie_monref = BookieMonitor,
is_snapshot = true}}
end.

Expand Down Expand Up @@ -1490,42 +1506,62 @@ handle_call(destroy, _From, State=#state{is_snapshot=Snp}) when Snp == false ->
{stop, normal, ok, State};
handle_call(return_actors, _From, State) ->
{reply, {ok, State#state.inker, State#state.penciller}, State};
handle_call(head_status, _From, State) ->
{reply, {State#state.head_only, State#state.head_lookup}, State};
handle_call(Msg, _From, State) ->
{reply, {unsupported_message, element(1, Msg)}, State}.


handle_cast({log_level, LogLevel}, State) ->
PCL = State#state.penciller,
INK = State#state.inker,
Monitor = element(1, State#state.monitor),
ok = leveled_penciller:pcl_loglevel(PCL, LogLevel),
ok = leveled_inker:ink_loglevel(INK, LogLevel),
ok = leveled_monitor:log_level(Monitor, LogLevel),
case element(1, State#state.monitor) of
no_monitor ->
ok;
Monitor ->
leveled_monitor:log_level(Monitor, LogLevel)
end,
ok = leveled_log:set_loglevel(LogLevel),
{noreply, State};
handle_cast({add_logs, ForcedLogs}, State) ->
PCL = State#state.penciller,
INK = State#state.inker,
Monitor = element(1, State#state.monitor),
ok = leveled_penciller:pcl_addlogs(PCL, ForcedLogs),
ok = leveled_inker:ink_addlogs(INK, ForcedLogs),
ok = leveled_monitor:log_add(Monitor, ForcedLogs),
case element(1, State#state.monitor) of
no_monitor ->
ok;
Monitor ->
leveled_monitor:log_add(Monitor, ForcedLogs)
end,
ok = leveled_log:add_forcedlogs(ForcedLogs),
{noreply, State};
handle_cast({remove_logs, ForcedLogs}, State) ->
PCL = State#state.penciller,
INK = State#state.inker,
Monitor = element(1, State#state.monitor),
ok = leveled_penciller:pcl_removelogs(PCL, ForcedLogs),
ok = leveled_inker:ink_removelogs(INK, ForcedLogs),
ok = leveled_monitor:log_remove(Monitor, ForcedLogs),
case element(1, State#state.monitor) of
no_monitor ->
ok;
Monitor ->
leveled_monitor:log_remove(Monitor, ForcedLogs)
end,
ok = leveled_log:remove_forcedlogs(ForcedLogs),
{noreply, State}.


%% handle the bookie stopping and stop this snapshot
handle_info({'DOWN', BookieMonRef, process, BookiePid, Info},
State=#state{bookie_monref = BookieMonRef, is_snapshot = true}) ->
leveled_log:log(b0004, [BookiePid, Info]),
{stop, normal, State};
handle_info(_Info, State) ->
{noreply, State}.


terminate(Reason, _State) ->
leveled_log:log(b0003, [Reason]).

Expand Down Expand Up @@ -1827,22 +1863,43 @@ set_options(Opts, Monitor) ->
}.


-spec return_snapfun(book_state(), store|ledger,
tuple()|no_lookup|undefined,
boolean(), boolean())
-> fun(() -> {ok, pid(), pid()|null}).
-spec return_snapfun(
book_state(), store|ledger,
tuple()|no_lookup|undefined,
boolean(), boolean())
-> fun(() -> {ok, pid(), pid()|null, fun(() -> ok)}).
%% @doc
%% Generates a function from which a snapshot can be created. The primary
%% factor here is the SnapPreFold boolean. If this is true then the snapshot
%% will be taken before the Fold function is returned. If SnapPreFold is
%% false then the snapshot will be taken when the Fold function is called.
%%
%% SnapPrefold is to be used when the intention is to queue the fold, and so
%% claling of the fold may be delayed, but it is still desired that the fold
%% calling of the fold may be delayed, but it is still desired that the fold
%% represent the point in time that the query was requested.
%%
%% Also returns a function which will close any snapshots to be used in the
%% runners post-query cleanup action
%%
%% When the bookie is a snapshot, a fresh snapshot should not be taken, the
%% previous snapshot should be used instead. Also the snapshot should not be
%% closed as part of the post-query activity as the snapshot may be reused, and
%% should be manually closed.
return_snapfun(State, SnapType, Query, LongRunning, SnapPreFold) ->
case SnapPreFold of
true ->
CloseFun =
fun(LS0, JS0) ->
fun() ->
ok = leveled_penciller:pcl_close(LS0),
case JS0 of
JS0 when is_pid(JS0) ->
leveled_inker:ink_close(JS0);
_ ->
ok
end
end
end,
case {SnapPreFold, State#state.is_snapshot} of
{true, false} ->
{ok, LS, JS} =
snapshot_store(
State#state.ledger_cache,
Expand All @@ -1852,15 +1909,23 @@ return_snapfun(State, SnapType, Query, LongRunning, SnapPreFold) ->
SnapType,
Query,
LongRunning),
fun() -> {ok, LS, JS} end;
false ->
fun() -> {ok, LS, JS, CloseFun(LS, JS)} end;
{false, false} ->
Self = self(),
% Timeout will be ignored, as will Requestor
%
% This uses the external snapshot - as the snapshot will need
% to have consistent state between Bookie and Penciller when
% it is made.
fun() -> book_snapshot(Self, SnapType, Query, LongRunning) end
fun() ->
{ok, LS, JS} =
book_snapshot(Self, SnapType, Query, LongRunning),
{ok, LS, JS, CloseFun(LS, JS)}
end;
{_ , true} ->
LS = State#state.penciller,
JS = State#state.inker,
fun() -> {ok, LS, JS, fun() -> ok end} end
end.

-spec snaptype_by_presence(boolean()) -> store|ledger.
Expand Down Expand Up @@ -2191,14 +2256,7 @@ fetch_head(Key, Penciller, LedgerCache) ->
%% The L0Index needs to be bypassed when running head_only
fetch_head(Key, Penciller, LedgerCache, HeadOnly) ->
SW = os:timestamp(),
CacheResult =
case LedgerCache#ledger_cache.mem of
undefined ->
[];
Tab ->
ets:lookup(Tab, Key)
end,
case CacheResult of
case ets:lookup(LedgerCache#ledger_cache.mem, Key) of
[{Key, Head}] ->
{Head, true};
[] ->
Expand Down
8 changes: 6 additions & 2 deletions src/leveled_codec.erl
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,8 @@ inker_reload_strategy(AltList) ->
lists:ukeysort(1, DefaultList)).


-spec get_tagstrategy(ledger_key()|tag()|dummy, compaction_strategy())
-> compaction_method().
-spec get_tagstrategy(
ledger_key()|tag()|dummy, compaction_strategy()) -> compaction_method().
%% @doc
%% Work out the compaction strategy for the key
get_tagstrategy({Tag, _, _, _}, Strategy) ->
Expand All @@ -413,6 +413,10 @@ get_tagstrategy(Tag, Strategy) ->
case lists:keyfind(Tag, 1, Strategy) of
{Tag, TagStrat} ->
TagStrat;
false when Tag == dummy ->
%% dummy is not a strategy, but this is expected to see this when
%% running in head_only mode - so don't warn
retain;
false ->
leveled_log:log(ic012, [Tag, Strategy]),
retain
Expand Down
47 changes: 33 additions & 14 deletions src/leveled_inker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -754,25 +754,34 @@ handle_cast({release_snapshot, Snapshot}, State) ->
{noreply, State#state{registered_snapshots=Rs}}
end;
handle_cast({log_level, LogLevel}, State) ->
INC = State#state.clerk,
ok = leveled_iclerk:clerk_loglevel(INC, LogLevel),
case State#state.clerk of
undefined ->
ok;
INC ->
leveled_iclerk:clerk_loglevel(INC, LogLevel)
end,
ok = leveled_log:set_loglevel(LogLevel),
CDBopts = State#state.cdb_options,
CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
CDBopts0 = update_cdb_logoptions(State#state.cdb_options),
{noreply, State#state{cdb_options = CDBopts0}};
handle_cast({add_logs, ForcedLogs}, State) ->
INC = State#state.clerk,
ok = leveled_iclerk:clerk_addlogs(INC, ForcedLogs),
case State#state.clerk of
undefined ->
ok;
INC ->
leveled_iclerk:clerk_addlogs(INC, ForcedLogs)
end,
ok = leveled_log:add_forcedlogs(ForcedLogs),
CDBopts = State#state.cdb_options,
CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
CDBopts0 = update_cdb_logoptions(State#state.cdb_options),
{noreply, State#state{cdb_options = CDBopts0}};
handle_cast({remove_logs, ForcedLogs}, State) ->
INC = State#state.clerk,
ok = leveled_iclerk:clerk_removelogs(INC, ForcedLogs),
case State#state.clerk of
undefined ->
ok;
INC ->
leveled_iclerk:clerk_removelogs(INC, ForcedLogs)
end,
ok = leveled_log:remove_forcedlogs(ForcedLogs),
CDBopts = State#state.cdb_options,
CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
CDBopts0 = update_cdb_logoptions(State#state.cdb_options),
{noreply, State#state{cdb_options = CDBopts0}};
handle_cast({maybe_defer_shutdown, ShutdownType, From}, State) ->
case length(State#state.registered_snapshots) of
Expand Down Expand Up @@ -816,8 +825,10 @@ handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info},
handle_info(_Info, State) ->
{noreply, State}.

terminate(_Reason, _State) ->
ok.
terminate(Reason, _State=#state{is_snapshot=Snap}) when Snap == true ->
leveled_log:log(i0027, [Reason]);
terminate(Reason, _State) ->
leveled_log:log(i0028, [Reason]).

code_change(_OldVsn, State, _Extra) ->
{ok, State}.
Expand Down Expand Up @@ -1291,6 +1302,14 @@ wrap_checkfilterfun(CheckFilterFun) ->
end
end.


-spec update_cdb_logoptions(
#cdb_options{}|undefined) -> #cdb_options{}|undefined.
update_cdb_logoptions(undefined) ->
undefined;
update_cdb_logoptions(CDBopts) ->
CDBopts#cdb_options{log_options = leveled_log:get_opts()}.

%%%============================================================================
%%% Test
%%%============================================================================
Expand Down
12 changes: 9 additions & 3 deletions src/leveled_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
{info, <<"Snapshot starting with Ink ~w Pcl ~w">>},
b0003 =>
{info, <<"Bookie closing for reason ~w">>},
b0004 =>
{warn, <<"Bookie snapshot exiting as master store ~w is down for reason ~p">>},
b0005 =>
{info, <<"LedgerSQN=~w at startup">>},
b0006 =>
Expand Down Expand Up @@ -83,17 +85,17 @@
p0005 =>
{debug, <<"Delete confirmed as file ~s is removed from Manifest">>},
p0007 =>
{debug, <<"Sent release message for cloned Penciller following close for reason ~w">>},
{debug, <<"Shutdown complete for cloned Penciller for reason ~w">>},
p0008 =>
{info, <<"Penciller closing for reason ~w">>},
p0010 =>
{info, <<"level zero discarded_count=~w on close of Penciller">>},
p0011 =>
{info, <<"Shutdown complete for Penciller for reason ~w">>},
{debug, <<"Shutdown complete for Penciller for reason ~w">>},
p0012 =>
{info, <<"Store to be started based on manifest sequence number of ~w">>},
p0013 =>
{warn, <<"Seqence number of 0 indicates no valid manifest">>},
{info, <<"Seqence number of 0 indicates no valid manifest">>},
p0014 =>
{info, <<"Maximum sequence number of ~w found in nonzero levels">>},
p0015 =>
Expand Down Expand Up @@ -246,6 +248,10 @@
{warn, <<"Journal SQN of ~w is below Ledger SQN of ~w anti-entropy will be required">>},
i0026 =>
{info, <<"Deferring shutdown due to snapshot_count=~w">>},
i0027 =>
{debug, <<"Shutdown complete for cloned Inker for reason ~w">>},
i0028 =>
{debug, <<"Shutdown complete for Inker for reason ~w">>},
ic001 =>
{info, <<"Closed for reason ~w so maybe leaving garbage">>},
ic002 =>
Expand Down
Loading

0 comments on commit d544db5

Please sign in to comment.