Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mas d34 leveled.i459 partialmerge #460

Merged
merged 2 commits into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion include/leveled.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
%%% Non-configurable startup defaults
%%%============================================================================
-define(MAX_SSTSLOTS, 256).
-define(MAX_MERGEBELOW, 24).
-define(LOADING_PAUSE, 1000).
-define(LOADING_BATCH, 1000).
-define(CACHE_SIZE_JITTER, 25).
Expand Down Expand Up @@ -109,7 +110,8 @@
press_level = ?COMPRESSION_LEVEL :: non_neg_integer(),
log_options = leveled_log:get_opts()
:: leveled_log:log_options(),
max_sstslots = ?MAX_SSTSLOTS :: pos_integer(),
max_sstslots = ?MAX_SSTSLOTS :: pos_integer()|infinity,
max_mergebelow = ?MAX_MERGEBELOW :: pos_integer()|infinity,
martinsumner marked this conversation as resolved.
Show resolved Hide resolved
pagecache_level = ?SST_PAGECACHELEVEL_NOLOOKUP
:: pos_integer(),
monitor = {no_monitor, 0}
Expand Down
8 changes: 2 additions & 6 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@
{eunit_opts, [verbose]}.

{project_plugins, [
{eqwalizer_rebar3,
{git_subdir,
"https://github.com/whatsapp/eqwalizer.git",
{branch, "main"},
"eqwalizer_rebar3"}}
{eqwalizer_rebar3, {git_subdir, "https://github.com/OpenRiak/eqwalizer.git", {branch, "openriak-3.4"}, "eqwalizer_rebar3"}}
]}.

{profiles,
Expand All @@ -37,7 +33,7 @@
{deps, [
{lz4, ".*", {git, "https://github.com/nhs-riak/erlang-lz4", {branch, "nhse-develop-3.4"}}},
{zstd, ".*", {git, "https://github.com/nhs-riak/zstd-erlang", {branch, "nhse-develop"}}},
{eqwalizer_support, {git_subdir, "https://github.com/whatsapp/eqwalizer.git", {branch, "main"}, "eqwalizer_support"}}
{eqwalizer_support, {git_subdir, "https://github.com/OpenRiak/eqwalizer.git", {branch, "openriak-3.4"}, "eqwalizer_support"}}
]}.

{ct_opts, [{dir, ["test/end_to_end"]}]}.
11 changes: 10 additions & 1 deletion src/leveled_bookie.erl
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
{max_journalsize, 1000000000},
{max_journalobjectcount, 200000},
{max_sstslots, 256},
{max_mergebelow, 24},
{sync_strategy, ?DEFAULT_SYNC_STRATEGY},
{head_only, false},
{waste_retention_period, undefined},
Expand Down Expand Up @@ -201,6 +202,12 @@
% The maximum number of slots in a SST file. All testing is done
% at a size of 256 (except for Quickcheck tests}, altering this
% value is not recommended
{max_mergeblow, pos_integer()|infinity} |
% The maximum number of files for a single file to be merged into
% within the ledger. If less than this, the merge will continue
% without a maximum. If this or more overlapping below, only up
% to max_mergebelow div 2 additions should be created (the merge
% should be partial)
{sync_strategy, sync_mode()} |
% Should be sync if it is necessary to flush to disk after every
% write, or none if not (allow the OS to schecdule). This has a
Expand Down Expand Up @@ -293,7 +300,7 @@
% To which level of the ledger should the ledger contents be
% pre-loaded into the pagecache (using fadvise on creation and
% startup)
{compression_method, native|lz4|zstd|none} |
{compression_method, native|lz4|zstd|none} |
% Compression method and point allow Leveled to be switched from
% using bif based compression (zlib) to using nif based compression
% (lz4 or zstd).
Expand Down Expand Up @@ -1871,6 +1878,7 @@ set_options(Opts, Monitor) ->
CompressionLevel = proplists:get_value(compression_level, Opts),

MaxSSTSlots = proplists:get_value(max_sstslots, Opts),
MaxMergeBelow = proplists:get_value(max_mergebelow, Opts),

ScoreOneIn = proplists:get_value(journalcompaction_scoreonein, Opts),

Expand Down Expand Up @@ -1904,6 +1912,7 @@ set_options(Opts, Monitor) ->
press_level = CompressionLevel,
log_options = leveled_log:get_opts(),
max_sstslots = MaxSSTSlots,
max_mergebelow = MaxMergeBelow,
monitor = Monitor},
monitor = Monitor}
}.
Expand Down
4 changes: 3 additions & 1 deletion src/leveled_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@
pc010 =>
{info, <<"Merge to be commenced for FileToMerge=~s with MSN=~w">>},
pc011 =>
{info, <<"Merge completed with MSN=~w to Level=~w and FileCounter=~w">>},
{info, <<"Merge completed with MSN=~w to Level=~w and FileCounter=~w merge_type=~w">>},
pc012 =>
{debug, <<"File to be created as part of MSN=~w Filename=~s IsBasement=~w">>},
pc013 =>
Expand All @@ -190,6 +190,8 @@
{info, <<"Grooming compaction picked file with tomb_count=~w">>},
pc025 =>
{info, <<"At level=~w file_count=~w average words for heap_block_size=~w heap_size=~w recent_size=~w bin_vheap_size=~w">>},
pc026 =>
{info, <<"Performing potential partial to level=~w merge as FileCounter=~w restricting to MaxAdditions=~w">>},
pm002 =>
{info, <<"Completed dump of L0 cache to list of l0cache_size=~w">>},
sst03 =>
Expand Down
211 changes: 163 additions & 48 deletions src/leveled_pclerk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ notify_deletions([Head|Tail], Penciller) ->
%% to be merged into multiple SSTs at a lower level.
%%
%% SrcLevel is the level of the src sst file, the sink should be srcLevel + 1

perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN, OptsSST) ->
leveled_log:log(pc010, [leveled_pmanifest:entry_filename(Src), NewSQN]),
SrcList = [{next, Src, all}],
Expand All @@ -279,72 +278,188 @@ perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN, OptsSST) ->
),
SinkLevel = SrcLevel + 1,
SinkBasement = leveled_pmanifest:is_basement(Manifest, SinkLevel),
Additions =
MaxMergeBelow = OptsSST#sst_options.max_mergebelow,
MergeLimit = merge_limit(SrcLevel, length(SinkList), MaxMergeBelow),
{L2Additions, L1Additions, L2FileRemainder} =
do_merge(
SrcList, SinkList,
SinkLevel, SinkBasement,
RootPath, NewSQN, MaxSQN,
OptsSST,
[]
[],
MergeLimit
),
RevertPointerFun = fun({next, ME, _SK}) -> ME end,
SinkManifestRemovals =
lists:subtract(
lists:map(RevertPointerFun, SinkList),
lists:map(RevertPointerFun, L2FileRemainder)
),
RevertPointerFun =
fun({next, ME, _SK}) ->
ME
end,
SinkManifestList = lists:map(RevertPointerFun, SinkList),
Man0 =
leveled_pmanifest:replace_manifest_entry(
Manifest,
NewSQN,
SinkLevel,
SinkManifestList,
Additions
SinkManifestRemovals,
L2Additions
),
Man2 =
leveled_pmanifest:remove_manifest_entry(
Man0,
NewSQN,
SrcLevel,
Src
Man1 =
case L1Additions of
[] ->
leveled_pmanifest:remove_manifest_entry(
Man0,
NewSQN,
SrcLevel,
Src
);
PartialFiles ->
martinsumner marked this conversation as resolved.
Show resolved Hide resolved
leveled_pmanifest:replace_manifest_entry(
Man0,
NewSQN,
SrcLevel,
[Src],
PartialFiles
)
end,
{Man1, [Src|SinkManifestRemovals]}.

-spec merge_limit(
non_neg_integer(), non_neg_integer(), pos_integer()|infinity)
-> pos_integer()|infinity.
merge_limit(SrcLevel, SinkListLength, MMB) when SrcLevel =< 1; SinkListLength < MMB ->
infinity;
merge_limit(SrcLevel, SinkListLength, MMB) when is_integer(MMB) ->
AdditionsLimit = max(1, MMB div 2),
martinsumner marked this conversation as resolved.
Show resolved Hide resolved
leveled_log:log(pc026, [SrcLevel + 1, SinkListLength, AdditionsLimit]),
AdditionsLimit.

-type merge_maybe_expanded_pointer() ::
leveled_codec:ledger_kv()|
leveled_sst:slot_pointer()|
leveled_sst:sst_pointer().
% Different to leveled_sst:maybe_expanded_pointer/0
% No sst_closed_pointer()

-spec do_merge(
list(merge_maybe_expanded_pointer()),
list(merge_maybe_expanded_pointer()),
leveled_pmanifest:lsm_level(),
boolean(),
string(),
pos_integer(),
pos_integer(),
leveled_sst:sst_options(),
list(leveled_pmanifest:manifest_entry()),
pos_integer()|infinity) ->
{
list(leveled_pmanifest:manifest_entry()),
list(leveled_pmanifest:manifest_entry()),
list(leveled_sst:sst_pointer())
}.
do_merge(
[], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions, _Max) ->
leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), full]),
{lists:reverse(Additions), [], []};
do_merge(
KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions, Max)
when length(Additions) >= Max ->
leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), partial]),
FNSrc =
leveled_penciller:sst_filename(
NewSQN, SinkLevel - 1, 1
),
{Man2, [Src|SinkManifestList]}.

