diff --git a/src/hashtree.erl b/src/hashtree.erl
index 00c73b089..234902dc8 100644
--- a/src/hashtree.erl
+++ b/src/hashtree.erl
@@ -880,16 +880,12 @@ hashes(State, Segments) ->
 
 -spec snapshot(hashtree()) -> hashtree().
 snapshot(State) ->
     Mod = State#state.database_mod,
-    ?LOG_INFO(
-        "Update snapshot - db ~w snapshot ~w",
-        [State#state.ref, State#state.itr]),
     {ok, Itr} = Mod:snapshot(State#state.ref, State#state.itr),
     State#state{itr=Itr}.
 
 -spec multi_select_segment(
     hashtree(), list('*'|integer()), select_fun(T)) -> [{integer(), T}].
 multi_select_segment(#state{id=Id, itr=Itr, database_mod=Mod}, Segments, F) ->
-    ?LOG_INFO("Multi select segment on snapshot ~w", [Itr]),
     Mod:multi_select_segment(Id, Itr, Segments, F).
 
diff --git a/src/hashtree_leveled.erl b/src/hashtree_leveled.erl
index 20a3edb3c..484890c21 100644
--- a/src/hashtree_leveled.erl
+++ b/src/hashtree_leveled.erl
@@ -34,14 +34,21 @@
     encode_key/1
     ]).
 
-
 -export([fake_close/1]).
 
--type tree_id_bin() :: <<_:176>>.
 -type db_key() :: {binary(), binary()}|{{binary(), binary()}, binary()}.
 -type select_fun(T) :: fun((orddict:orddict()) -> T).
+-type active_store() :: pid().
+-type active_snap() :: pid().
+-type store_ref() :: active_store()|undefined.
+-type snapshot_ref() :: active_snap()|undefined.
 
 -define(HEAD_TAG, h).
+-define(STORE_FORCED_LOGS, []).
+-define(STORE_LOG_LEVEL, warn).
+-define(SNAP_FORCED_LOGS, []).
+-define(SNAP_LOG_LEVEL, warn).
+-define(PAUSE_ON_BUSY, 40).
 
 -include_lib("kernel/include/logger.hrl").
 
@@ -53,18 +60,17 @@
 
 %%% API
 %%%===================================================================
 
--spec new(proplists:proplist()) -> {term(), string()}.
+-spec new(proplists:proplist()) -> {active_store(), string()}.
 new(Options) ->
     DataDir = hashtree:get_path(Options),
-
     LeveledOpts = [
         {root_path, DataDir},
         {max_journalobjectcount, 20000},
             %% one tenth of standard size - as head_only
-        {log_level, warn},
-        {forced_logs, [b0001, b0002, b0003, i0023, p0042]},
-        {database_id, 0},
+        {log_level, ?STORE_LOG_LEVEL},
+        {forced_logs, ?STORE_FORCED_LOGS},
+        {database_id, 65534}, % 65535 used for tictac aae stores
         {head_only, with_lookup},
         {cache_size, 2000},
         {sync_strategy, none},
@@ -76,12 +82,11 @@ new(Options) ->
         %% question over whether in an idle cluster there may be a need for a
         %% non-finite timeout
     ],
-    ok = filelib:ensure_dir(DataDir),
     {ok, DB} = leveled_bookie:book_start(LeveledOpts),
     {DB, DataDir}.
 
--spec close(term(), term()) -> ok.
+-spec close(store_ref(), snapshot_ref()) -> ok.
 close(undefined, undefined) ->
     ok;
 close(DB, undefined) ->
@@ -96,21 +101,20 @@ close(DB, Snapshot) ->
 %% store is a shared (linked) store it might be it will be closed prior to a
 %% snapshot in another pair on the same store. In this case the store will
 %% wait for 20s to close to give a chance for the snapshot to clear.
--spec close_group(list({term(), term()})) -> ok.
+-spec close_group(list({store_ref(), snapshot_ref()})) -> ok.
 close_group(DBList) ->
     {DBs, Snapshots} = lists:unzip(DBList),
     lists:foreach(
         fun(DB) -> close_db(DB) end,
         lists:usort(Snapshots) ++ lists:usort(DBs)).
 
-
 -spec destroy(string()) -> ok.
 destroy(Path) ->
     hashtree:destroy(Path).
 
 -spec encode_key(
-    {segment, tree_id_bin(), integer(), binary()}|
-    {bucket, tree_id_bin(), integer(), integer()}|
+    {segment, hashtree:tree_id_bin(), integer(), binary()}|
+    {bucket, hashtree:tree_id_bin(), integer(), integer()}|
     {meta, binary()}) -> db_key().
 encode_key({segment, TreeId, Segment, Key}) ->
     {{<<$t, TreeId:22/binary>>, <<Segment:64/integer>>}, <<Key/binary>>};
@@ -119,37 +123,35 @@ encode_key({bucket, TreeId, Level, Bucket}) ->
 encode_key({meta, Key}) ->
     {<<$m>>, <<Key/binary>>}.
 
-
 -spec snapshot(term(), term()) -> {ok, term()}.
 snapshot(DB, undefined) ->
     {ok, Snapshot} = leveled_bookie:book_start([{snapshot_bookie, DB}]),
-    ok = leveled_bookie:book_loglevel(Snapshot, warn),
-    ok =
-        leveled_bookie:book_addlogs(
-            Snapshot, [b0001, b0002, b0003, i0027, p0007]
-        ),
+    ok = leveled_bookie:book_loglevel(Snapshot, ?SNAP_LOG_LEVEL),
+    ok = leveled_bookie:book_addlogs(Snapshot, ?SNAP_FORCED_LOGS),
     {ok, Snapshot};
 snapshot(DB, Snapshot) ->
     close_db(Snapshot),
     snapshot(DB, undefined).
 
--spec get(term(), db_key()) -> {ok, binary()}| not_found | {error, any()}.
+-spec get(
+    active_store()|active_snap(), db_key())
+        -> {ok, binary()}| not_found | {error, any()}.
 get(DB, {Bucket, Key}) ->
     leveled_bookie:book_headonly(DB, Bucket, Key, null).
 
--spec put(term(), db_key(), binary()) -> ok.
+-spec put(store_ref(), db_key(), binary()) -> ok.
 put(DB, {Bucket, Key}, Value) ->
-    leveled_bookie:book_mput(
-        DB, [{add, v1, Bucket, Key, null, undefined, Value}]).
+    put_object_specs(DB, [{add, v1, Bucket, Key, null, undefined, Value}]).
 
--spec mput(term(), list({put, db_key(), binary()}|{delete, db_key()})) -> ok.
+-spec mput(
+    active_store(), list({put, db_key(), binary()}|{delete, db_key()}))
+        -> ok.
 mput(DB, Updates) ->
     %% Buffer has been built backwards and reversed
     %% ... so most recent updates are now at the tail of the list
     %% e.g. [FirstUpdate, SecondUpdate ..., NthUpdate]
     %% Need to de-duplicate this, so only the most recent change is added for
-    %% each key - so reverse before ukeysort.
-    %% Order expected for leveled is
+    %% each key - so reverse before ukeysort. Order expected for leveled is:
     %% [NthUpdate, ..., SecondUpdate, FirstUpdate] - so don't re-reverse
     ObjectSpecs =
         lists:map(
@@ -163,14 +165,13 @@ mput(DB, Updates) ->
             end,
             lists:ukeysort(2, lists:reverse(Updates))
         ),
-    leveled_bookie:book_mput(DB, ObjectSpecs).
+    put_object_specs(DB, ObjectSpecs).
 
--spec delete(term(), db_key()) -> ok.
+-spec delete(active_store(), db_key()) -> ok.
 delete(DB, {Bucket, Key}) ->
-    leveled_bookie:book_mput(
-        DB, [{remove, v1, Bucket, Key, null, undefined, null}]).
+    put_object_specs(DB, [{remove, v1, Bucket, Key, null, undefined, null}]).
 
--spec clear_buckets(tree_id_bin(), term()) -> ok.
+-spec clear_buckets(hashtree:tree_id_bin(), active_store()) -> ok.
 clear_buckets(Id, DB) ->
     FoldFun =
         fun(Bucket, {Key, null}, Acc) ->
@@ -184,13 +185,13 @@ clear_buckets(Id, DB) ->
             {FoldFun, []}
         ),
     BucketKeyList = BucketFolder(),
-    leveled_bookie:book_mput(DB, BucketKeyList),
+    put_object_specs(DB, BucketKeyList),
     ?LOG_DEBUG("Tree ~p cleared ~p segments.\n", [Id, length(BucketKeyList)]),
     ok.
 
-
 -spec multi_select_segment(
-    term(), term(), list('*'|integer()), select_fun(T)) -> [{integer(), T}].
+    hashtree:tree_id_bin(), active_snap(), list('*'|integer()), select_fun(T))
+        -> [{integer(), T}].
 multi_select_segment(Id, Itr, Segments, F) ->
     DBType = element(1, element(1, encode_key({segment, Id, 0, <<>>}))),
@@ -200,12 +201,9 @@ multi_select_segment(Id, Itr, Segments, F) ->
                 {DBType, <<Seg:64/integer>>} ->
                     NewEntry = {hashtree:external_encode(Id, Seg, Key), Value},
                     case Acc of
-                        [] ->
-                            [{Seg, [NewEntry]}];
-                        [{Seg, KVL}|T] ->
-                            [{Seg, [NewEntry|KVL]}|T];
-                        Acc ->
-                            [{Seg, [NewEntry]}|Acc]
+                        [] -> [{Seg, [NewEntry]}];
+                        [{Seg, KVL}|T] -> [{Seg, [NewEntry|KVL]}|T];
+                        Acc -> [{Seg, [NewEntry]}|Acc]
                     end;
                 _ ->
                     Acc
@@ -215,9 +213,7 @@
     case Segments of
         ['*', '*'] ->
             leveled_bookie:book_headfold(
-                Itr, ?HEAD_TAG, {FoldFun, []},
-                false, false, false
-            );
+                Itr, ?HEAD_TAG, {FoldFun, []}, false, false, false);
         Segments ->
             BList =
                 lists:map(
@@ -234,33 +230,37 @@
     SegKeyValues = Folder(),
     Result =
         lists:map(
-            fun({S, KVL}) -> {S, F(lists:reverse(KVL))} end,
-            SegKeyValues
-        ),
+            fun({S, KVL}) -> {S, F(lists:reverse(KVL))} end, SegKeyValues),
     lists:reverse(Result).
 
-
 %%%===================================================================
 %%% Internal functions
 %%%===================================================================
 
-
--spec close_db(pid()|undefined) -> ok.
+-spec close_db(store_ref()) -> ok.
 close_db(undefined) ->
     ok;
 close_db(Pid) when is_pid(Pid) ->
     case is_process_alive(Pid) of
-        true ->
-            leveled_bookie:book_close(Pid);
-        false ->
-            ok
+        true -> leveled_bookie:book_close(Pid);
+        false -> ok
     end.
 
+-spec put_object_specs(
+    pid(),
+    [{add|remove, v1,
+        {binary(), binary()}|binary(), binary(), null,
+        undefined,
+        binary()|null}]) -> ok.
+put_object_specs(DB, ObjectSpecs) ->
+    case leveled_bookie:book_mput(DB, ObjectSpecs) of
+        ok -> ok;
+        pause -> timer:sleep(?PAUSE_ON_BUSY)
+    end.
 
 %%%===================================================================
 %%% EUnit
 %%%===================================================================
 
-
 fake_close(DB) ->
     catch leveled_bookie:book_close(DB).
\ No newline at end of file
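
Reviewer note, not part of the patch (and leaning on the key layout shown in
encode_key/1 above): segment entries sit under a per-tree composite leveled
bucket, which is why multi_select_segment/4 can derive the bucket prefix to
match on via element(1, element(1, encode_key({segment, Id, 0, <<>>}))). A
sketch with a hypothetical all-zero tree id:

    TreeId = <<0:176>>,    % 22 bytes, matching hashtree:tree_id_bin()
    {{Prefix, SegBin}, K} = encode_key({segment, TreeId, 5, <<"key">>}),
    %% Prefix = <<$t, TreeId:22/binary>> - the "DBType" bucket prefix
    %% SegBin = <<5:64/integer>>         - matched as <<Seg:64/integer>>
    %% K      = <<"key">>                - the original object key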
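
Reviewer note, not part of the patch: the de-duplication step in mput/2 can
be sanity-checked in isolation. The buffer arrives oldest-first, so it is
reversed before lists:ukeysort/2; ukeysort keeps only the first of any
key-equal elements, which after the reverse is the most recent update for
each key. A minimal shell sketch with hypothetical keys and values:

    %% Updates as mput/2 receives them: oldest first, k1 written twice.
    Updates = [{put, k1, v1}, {put, k2, v2}, {put, k1, v3}],
    %% Reverse so the newest write of each key comes first, then dedupe
    %% on the key (element 2 of each tuple).
    lists:ukeysort(2, lists:reverse(Updates)).
    %% -> [{put, k1, v3}, {put, k2, v2}] - the stale {put, k1, v1} is gone.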
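
Reviewer note, not part of the patch: put_object_specs/2 exists so that every
write path handles both return values of leveled_bookie:book_mput/2. As I
read leveled's API, pause is a backpressure signal - the batch has been
accepted, but the writer should slow down - so the wrapper sleeps for
?PAUSE_ON_BUSY (40 ms) rather than retrying. Since timer:sleep/1 itself
returns ok, the pause branch still satisfies the -> ok spec, and callers such
as put/3, mput/2, delete/2 and clear_buckets/2 need no special handling.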