Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/develop-3.1' into mas-o32-upst…
Browse files Browse the repository at this point in the history
…ream.d31
  • Loading branch information
martinsumner committed Dec 2, 2024
2 parents 44edb78 + 313b8ed commit 0b4f82c
Show file tree
Hide file tree
Showing 7 changed files with 402 additions and 83 deletions.
4 changes: 3 additions & 1 deletion include/leveled.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
%%% Non-configurable startup defaults
%%%============================================================================
-define(MAX_SSTSLOTS, 256).
-define(MAX_MERGEBELOW, 24).
-define(LOADING_PAUSE, 1000).
-define(LOADING_BATCH, 1000).
-define(CACHE_SIZE_JITTER, 25).
Expand Down Expand Up @@ -107,7 +108,8 @@
press_level = ?COMPRESSION_LEVEL :: non_neg_integer(),
log_options = leveled_log:get_opts()
:: leveled_log:log_options(),
max_sstslots = ?MAX_SSTSLOTS :: pos_integer(),
max_sstslots = ?MAX_SSTSLOTS :: pos_integer()|infinity,
max_mergebelow = ?MAX_MERGEBELOW :: pos_integer()|infinity,
pagecache_level = ?SST_PAGECACHELEVEL_NOLOOKUP
:: pos_integer(),
monitor = {no_monitor, 0}
Expand Down
11 changes: 10 additions & 1 deletion src/leveled_bookie.erl
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
{max_journalsize, 1000000000},
{max_journalobjectcount, 200000},
{max_sstslots, 256},
{max_mergebelow, 24},
{sync_strategy, ?DEFAULT_SYNC_STRATEGY},
{head_only, false},
{waste_retention_period, undefined},
Expand Down Expand Up @@ -201,6 +202,12 @@
% The maximum number of slots in a SST file. All testing is done
% at a size of 256 (except for Quickcheck tests), altering this
% value is not recommended
{max_mergebelow, pos_integer()|infinity} |
% The maximum number of overlapping files in the level below for a
% single file to be merged into within the ledger. If fewer than this
% number overlap, the merge will continue without a maximum. If this
% many or more overlap below, only up to max_mergebelow div 2
% additions should be created (i.e. the merge should be partial)
{sync_strategy, sync_mode()} |
% Should be sync if it is necessary to flush to disk after every
% write, or none if not (allow the OS to schedule). This has a
Expand Down Expand Up @@ -293,7 +300,7 @@
% To which level of the ledger should the ledger contents be
% pre-loaded into the pagecache (using fadvise on creation and
% startup)
{compression_method, native|lz4|zstd|none} |
{compression_method, native|lz4|zstd|none} |
% Compression method and point allow Leveled to be switched from
% using bif based compression (zlib) to using nif based compression
% (lz4 or zstd).
Expand Down Expand Up @@ -1836,6 +1843,7 @@ set_options(Opts, Monitor) ->
CompressionLevel = proplists:get_value(compression_level, Opts),

MaxSSTSlots = proplists:get_value(max_sstslots, Opts),
MaxMergeBelow = proplists:get_value(max_mergebelow, Opts),

ScoreOneIn = proplists:get_value(journalcompaction_scoreonein, Opts),

