Skip to content

Commit

Permalink
Manifest Entry to belong to leveled_pmanifest
Browse files Browse the repository at this point in the history
There are two manifests - leveled_pmanifest and leveled_imanifest.  Both have manifest_entry() type objects, but these types are different.  To avoid confusion don't include the pmanifest manifest_entry() within the global include file - be specific that it belongs to the leveled_pmanifest module
  • Loading branch information
martinsumner committed Nov 12, 2024
1 parent 4df74e3 commit dbc2061
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 113 deletions.
7 changes: 0 additions & 7 deletions include/leveled.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,6 @@
is_basement = false :: boolean(),
timestamp :: integer()}).

-record(manifest_entry,
{start_key :: tuple(),
end_key :: tuple(),
owner :: pid(),
filename :: string() | undefined,
bloom = none :: leveled_ebloom:bloom() | none}).

-record(cdb_options,
{max_size :: pos_integer() | undefined,
max_count :: pos_integer() | undefined,
Expand Down
178 changes: 109 additions & 69 deletions src/leveled_pclerk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -222,16 +222,22 @@ merge(SrcLevel, Manifest, RootPath, OptsSST) ->
leveled_pmanifest:merge_lookup(
Manifest,
SrcLevel + 1,
Src#manifest_entry.start_key,
Src#manifest_entry.end_key
leveled_pmanifest:entry_startkey(Src),
leveled_pmanifest:entry_endkey(Src)
),
Candidates = length(SinkList),
leveled_log:log(pc008, [SrcLevel, Candidates]),
case Candidates of
0 ->
NewLevel = SrcLevel + 1,
leveled_log:log(pc009, [Src#manifest_entry.filename, NewLevel]),
leveled_sst:sst_switchlevels(Src#manifest_entry.owner, NewLevel),
leveled_log:log(
pc009,
[leveled_pmanifest:entry_filename(Src), NewLevel]
),
leveled_sst:sst_switchlevels(
leveled_pmanifest:entry_owner(Src),
NewLevel
),
Man0 =
leveled_pmanifest:switch_manifest_entry(
Manifest,
Expand All @@ -251,7 +257,11 @@ merge(SrcLevel, Manifest, RootPath, OptsSST) ->
notify_deletions([], _Penciller) ->
ok;
notify_deletions([Head|Tail], Penciller) ->
ok = leveled_sst:sst_setfordelete(Head#manifest_entry.owner, Penciller),
ok =
leveled_sst:sst_setfordelete(
leveled_pmanifest:entry_owner(Head),
Penciller
),
notify_deletions(Tail, Penciller).


Expand All @@ -261,9 +271,12 @@ notify_deletions([Head|Tail], Penciller) ->
%% SrcLevel is the level of the src sst file, the sink should be srcLevel + 1

perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN, OptsSST) ->
leveled_log:log(pc010, [Src#manifest_entry.filename, NewSQN]),
leveled_log:log(pc010, [leveled_pmanifest:entry_filename(Src), NewSQN]),
SrcList = [{next, Src, all}],
MaxSQN = leveled_sst:sst_getmaxsequencenumber(Src#manifest_entry.owner),
MaxSQN =
leveled_sst:sst_getmaxsequencenumber(
leveled_pmanifest:entry_owner(Src)
),
SinkLevel = SrcLevel + 1,
SinkBasement = leveled_pmanifest:is_basement(Manifest, SinkLevel),
Additions =
Expand Down Expand Up @@ -321,13 +334,8 @@ do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions) ->
{ok, Pid, Reply, Bloom} ->
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
Entry =
#manifest_entry{
start_key=SmallestKey,
end_key=HighestKey,
owner=Pid,
filename=FileName,
bloom=Bloom
},
leveled_pmanifest:new_entry(
SmallestKey, HighestKey, Pid, FileName, Bloom),
leveled_log:log_timer(pc015, [], TS1),
do_merge(
KL1Rem, KL2Rem,
Expand All @@ -342,15 +350,17 @@ do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions) ->
list(leveled_pmanifest:manifest_entry()))
-> leveled_pmanifest:manifest_entry().
grooming_scorer([ME | MEs]) ->
InitTombCount = leveled_sst:sst_gettombcount(ME#manifest_entry.owner),
InitTombCount =
leveled_sst:sst_gettombcount(leveled_pmanifest:entry_owner(ME)),
{HighestTC, BestME} = grooming_scorer(InitTombCount, ME, MEs),
leveled_log:log(pc024, [HighestTC]),
BestME.

grooming_scorer(HighestTC, BestME, []) ->
{HighestTC, BestME};
grooming_scorer(HighestTC, BestME, [ME | MEs]) ->
TombCount = leveled_sst:sst_gettombcount(ME#manifest_entry.owner),
TombCount =
leveled_sst:sst_gettombcount(leveled_pmanifest:entry_owner(ME)),
case TombCount > HighestTC of
true ->
grooming_scorer(TombCount, ME, MEs);
Expand Down Expand Up @@ -448,9 +458,9 @@ grooming_score_test() ->
true),
DSK = {o, <<"B">>, <<"SK">>, null},
DEK = {o, <<"E">>, <<"EK">>, null},
ME1 = #manifest_entry{owner=PidL3_1, start_key = DSK, end_key = DEK},
ME1B = #manifest_entry{owner=PidL3_1B, start_key = DSK, end_key = DEK},
ME2 = #manifest_entry{owner=PidL3_2, start_key = DSK, end_key = DEK},
ME1 = leveled_pmanifest:new_entry(DSK, DEK, PidL3_1, "dummyL3_1", none),
ME1B = leveled_pmanifest:new_entry(DSK, DEK, PidL3_1B, "dummyL3_1B", none),
ME2 = leveled_pmanifest:new_entry(DSK, DEK, PidL3_2, "dummyL3_2", none),
?assertMatch(ME1, grooming_scorer([ME1, ME2])),
?assertMatch(ME1, grooming_scorer([ME2, ME1])),
% prefer the file with the tombstone
Expand All @@ -467,64 +477,94 @@ merge_file_test() ->
ok = filelib:ensure_dir("test/test_area/ledger_files/"),
KL1_L1 = lists:sort(generate_randomkeys(8000, 0, 1000)),
{ok, PidL1_1, _, _} =
leveled_sst:sst_new("test/test_area/ledger_files/",
"KL1_L1.sst",
1,
KL1_L1,
999999,
#sst_options{}),
leveled_sst:sst_new(
"test/test_area/ledger_files/",
"KL1_L1.sst",
1,
KL1_L1,
999999,
#sst_options{}
),
KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)),
{ok, PidL2_1, _, _} =
leveled_sst:sst_new("test/test_area/ledger_files/",
"KL1_L2.sst",
2,
KL1_L2,
999999,
#sst_options{}),
leveled_sst:sst_new(
"test/test_area/ledger_files/",
"KL1_L2.sst",
2,
KL1_L2,
999999,
#sst_options{}
),
KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)),
{ok, PidL2_2, _, _} =
leveled_sst:sst_new("test/test_area/ledger_files/",
"KL2_L2.sst",
2,
KL2_L2,
999999,
#sst_options{press_method = lz4}),
leveled_sst:sst_new(
"test/test_area/ledger_files/",
"KL2_L2.sst",
2,
KL2_L2,
999999,
#sst_options{press_method = lz4}
),
KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)),
{ok, PidL2_3, _, _} =
leveled_sst:sst_new("test/test_area/ledger_files/",
"KL3_L2.sst",
2,
KL3_L2,
999999,
#sst_options{press_method = lz4}),
leveled_sst:sst_new(
"test/test_area/ledger_files/",
"KL3_L2.sst",
2,
KL3_L2,
999999,
#sst_options{press_method = lz4}
),
KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)),
{ok, PidL2_4, _, _} =
leveled_sst:sst_new("test/test_area/ledger_files/",
"KL4_L2.sst",
2,
KL4_L2,
999999,
#sst_options{press_method = lz4}),
E1 = #manifest_entry{owner = PidL1_1,
filename = "./KL1_L1.sst",
end_key = lists:last(KL1_L1),
start_key = lists:nth(1, KL1_L1)},
E2 = #manifest_entry{owner = PidL2_1,
filename = "./KL1_L2.sst",
end_key = lists:last(KL1_L2),
start_key = lists:nth(1, KL1_L2)},
E3 = #manifest_entry{owner = PidL2_2,
filename = "./KL2_L2.sst",
end_key = lists:last(KL2_L2),
start_key = lists:nth(1, KL2_L2)},
E4 = #manifest_entry{owner = PidL2_3,
filename = "./KL3_L2.sst",
end_key = lists:last(KL3_L2),
start_key = lists:nth(1, KL3_L2)},
E5 = #manifest_entry{owner = PidL2_4,
filename = "./KL4_L2.sst",
end_key = lists:last(KL4_L2),
start_key = lists:nth(1, KL4_L2)},
leveled_sst:sst_new(
"test/test_area/ledger_files/",
"KL4_L2.sst",
2,
KL4_L2,
999999,
#sst_options{press_method = lz4}
),
E1 =
leveled_pmanifest:new_entry(
lists:nth(1, KL1_L1),
lists:last(KL1_L1),
PidL1_1,
"./KL1_L1.sst",
none
),
E2 =
leveled_pmanifest:new_entry(
lists:nth(1, KL1_L2),
lists:last(KL1_L2),
PidL2_1,
"./KL1_L2.sst",
none
),
E3 =
leveled_pmanifest:new_entry(
lists:nth(1, KL2_L2),
lists:last(KL2_L2),
PidL2_2,
"./KL2_L2.sst",
none
),
E4 =
leveled_pmanifest:new_entry(
lists:nth(1, KL3_L2),
lists:last(KL3_L2),
PidL2_3,
"./KL3_L2.sst",
none
),
E5 =
leveled_pmanifest:new_entry(
lists:nth(1, KL4_L2),
lists:last(KL4_L2),
PidL2_4,
"./KL4_L2.sst",
none
),