do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions) ->
leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions)]),
lists:reverse(Additions);
do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions) ->
FNSnk =
leveled_penciller:sst_filename(
NewSQN, SinkLevel, length(Additions) + 1
),
{ExpandedKL1, []} = split_unexpanded_files(KL1),
{ExpandedKL2, L2FilePointersRem} = split_unexpanded_files(KL2),
TS1 = os:timestamp(),
InfOpts = OptsSST#sst_options{max_sstslots = infinity},
% Need to be careful to make sure all the remainder goes in one file,
% could be situations whereby the max_sstslots has been changed between
% restarts - and so there is too much data for one file in the
% remainder ... but don't want to loop round and consider more complex
% scenarios here.
NewMergeKL1 =
leveled_sst:sst_newmerge(
RP, FNSrc,ExpandedKL1, [], false, SinkLevel - 1, MaxSQN, InfOpts
),
TS2 = os:timestamp(),
NewMergeKL2 =
leveled_sst:sst_newmerge(
RP, FNSnk, [], ExpandedKL2, SinkB, SinkLevel, MaxSQN, InfOpts
),
{KL1Additions, [], []} = add_entry(NewMergeKL1, FNSrc, TS1, []),
{KL2Additions, [], []} = add_entry(NewMergeKL2, FNSnk, TS2, Additions),
{lists:reverse(KL2Additions), KL1Additions, L2FilePointersRem};
do_merge(
KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions, Max) ->
FileName =
leveled_penciller:sst_filename(
NewSQN, SinkLevel, length(Additions)
),
leveled_log:log(pc012, [NewSQN, FileName, SinkB]),
TS1 = os:timestamp(),
case leveled_sst:sst_newmerge(RP, FileName,
KL1, KL2, SinkB, SinkLevel, MaxSQN,
OptsSST) of
empty ->
leveled_log:log(pc013, [FileName]),
do_merge(
[], [],
SinkLevel, SinkB,
RP, NewSQN, MaxSQN,
OptsSST,
Additions
);
{ok, Pid, Reply, Bloom} ->
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
Entry =
leveled_pmanifest:new_entry(
SmallestKey, HighestKey, Pid, FileName, Bloom),
leveled_log:log_timer(pc015, [], TS1),
do_merge(
KL1Rem, KL2Rem,
SinkLevel, SinkB,
RP, NewSQN, MaxSQN,
OptsSST,
[Entry|Additions]
)
end.
NewMerge =
leveled_sst:sst_newmerge(
RP, FileName, KL1, KL2, SinkB, SinkLevel, MaxSQN, OptsSST),
{UpdAdditions, KL1Rem, KL2Rem} =
add_entry(NewMerge, FileName, TS1, Additions),
do_merge(
KL1Rem,
KL2Rem,
SinkLevel,
SinkB,
RP,
NewSQN,
MaxSQN,
OptsSST,
UpdAdditions,
Max
).