Expand Down Expand Up @@ -1869,6 +1877,7 @@ set_options(Opts, Monitor) ->
press_level = CompressionLevel,
log_options = leveled_log:get_opts(),
max_sstslots = MaxSSTSlots,
max_mergebelow = MaxMergeBelow,
monitor = Monitor},
monitor = Monitor}
}.
Expand Down
4 changes: 3 additions & 1 deletion src/leveled_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@
pc010 =>
{info, <<"Merge to be commenced for FileToMerge=~s with MSN=~w">>},
pc011 =>
{info, <<"Merge completed with MSN=~w to Level=~w and FileCounter=~w">>},
{info, <<"Merge completed with MSN=~w to Level=~w and FileCounter=~w merge_type=~w">>},
pc012 =>
{debug, <<"File to be created as part of MSN=~w Filename=~s IsBasement=~w">>},
pc013 =>
Expand All @@ -172,6 +172,8 @@
{info, <<"Grooming compaction picked file with tomb_count=~w">>},
pc025 =>
{info, <<"At level=~w file_count=~w average words for heap_block_size=~w heap_size=~w recent_size=~w bin_vheap_size=~w">>},
pc026 =>
{info, <<"Performing potential partial to level=~w merge as FileCounter=~w restricting to MaxAdditions=~w">>},
pm002 =>
{info, <<"Completed dump of L0 cache to list of l0cache_size=~w">>},
sst03 =>
Expand Down
220 changes: 168 additions & 52 deletions src/leveled_pclerk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -262,61 +262,177 @@ perform_merge(Manifest,
MaxSQN = leveled_sst:sst_getmaxsequencenumber(Src#manifest_entry.owner),
SinkLevel = SrcLevel + 1,
SinkBasement = leveled_pmanifest:is_basement(Manifest, SinkLevel),
Additions =
do_merge(SrcList, SinkList,
SinkLevel, SinkBasement,
RootPath, NewSQN, MaxSQN,
OptsSST,
[]),
RevertPointerFun =
fun({next, ME, _SK}) ->
ME
MaxMergeBelow = OptsSST#sst_options.max_mergebelow,
MergeLimit = merge_limit(SrcLevel, length(SinkList), MaxMergeBelow),
{L2Additions, L1Additions, L2FileRemainder} =
do_merge(
SrcList, SinkList,
SinkLevel, SinkBasement,
RootPath, NewSQN, MaxSQN,
OptsSST,
[],
MergeLimit
),
RevertPointerFun = fun({next, ME, _SK}) -> ME end,
SinkManifestRemovals =
lists:subtract(
lists:map(RevertPointerFun, SinkList),
lists:map(RevertPointerFun, L2FileRemainder)
),
Man0 =
leveled_pmanifest:replace_manifest_entry(
Manifest,
NewSQN,
SinkLevel,
SinkManifestRemovals,
L2Additions
),
Man1 =
case L1Additions of
[] ->
leveled_pmanifest:remove_manifest_entry(
Man0,
NewSQN,
SrcLevel,
Src
);
PartialFiles ->
leveled_pmanifest:replace_manifest_entry(
Man0,
NewSQN,
SrcLevel,
[Src],
PartialFiles
)
end,
SinkManifestList = lists:map(RevertPointerFun, SinkList),
Man0 = leveled_pmanifest:replace_manifest_entry(Manifest,
NewSQN,
SinkLevel,
SinkManifestList,
Additions),
Man2 = leveled_pmanifest:remove_manifest_entry(Man0,
NewSQN,
SrcLevel,
Src),
{Man2, [Src|SinkManifestList]}.

do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions) ->
leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions)]),
Additions;
do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions) ->
FileName = leveled_penciller:sst_filename(NewSQN,
SinkLevel,
length(Additions)),
{Man1, [Src|SinkManifestRemovals]}.

-spec merge_limit(
    non_neg_integer(), non_neg_integer(), pos_integer()|infinity)
        -> pos_integer()|infinity.
%% @doc
%% Decide the maximum number of sink-level additions permitted for this
%% merge, i.e. whether the merge may be cut short (partial).  Merges from
%% level 0 or 1, or merges where the count of overlapping sink files is
%% below MMB, are unrestricted (infinity).  Otherwise the merge is limited
%% to MMB div 2 additions (never less than 1), and the restriction is
%% logged (pc026).
%% NOTE: when MMB is the atom infinity, the guard SinkListLength < MMB
%% always holds, because in Erlang term order numbers compare before
%% atoms - so the is_integer(MMB) guard on the second clause is safe and
%% that clause is only reached with an integer MMB.
merge_limit(SrcLevel, SinkListLength, MMB) when SrcLevel =< 1; SinkListLength < MMB ->
    infinity;
merge_limit(SrcLevel, SinkListLength, MMB) when is_integer(MMB) ->
    AdditionsLimit = max(1, MMB div 2),
    leveled_log:log(pc026, [SrcLevel + 1, SinkListLength, AdditionsLimit]),
    AdditionsLimit.

-type merge_maybe_expanded_pointer() ::
leveled_codec:ledger_kv()|
leveled_sst:slot_pointer()|
leveled_sst:sst_pointer().
% Different to leveled_sst:maybe_expanded_pointer/0
% No sst_closed_pointer()

