Skip to content

Commit

Permalink
Clarify types
Browse files Browse the repository at this point in the history
and remove logging added for testing
  • Loading branch information
martinsumner committed Oct 17, 2023
1 parent 05b5837 commit 7b47e52
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 58 deletions.
4 changes: 0 additions & 4 deletions src/hashtree.erl
Original file line number Diff line number Diff line change
Expand Up @@ -880,16 +880,12 @@ hashes(State, Segments) ->
-spec snapshot(hashtree()) -> hashtree().
snapshot(State) ->
Mod = State#state.database_mod,
?LOG_INFO(
"Update snapshot - db ~w snapshot ~w",
[State#state.ref, State#state.itr]),
{ok, Itr} = Mod:snapshot(State#state.ref, State#state.itr),
State#state{itr=Itr}.

-spec multi_select_segment(
hashtree(), list('*'|integer()), select_fun(T)) -> [{integer(), T}].
multi_select_segment(#state{id=Id, itr=Itr, database_mod=Mod}, Segments, F) ->
?LOG_INFO("Multi select segment on snapshot ~w", [Itr]),
Mod:multi_select_segment(Id, Itr, Segments, F).


Expand Down
108 changes: 54 additions & 54 deletions src/hashtree_leveled.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,21 @@
encode_key/1
]).


-export([fake_close/1]).

-type tree_id_bin() :: <<_:176>>.
-type db_key() :: {binary(), binary()}|{{binary(), binary()}, binary()}.
-type select_fun(T) :: fun((orddict:orddict()) -> T).
-type active_store() :: pid().
-type active_snap() :: pid().
-type store_ref() :: active_store()|undefined.
-type snapshot_ref() :: active_snap()|undefined.

-define(HEAD_TAG, h).
-define(STORE_FORCED_LOGS, []).
-define(STORE_LOG_LEVEL, warn).
-define(SNAP_FORCED_LOGS, []).
-define(SNAP_LOG_LEVEL, warn).
-define(PAUSE_ON_BUSY, 40).

-include_lib("kernel/include/logger.hrl").

Expand All @@ -53,18 +60,17 @@
%%% API
%%%===================================================================

-spec new(proplists:proplist()) -> {term(), string()}.
-spec new(proplists:proplist()) -> {active_store(), string()}.
new(Options) ->
DataDir = hashtree:get_path(Options),

LeveledOpts =
[
{root_path, DataDir},
{max_journalobjectcount, 20000},
%% one tenth of standard size - as head_only
{log_level, warn},
{forced_logs, [b0001, b0002, b0003, i0023, p0042]},
{database_id, 0},
{log_level, ?STORE_LOG_LEVEL},
{forced_logs, ?STORE_FORCED_LOGS},
{database_id, 65534}, % 65535 used for tictac aae stores
{head_only, with_lookup},
{cache_size, 2000},
{sync_strategy, none},
Expand All @@ -76,12 +82,11 @@ new(Options) ->
%% question over whether in an idle cluster there may be a need for a
%% non-finite timeout
],

ok = filelib:ensure_dir(DataDir),
{ok, DB} = leveled_bookie:book_start(LeveledOpts),
{DB, DataDir}.

-spec close(term(), term()) -> ok.
-spec close(store_ref(), snapshot_ref()) -> ok.
close(undefined, undefined) ->
ok;
close(DB, undefined) ->
Expand All @@ -96,21 +101,20 @@ close(DB, Snapshot) ->
%% store is a shared (linked) store it might be it will be closed prior to a
%% snapshot in another pair on the same store. In this case the store will
%% wait for 20s to close to give a chance for the snapshot to clear.
-spec close_group(list({term(), term()})) -> ok.
-spec close_group(list({store_ref(), snapshot_ref()})) -> ok.
close_group(DBList) ->
{DBs, Snapshots} = lists:unzip(DBList),
lists:foreach(
fun(DB) -> close_db(DB) end,
lists:usort(Snapshots) ++ lists:usort(DBs)).


-spec destroy(string()) -> ok.
destroy(Path) ->
hashtree:destroy(Path).

-spec encode_key(
{segment, tree_id_bin(), integer(), binary()}|
{bucket, tree_id_bin(), integer(), integer()}|
{segment, hashtree:tree_id_bin(), integer(), binary()}|
{bucket, hashtree:tree_id_bin(), integer(), integer()}|
{meta, binary()}) -> db_key().
encode_key({segment, TreeId, Segment, Key}) ->
{{<<$t, TreeId:22/binary>>, <<Segment:64/integer>>}, <<Key/binary>>};
Expand All @@ -119,37 +123,35 @@ encode_key({bucket, TreeId, Level, Bucket}) ->
encode_key({meta, Key}) ->
{<<$m>>, <<Key/binary>>}.


-spec snapshot(term(), term()) -> {ok, term()}.
snapshot(DB, undefined) ->
{ok, Snapshot} = leveled_bookie:book_start([{snapshot_bookie, DB}]),
ok = leveled_bookie:book_loglevel(Snapshot, warn),
ok =
leveled_bookie:book_addlogs(
Snapshot, [b0001, b0002, b0003, i0027, p0007]
),
ok = leveled_bookie:book_loglevel(Snapshot, ?SNAP_LOG_LEVEL),
ok = leveled_bookie:book_addlogs(Snapshot, ?SNAP_FORCED_LOGS),
{ok, Snapshot};
snapshot(DB, Snapshot) ->
close_db(Snapshot),
snapshot(DB, undefined).

-spec get(term(), db_key()) -> {ok, binary()}| not_found | {error, any()}.
-spec get(
active_store()|active_snap(), db_key())
-> {ok, binary()}| not_found | {error, any()}.
get(DB, {Bucket, Key}) ->
leveled_bookie:book_headonly(DB, Bucket, Key, null).

-spec put(term(), db_key(), binary()) -> ok.
-spec put(store_ref(), db_key(), binary()) -> ok.
put(DB, {Bucket, Key}, Value) ->
leveled_bookie:book_mput(
DB, [{add, v1, Bucket, Key, null, undefined, Value}]).
put_object_specs(DB, [{add, v1, Bucket, Key, null, undefined, Value}]).

-spec mput(term(), list({put, db_key(), binary()}|{delete, db_key()})) -> ok.
-spec mput(
active_store(), list({put, db_key(), binary()}|{delete, db_key()}))
-> ok.
mput(DB, Updates) ->
%% Buffer has been built backwards and reversed
%% ... so most recent updates are now at the tail of the list
%% e.g. [FirstUpdate, SecondUpdate ..., NthUpdate]
%% Need to de-duplicate this, so only the most recent change is added for
%% each key - so reverse before ukeysort.
%% Order expected for leveled is
%% each key - so reverse before ukeysort. Order expected for leveled is:
%% [NthUpdate, ..., SecondUpdate, FirstUpdate] - so don't re-reverse
ObjectSpecs =
lists:map(
Expand All @@ -163,14 +165,13 @@ mput(DB, Updates) ->
end,
lists:ukeysort(2, lists:reverse(Updates))
),
leveled_bookie:book_mput(DB, ObjectSpecs).
put_object_specs(DB, ObjectSpecs).

-spec delete(term(), db_key()) -> ok.
-spec delete(active_store(), db_key()) -> ok.
delete(DB, {Bucket, Key}) ->
leveled_bookie:book_mput(
DB, [{remove, v1, Bucket, Key, null, undefined, null}]).
put_object_specs(DB, [{remove, v1, Bucket, Key, null, undefined, null}]).

-spec clear_buckets(tree_id_bin(), term()) -> ok.
-spec clear_buckets(hashtree:tree_id_bin(), active_store()) -> ok.
clear_buckets(Id, DB) ->
FoldFun =
fun(Bucket, {Key, null}, Acc) ->
Expand All @@ -184,13 +185,13 @@ clear_buckets(Id, DB) ->
{FoldFun, []}
),
BucketKeyList = BucketFolder(),
leveled_bookie:book_mput(DB, BucketKeyList),
put_object_specs(DB, BucketKeyList),
?LOG_DEBUG("Tree ~p cleared ~p segments.\n", [Id, length(BucketKeyList)]),
ok.


-spec multi_select_segment(
term(), term(), list('*'|integer()), select_fun(T)) -> [{integer(), T}].
hashtree:tree_id_bin(), active_snap(), list('*'|integer()), select_fun(T))
-> [{integer(), T}].
multi_select_segment(Id, Itr, Segments, F) ->
DBType =
element(1, element(1, encode_key({segment, Id, 0, <<>>}))),
Expand All @@ -200,12 +201,9 @@ multi_select_segment(Id, Itr, Segments, F) ->
{DBType, <<Seg:64/integer>>} ->
NewEntry = {hashtree:external_encode(Id, Seg, Key), Value},
case Acc of
[] ->
[{Seg, [NewEntry]}];
[{Seg, KVL}|T] ->
[{Seg, [NewEntry|KVL]}|T];
Acc ->
[{Seg, [NewEntry]}|Acc]
[] -> [{Seg, [NewEntry]}];
[{Seg, KVL}|T] -> [{Seg, [NewEntry|KVL]}|T];
Acc -> [{Seg, [NewEntry]}|Acc]
end;
_ ->
Acc
Expand All @@ -215,9 +213,7 @@ multi_select_segment(Id, Itr, Segments, F) ->
case Segments of
['*', '*'] ->
leveled_bookie:book_headfold(
Itr, ?HEAD_TAG, {FoldFun, []},
false, false, false
);
Itr, ?HEAD_TAG, {FoldFun, []}, false, false, false);
Segments ->
BList =
lists:map(
Expand All @@ -234,33 +230,37 @@ multi_select_segment(Id, Itr, Segments, F) ->
SegKeyValues = Folder(),
Result =
lists:map(
fun({S, KVL}) -> {S, F(lists:reverse(KVL))} end,
SegKeyValues
),
fun({S, KVL}) -> {S, F(lists:reverse(KVL))} end, SegKeyValues),
lists:reverse(Result).


%%%===================================================================
%%% Internal functions
%%%===================================================================


-spec close_db(pid()|undefined) -> ok.
-spec close_db(store_ref()) -> ok.
close_db(undefined) ->
ok;
close_db(Pid) when is_pid(Pid) ->
case is_process_alive(Pid) of
true ->
leveled_bookie:book_close(Pid);
false ->
ok
true -> leveled_bookie:book_close(Pid);
false -> ok
end.

-spec put_object_specs(
pid(),
[{add|remove, v1,
{binary(), binary()}|binary(), binary(), null,
undefined,
binary()|null}]) -> ok.
put_object_specs(DB, ObjectSpecs) ->
case leveled_bookie:book_mput(DB, ObjectSpecs) of
ok -> ok;
pause -> timer:sleep(?PAUSE_ON_BUSY)
end.

%%%===================================================================
%%% EUnit
%%%===================================================================


fake_close(DB) ->
catch leveled_bookie:book_close(DB).

0 comments on commit 7b47e52

Please sign in to comment.