Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/develop-3.4' into mas-o34-upst…
Browse files Browse the repository at this point in the history
…ream.d34
  • Loading branch information
martinsumner committed Dec 3, 2024
2 parents 9e89580 + fb1f56d commit 9a83ef1
Show file tree
Hide file tree
Showing 12 changed files with 782 additions and 680 deletions.
Binary file modified rebar3
Binary file not shown.
299 changes: 142 additions & 157 deletions src/aae_controller.erl

Large diffs are not rendered by default.

273 changes: 127 additions & 146 deletions src/aae_exchange.erl

Large diffs are not rendered by default.

298 changes: 159 additions & 139 deletions src/aae_keystore.erl

Large diffs are not rendered by default.

40 changes: 7 additions & 33 deletions src/aae_runner.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
query_count = 0 :: integer(),
query_time = 0 :: integer(),
aae_controller :: pid()|undefined,
log_levels :: aae_util:log_levels()|undefined}).
log_levels :: aae_util:log_levels()}).

-define(LOG_FREQUENCY, 10).

Expand All @@ -33,7 +33,7 @@
%%%============================================================================


-spec runner_start(aae_util:log_levels()|undefined) -> {ok, pid()}.
-spec runner_start(aae_util:log_levels()) -> {ok, pid()}.
%% @doc
%% Start an AAE runner to manage folds
runner_start(LogLevels) ->
Expand Down Expand Up @@ -70,13 +70,12 @@ handle_cast({work, Folder, ReturnFun, SizeFun}, State) ->
State0 =
try Folder() of
query_backlog ->
aae_util:log("R0002", [], logs(), State#state.log_levels),
aae_util:log(r0002, [], State#state.log_levels),
ReturnFun({error, query_backlog}),
State;
Results ->
QueryTime = timer:now_diff(os:timestamp(), SW),
aae_util:log("R0003", [QueryTime], logs(),
State#state.log_levels),
aae_util:log(r0003, [QueryTime], State#state.log_levels),
RS0 = State#state.result_size + SizeFun(Results),
QT0 = State#state.query_time + QueryTime,
QC0 = State#state.query_count + 1,
Expand All @@ -91,15 +90,14 @@ handle_cast({work, Folder, ReturnFun, SizeFun}, State) ->
query_count = QC1}
catch
Error:Pattern ->
aae_util:log("R0005", [Error, Pattern], logs(),
State#state.log_levels),
aae_util:log(r0005, [Error, Pattern], State#state.log_levels),
ReturnFun({error, Error}),
State
end,
{noreply, State0, 0}.

handle_info(timeout, State) ->
aae_util:log("R0004", [], logs(), State#state.log_levels),
aae_util:log(r0004, [], State#state.log_levels),
ok = aae_controller:aae_runnerprompt(State#state.aae_controller),
{noreply, State}.

Expand All @@ -121,33 +119,9 @@ code_change(_OldVsn, State, _Extra) ->
maybe_log(RS_Acc, QT_Acc, QC_Acc, LogFreq, _LogLs) when QC_Acc < LogFreq ->
{RS_Acc, QT_Acc, QC_Acc};
maybe_log(RS_Acc, QT_Acc, QC_Acc, _LogFreq, LogLs) ->
aae_util:log("R0001", [RS_Acc, QT_Acc, QC_Acc], logs(), LogLs),
aae_util:log(r0001, [RS_Acc, QT_Acc, QC_Acc], LogLs),
{0, 0, 0}.


%%%============================================================================
%%% log definitions
%%%============================================================================

-spec logs() -> list(tuple()).
%% @doc
%% Define log lines for this module
logs() ->
[{"R0001",
{info, "AAE fetch clock runner has seen results=~w " ++
"query_time=~w for a query_count=~w queries"}},
{"R0002",
{info, "Query backlog resulted in dummy fold"}},
{"R0003",
{debug, "Query complete in time ~w"}},
{"R0004",
{debug, "Prompting controller"}},
{"R0005",
{warn, "Query lead to error ~w pattern ~w"}}

].


%%%============================================================================
%%% Test
%%%============================================================================
Expand Down
112 changes: 51 additions & 61 deletions src/aae_treecache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

-behaviour(gen_server).

-include("aae.hrl").
-include("include/aae.hrl").

-export([
init/1,
Expand All @@ -14,7 +14,7 @@
handle_info/2,
terminate/2,
code_change/3,
format_status/2]).
format_status/1]).

-export([cache_open/3,
cache_new/3,
Expand Down Expand Up @@ -42,9 +42,9 @@
loading = false :: boolean(),
dirty_segments = [] :: list(),
active_fold :: string()|undefined,
change_queue = [] :: list()|not_logged,
change_queue = [] :: list()|redacted,
queued_changes = 0 :: non_neg_integer(),
log_levels :: aae_util:log_levels()|undefined,
log_levels :: aae_util:log_levels(),
safe_save = false :: boolean()}).

-type partition_id() :: integer()|{integer(), integer()}.
Expand All @@ -54,8 +54,8 @@
%%% API
%%%============================================================================

-spec cache_open(list(), partition_id(), aae_util:log_levels()|undefined)
-> {boolean(), pid()}.
-spec cache_open(
list(), partition_id(), aae_util:log_levels()) -> {boolean(), pid()}.
%% @doc
%% Open a tree cache, using any previously saved one for this tree cache as a
%% starting point. Return is_empty boolean as true to indicate if a new cache
Expand All @@ -68,8 +68,8 @@ cache_open(RootPath, PartitionID, LogLevels) ->
IsRestored = gen_server:call(Pid, is_restored, infinity),
{IsRestored, Pid}.

-spec cache_new(list(), partition_id(), aae_util:log_levels()|undefined)
-> {ok, pid()}.
-spec cache_new(
list(), partition_id(), aae_util:log_levels()) -> {ok, pid()}.
%% @doc
%% Open a tree cache, without restoring from file
cache_new(RootPath, PartitionID, LogLevels) ->
Expand Down Expand Up @@ -179,7 +179,7 @@ init([Opts]) ->
?START_SQN,
false}
end,
aae_util:log("C0005", [IsRestored, PartitionID], logs(), LogLevels),
aae_util:log(c0005, [IsRestored, PartitionID], LogLevels),
process_flag(trap_exit, true),
{ok,
#state{
Expand Down Expand Up @@ -258,10 +258,9 @@ handle_cast({complete_load, Tree}, State=#state{loading=Loading})
AccTree, Key, {CH, OH}, fun binary_extractfun/2)
end,
Tree0 = lists:foldr(LoadFun, Tree, State#state.change_queue),
aae_util:log("C0008",
[length(State#state.change_queue)],
logs(),
State#state.log_levels),
aae_util:log(
c0008, [length(State#state.change_queue)], State#state.log_levels
),
{noreply,
State#state{
loading = false,
Expand All @@ -286,10 +285,11 @@ handle_cast({replace_dirtysegments, SegmentMap, FoldGUID}, State) ->
fun({SegID, NewHash}, TreeAcc) ->
case lists:member(SegID, State#state.dirty_segments) of
true ->
aae_util:log("C0006",
[State#state.partition_id, SegID, NewHash],
logs(),
State#state.log_levels),
aae_util:log(
c0006,
[State#state.partition_id, SegID, NewHash],
State#state.log_levels
),
leveled_tictac:alter_segment(SegID, NewHash, TreeAcc);
false ->
TreeAcc
Expand All @@ -306,10 +306,7 @@ handle_cast({replace_dirtysegments, SegmentMap, FoldGUID}, State) ->
{noreply, State}
end;
handle_cast(destroy, State) ->
aae_util:log("C0004",
[State#state.partition_id],
logs(),
State#state.log_levels),
aae_util:log(c0004, [State#state.partition_id], State#state.log_levels),
{stop, normal, State};
handle_cast({log_levels, LogLevels}, State) ->
{noreply, State#state{log_levels = LogLevels}}.
Expand All @@ -318,10 +315,18 @@ handle_cast({log_levels, LogLevels}, State) ->
handle_info(_Info, State) ->
{stop, normal, State}.

format_status(normal, [_PDict, State]) ->
State;
format_status(terminate, [_PDict, State]) ->
State#state{change_queue = not_logged}.
format_status(Status) ->
case maps:get(reason, Status, normal) of
terminate ->
State = maps:get(state, Status),
maps:update(
state,
State#state{change_queue = redacted},
Status
);
_ ->
Status
end.


terminate(_Reason, _State) ->
Expand All @@ -343,10 +348,9 @@ flatten_id({Index, N}) ->
flatten_id(ID) ->
integer_to_list(ID).

-spec save_to_disk(list(),
integer(),
leveled_tictac:tictactree(),
aae_util:log_levels()|undefined) -> ok.
-spec save_to_disk(
list(), integer(), leveled_tictac:tictactree(), aae_util:log_levels())
-> ok.
%% @doc
%% Save the TreeCache to disk, with a checksum so thatit can be
%% validated on read.
Expand All @@ -355,15 +359,16 @@ save_to_disk(RootPath, SaveSQN, TreeCache, LogLevels) ->
CRC32 = erlang:crc32(Serialised),
ok = filelib:ensure_dir(RootPath),
PendingName = integer_to_list(SaveSQN) ++ ?PENDING_EXT,
aae_util:log("C0003", [RootPath, PendingName], logs(), LogLevels),
aae_util:log(c0003, [RootPath, PendingName], LogLevels),
ok = file:write_file(filename:join(RootPath, PendingName),
<<CRC32:32/integer, Serialised/binary>>,
[raw]),
file:rename(filename:join(RootPath, PendingName),
form_cache_filename(RootPath, SaveSQN)).

-spec open_from_disk(list(), aae_util:log_levels()|undefined)
-> {leveled_tictac:tictactree()|none, integer()}.
-spec open_from_disk(
list(), aae_util:log_levels())
-> {leveled_tictac:tictactree()|none, integer()}.
%% @doc
%% Open most recently saved TicTac tree cache file on disk, deleting all
%% others both used and unused - to save an out of date tree from being used
Expand All @@ -375,7 +380,7 @@ open_from_disk(RootPath, LogLevels) ->
fun(FN, FinalFiles) ->
case filename:extension(FN) of
?PENDING_EXT ->
aae_util:log("C0001", [FN], logs(), LogLevels),
aae_util:log(c0001, [FN], LogLevels),
ok = file:delete(filename:join(RootPath, FN)),
FinalFiles;
?FINAL_EXT ->
Expand Down Expand Up @@ -404,7 +409,7 @@ open_from_disk(RootPath, LogLevels) ->
{leveled_tictac:import_tree(binary_to_term(STC)),
HeadSQN + 1};
{error, Reason} ->
aae_util:log("C0002", [FileToUse, Reason], logs(), LogLevels),
aae_util:log(c0002, [FileToUse, Reason], LogLevels),
{none, 1}
end
end.
Expand Down Expand Up @@ -445,24 +450,6 @@ binary_extractfun(Key, {CurrentHash, OldHash}) ->
end,
{Key, {is_hash, CurrentHash bxor RemoveH}}.

%%%============================================================================
%%% log definitions
%%%============================================================================

-spec logs() -> list(tuple()).
%% @doc
%% Define log lines for this module
logs() ->
[{"C0001", {info, "Pending filename ~s found and will delete"}},
{"C0002", {warn, "File ~w opened with error=~w so will be ignored"}},
{"C0003", {info, "Saving tree cache to path ~s and filename ~s"}},
{"C0004", {info, "Destroying tree cache for partition ~w"}},
{"C0005", {info, "Starting cache with is_restored=~w and IndexN of ~w"}},
{"C0006", {debug, "Altering segment for PartitionID=~w ID=~w Hash=~w"}},
{"C0007", {warn, "Treecache exiting after trapping exit from Pid=~w"}},
{"C0008", {info, "Complete load of tree with length of change_queue=~w"}},
{"C0009", {info, "During cache rebuild reached length of change_queue=~w"}}].

%%%============================================================================
%%% Test
%%%============================================================================
Expand Down Expand Up @@ -601,16 +588,19 @@ corrupt_save_tester() ->
aae_util:clean_subdir(RootPath).

format_status_test() ->
RootPath = "test/foratstatus/",
RootPath = "test/formatstatus/",
PartitionID = 99,
aae_util:clean_subdir(RootPath ++ "/" ++ integer_to_list(PartitionID)),
{ok, C0} = cache_new(RootPath, PartitionID, undefined),
{status, C0, {module, gen_server}, SItemL} = sys:get_status(C0),
S = lists:keyfind(state, 1, lists:nth(5, SItemL)),
{status, _C0, {module, gen_server}, SItemL} = sys:get_status(C0),
{data,[{"State", S}]} = lists:nth(3, lists:nth(5, SItemL)),
?assert(is_list(S#state.change_queue)),
ST = format_status(terminate, [dict:new(), S]),
?assertMatch(not_logged, ST#state.change_queue),

RedactedStatus = format_status(#{reason => terminate, state => S}),
RST = maps:get(state, RedactedStatus),
?assertMatch(redacted, RST#state.change_queue),
NormStatus = format_status(#{reason => normal, state => S}),
NST = maps:get(state, NormStatus),
?assert(is_list(NST#state.change_queue)),
ok = cache_destroy(C0).

simple_test() ->
Expand Down Expand Up @@ -735,7 +725,7 @@ dirty_segment_test() ->
AddFun =
fun(I) ->
K = integer_to_binary(I),
H = erlang:phash2(leveled_rand:uniform(100000)),
H = erlang:phash2(rand:uniform(100000)),
cache_alter(AAECache0, K, H, 0)
end,

Expand All @@ -756,8 +746,8 @@ dirty_segment_test() ->

?assertMatch(false, Leaf0 == 0),

H1 = erlang:phash2(leveled_rand:uniform(100000)),
H2 = erlang:phash2(leveled_rand:uniform(100000)),
H1 = erlang:phash2(rand:uniform(100000)),
H2 = erlang:phash2(rand:uniform(100000)),
{_HK1, TTH1} = leveled_tictac:tictac_hash(K1, {is_hash, H1}),
{_HK2, TTH2} = leveled_tictac:tictac_hash(K2, {is_hash, H2}),

Expand Down
Loading

0 comments on commit 9a83ef1

Please sign in to comment.