Man0 = leveled_pmanifest:new_manifest(),
Man1 = leveled_pmanifest:insert_manifest_entry(Man0, 1, 2, E2),
Expand Down
30 changes: 11 additions & 19 deletions src/leveled_penciller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1137,14 +1137,7 @@ handle_cast(
State = #state{manifest = Man, levelzero_constructor = L0C, clerk = Clerk})
when ?IS_DEF(Man), ?IS_DEF(L0C), ?IS_DEF(Clerk) ->
leveled_log:log(p0029, []),
ManEntry =
#manifest_entry{
start_key=StartKey,
end_key=EndKey,
owner=L0C,
filename=FN,
bloom=Bloom
},
ManEntry = leveled_pmanifest:new_entry(StartKey, EndKey, L0C, FN, Bloom),
ManifestSQN = leveled_pmanifest:get_manifest_sqn(Man) + 1,
UpdMan =
leveled_pmanifest:insert_manifest_entry(Man, ManifestSQN, 0, ManEntry),
Expand Down Expand Up @@ -1342,9 +1335,12 @@ sst_filename(ManSQN, Level, Count) ->
%%% Internal functions
%%%============================================================================

-type update_forcedlogs_fun() :: fun((pid(), list(atom())) -> ok).
-type update_loglevel_fun() :: fun((pid(), atom()) -> ok).