do_merge(
[], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions, _Max) ->
leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), full]),
{lists:reverse(Additions), [], []};
do_merge(
KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions, Max)
when length(Additions) >= Max ->
leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), partial]),
FNSrc =
leveled_penciller:sst_filename(
NewSQN, SinkLevel - 1, 1
),
FNSnk =
leveled_penciller:sst_filename(
NewSQN, SinkLevel, length(Additions) + 1
),
{ExpandedKL1, []} = split_unexpanded_files(KL1),
{ExpandedKL2, L2FilePointersRem} = split_unexpanded_files(KL2),
TS1 = os:timestamp(),
InfOpts = OptsSST#sst_options{max_sstslots = infinity},
% Need to be careful to make sure all the remainder goes in one file,
% could be situations whereby the max_sstslots has been changed between
% restarts - and so there is too much data for one file in the
% remainder ... but don't want to loop round and consider more complex
% scenarios here.
NewMergeKL1 =
leveled_sst:sst_newmerge(
RP, FNSrc,ExpandedKL1, [], false, SinkLevel - 1, MaxSQN, InfOpts
),
TS2 = os:timestamp(),
NewMergeKL2 =
leveled_sst:sst_newmerge(
RP, FNSnk, [], ExpandedKL2, SinkB, SinkLevel, MaxSQN, InfOpts
),
{KL1Additions, [], []} = add_entry(NewMergeKL1, FNSrc, TS1, []),
{KL2Additions, [], []} = add_entry(NewMergeKL2, FNSnk, TS2, Additions),
{lists:reverse(KL2Additions), KL1Additions, L2FilePointersRem};
do_merge(
KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions, Max) ->
FileName =
leveled_penciller:sst_filename(
NewSQN, SinkLevel, length(Additions)
),
leveled_log:log(pc012, [NewSQN, FileName, SinkB]),
TS1 = os:timestamp(),
case leveled_sst:sst_newmerge(RP, FileName,
KL1, KL2, SinkB, SinkLevel, MaxSQN,
OptsSST) of
empty ->
leveled_log:log(pc013, [FileName]),
do_merge([], [],
SinkLevel, SinkB,
RP, NewSQN, MaxSQN,
OptsSST,
Additions);
{ok, Pid, Reply, Bloom} ->
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
Entry = #manifest_entry{start_key=SmallestKey,
end_key=HighestKey,
owner=Pid,
filename=FileName,
bloom=Bloom},
leveled_log:log_timer(pc015, [], TS1),
do_merge(KL1Rem, KL2Rem,
SinkLevel, SinkB,
RP, NewSQN, MaxSQN,
OptsSST,
Additions ++ [Entry])
end.
NewMerge =
leveled_sst:sst_newmerge(
RP, FileName, KL1, KL2, SinkB, SinkLevel, MaxSQN, OptsSST),
{UpdAdditions, KL1Rem, KL2Rem} =
add_entry(NewMerge, FileName, TS1, Additions),
do_merge(
KL1Rem,
KL2Rem,
SinkLevel,
SinkB,
RP,
NewSQN,
MaxSQN,
OptsSST,
UpdAdditions,
Max
).

%% @doc
%% Fold the outcome of a leveled_sst:sst_newmerge/8 call into the
%% accumulated (reversed) list of manifest additions, returning
%% {UpdatedAdditions, KL1Rem, KL2Rem} so the caller can continue the merge
%% with the unconsumed keys/pointers from each input list.
add_entry(empty, FileName, _TS1, Additions) ->
    leveled_log:log(pc013, [FileName]),
    % Nothing was written, and both input lists are exhausted.  Keep the
    % additions accumulated so far and report empty remainders.  (The
    % accumulator must be in the first tuple position, matching the
    % {[Entry|Additions], KL1Rem, KL2Rem} shape of the success clause -
    % callers pattern-match {KL2Additions, [], []} against this result.)
    {Additions, [], []};
add_entry({ok, Pid, Reply, Bloom}, FileName, TS1, Additions) ->
    {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
    Entry =
        #manifest_entry{
            start_key=SmallestKey,
            end_key=HighestKey,
            owner=Pid,
            filename=FileName,
            bloom=Bloom
        },
    % pc015 logs the time taken to create the file
    leveled_log:log_timer(pc015, [], TS1),
    {[Entry|Additions], KL1Rem, KL2Rem}.


-spec split_unexpanded_files(
    list(merge_maybe_expanded_pointer())) ->
        {
            list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
            list(leveled_sst:sst_pointer())
        }.