add_entry(empty, FileName, _TS1, Additions) ->
leveled_log:log(pc013, [FileName]),
{[], [], Additions};
add_entry({ok, Pid, Reply, Bloom}, FileName, TS1, Additions) ->
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
Entry =
leveled_pmanifest:new_entry(
SmallestKey, HighestKey, Pid, FileName, Bloom),
leveled_log:log_timer(pc015, [], TS1),
{[Entry|Additions], KL1Rem, KL2Rem}.


-spec split_unexpanded_files(
list(merge_maybe_expanded_pointer())) ->
{
list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
list(leveled_sst:sst_pointer())
}.
split_unexpanded_files(Pointers) ->
split_unexpanded_files(Pointers, [], []).
martinsumner marked this conversation as resolved.
Show resolved Hide resolved

-spec split_unexpanded_files(
list(merge_maybe_expanded_pointer()),
list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
list(leveled_sst:sst_pointer())) ->
{
list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
list(leveled_sst:sst_pointer())
}.
split_unexpanded_files([], MaybeExpanded, FilePointers) ->
{lists:reverse(MaybeExpanded), lists:reverse(FilePointers)};
split_unexpanded_files([{next, P, SK}|Rest], MaybeExpanded, FilePointers) ->
split_unexpanded_files(Rest, MaybeExpanded, [{next, P, SK}|FilePointers]);
split_unexpanded_files([{LK, LV}|Rest], MaybeExpanded, []) ->
% Should never see this, once a FilePointer has been seen
split_unexpanded_files(Rest, [{LK, LV}|MaybeExpanded], []);
split_unexpanded_files([{pointer, P, SIV, SK, EK}|Rest], MaybeExpanded, []) ->
% Should never see this, once a FilePointer has been seen
split_unexpanded_files(
Rest, [{pointer, P, SIV, SK, EK}|MaybeExpanded], []
).

-spec grooming_scorer(
list(leveled_pmanifest:manifest_entry()))
Expand Down
23 changes: 18 additions & 5 deletions src/leveled_sst.erl
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,18 @@

-type build_timings() :: no_timing|#build_timings{}.

-export_type([expandable_pointer/0, press_method/0, segment_check_fun/0]).
-export_type(
[
expandable_pointer/0,
maybe_expanded_pointer/0,
sst_closed_pointer/0,
sst_pointer/0,
slot_pointer/0,
press_method/0,
segment_check_fun/0,
sst_options/0
]
).

%%%============================================================================
%%% API
Expand Down Expand Up @@ -312,8 +323,8 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
end.

-spec sst_newmerge(string(), string(),
list(leveled_codec:ledger_kv()|sst_pointer()),
list(leveled_codec:ledger_kv()|sst_pointer()),
list(maybe_expanded_pointer()),
list(maybe_expanded_pointer()),
boolean(), leveled_pmanifest:lsm_level(),
integer(), sst_options())
-> empty|{ok, pid(),
Expand Down Expand Up @@ -1529,7 +1540,9 @@ compress_level(_Level, _LevelToCompress, PressMethod) ->
PressMethod.

-spec maxslots_level(
leveled_pmanifest:lsm_level(), pos_integer()) -> pos_integer().
leveled_pmanifest:lsm_level(), pos_integer()|infinity) -> pos_integer()|infinity.
maxslots_level(_Level, infinity) ->
infinity;
maxslots_level(Level, MaxSlotCount) when Level < ?DOUBLESIZE_LEVEL ->
MaxSlotCount;
maxslots_level(_Level, MaxSlotCount) ->
Expand Down Expand Up @@ -3020,7 +3033,7 @@ merge_lists(
list(binary_slot()),
leveled_codec:ledger_key()|null,
non_neg_integer(),
non_neg_integer(),
pos_integer()|infinity,
press_method(),
boolean(),
non_neg_integer(),
Expand Down
Loading