-spec update_clerk
(pid()|undefined, fun((pid(), atom()) -> ok), atom()) -> ok;
(pid()|undefined, fun((pid(), list(atom())) -> ok), list(atom())) -> ok.
(pid()|undefined, update_loglevel_fun(), atom()) -> ok;
(pid()|undefined, update_forcedlogs_fun(), list(atom())) -> ok.
update_clerk(undefined, _F, _T) ->
ok;
update_clerk(Clerk, F, T) when is_pid(Clerk) ->
Expand Down Expand Up @@ -1402,12 +1398,8 @@ start_from_file(
{ok, L0Pid, {L0StartKey, L0EndKey}, Bloom} = L0Open,
L0SQN = leveled_sst:sst_getmaxsequencenumber(L0Pid),
L0Entry =
#manifest_entry{
start_key = L0StartKey,
end_key = L0EndKey,
filename = L0FN,
owner = L0Pid,
bloom = Bloom},
leveled_pmanifest:new_entry(
L0StartKey, L0EndKey, L0Pid, L0FN, Bloom),
Manifest2 =
leveled_pmanifest:insert_manifest_entry(
Manifest1, ManSQN + 1, 0, L0Entry),
Expand Down Expand Up @@ -1448,13 +1440,13 @@ shutdown_manifest(Manifest, L0Constructor) ->
EntryCloseFun =
fun(ME) ->
Owner =
case is_record(ME, manifest_entry) of
case leveled_pmanifest:is_entry(ME) of
true ->
ME#manifest_entry.owner;
leveled_pmanifest:entry_owner(ME);
false ->
case ME of
{_SK, ME0} ->
ME0#manifest_entry.owner;
leveled_pmanifest:entry_owner(ME0);
ME ->
ME
end
Expand Down
Loading

0 comments on commit dbc2061

Please sign in to comment.