%% @doc
%% Partition a merge input list into the leading run of already-expanded
%% entries (KVs and slot pointers) and the trailing whole-file pointers,
%% preserving order within each partition.
split_unexpanded_files(Pointers) ->
    split_unexpanded_files(Pointers, [], []).

-spec split_unexpanded_files(
    list(merge_maybe_expanded_pointer()),
    list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
    list(leveled_sst:sst_pointer())) ->
        {
            list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
            list(leveled_sst:sst_pointer())
        }.
split_unexpanded_files([], ExpandedAcc, FilePointerAcc) ->
    % Accumulators were built head-first, so restore input order
    {lists:reverse(ExpandedAcc), lists:reverse(FilePointerAcc)};
split_unexpanded_files([{next, Owner, StartKey}|Rest], ExpandedAcc, FilePointerAcc) ->
    split_unexpanded_files(
        Rest, ExpandedAcc, [{next, Owner, StartKey}|FilePointerAcc]);
split_unexpanded_files([{LK, LV}|Rest], ExpandedAcc, []) ->
    % A KV after a file pointer is a protocol violation - the third-arg []
    % pattern makes that case crash with function_clause (deliberate)
    split_unexpanded_files(Rest, [{LK, LV}|ExpandedAcc], []);
split_unexpanded_files([{pointer, Pid, SIV, SK, EK}|Rest], ExpandedAcc, []) ->
    % As above - slot pointers must precede any whole-file pointer
    split_unexpanded_files(
        Rest, [{pointer, Pid, SIV, SK, EK}|ExpandedAcc], []).

-spec grooming_scorer(
list(leveled_pmanifest:manifest_entry()))
Expand Down
52 changes: 38 additions & 14 deletions src/leveled_sst.erl
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,17 @@

-type build_timings() :: no_timing|#build_timings{}.

-export_type([expandable_pointer/0, press_method/0, segment_check_fun/0]).
-export_type(
[
expandable_pointer/0,
sst_closed_pointer/0,
sst_pointer/0,
slot_pointer/0,
press_method/0,
segment_check_fun/0,
sst_options/0
]
).

%%%============================================================================
%%% API
Expand Down Expand Up @@ -292,17 +302,29 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
{ok, Pid, {SK, EK}, Bloom}
end.

-spec sst_newmerge(string(), string(),
list(leveled_codec:ledger_kv()|sst_pointer()),
list(leveled_codec:ledger_kv()|sst_pointer()),
boolean(), leveled_pmanifest:lsm_level(),
integer(), sst_options())
-> empty|{ok, pid(),
{{list(leveled_codec:ledger_kv()),
list(leveled_codec:ledger_kv())},
leveled_codec:ledger_key(),
leveled_codec:ledger_key()},
binary()}.
-spec sst_newmerge(
string(), string(),
list(leveled_codec:ledger_kv()|sst_pointer()),
list(leveled_codec:ledger_kv()|sst_pointer()),
boolean(),
leveled_pmanifest:lsm_level(),
integer(),
sst_options())
->
empty|
{
ok,
pid(),
{
{
list(leveled_codec:ledger_kv()),
list(leveled_codec:ledger_kv())
},
leveled_codec:ledger_key(),
leveled_codec:ledger_key()
},
binary()
}.
%% @doc
%% Start a new SST file at the assigned level passing in a two lists of
%% {Key, Value} pairs to be merged. The merge_lists function will use the
Expand Down Expand Up @@ -1433,7 +1455,9 @@ compress_level(_Level, _LevelToCompress, PressMethod) ->
PressMethod.

-spec maxslots_level(
leveled_pmanifest:lsm_level(), pos_integer()) -> pos_integer().
leveled_pmanifest:lsm_level(), pos_integer()|infinity) -> pos_integer()|infinity.
maxslots_level(_Level, infinity) ->
infinity;
maxslots_level(Level, MaxSlotCount) when Level < ?DOUBLESIZE_LEVEL ->
MaxSlotCount;
maxslots_level(_Level, MaxSlotCount) ->
Expand Down Expand Up @@ -2787,7 +2811,7 @@ merge_lists(
list(binary_slot()),
leveled_codec:ledger_key()|null,
non_neg_integer(),
non_neg_integer(),
pos_integer()|infinity,
press_method(),
boolean(),
non_neg_integer()|not_counted,
Expand Down
Loading

0 comments on commit 0b4f82c

Please sign in to comment.