Skip to content

Commit

Permalink
Pass in leveled configuration items at startup (martinsumner#121)
Browse files Browse the repository at this point in the history
* Pass in leveled configuration items at startup

Not implemented the full set of options, just a few obviously relevant ones.

Will require a change in riak_kv_vnode to allow for the config to be passed through.

* Point to openriak
  • Loading branch information
martinsumner authored Dec 2, 2024
1 parent c454831 commit 778ec09
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 146 deletions.
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@
{ct_opts, [{dir, ["test/end_to_end"]}]}.

{deps, [
{leveled, ".*", {git, "https://github.com/nhs-riak/leveled", {branch, "nhse-develop"}}}
{leveled, ".*", {git, "https://github.com/OpenRiak/leveled", {branch, "openriak-3.2"}}}
]}.
151 changes: 87 additions & 64 deletions src/aae_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

-export([aae_start/6,
aae_start/7,
aae_start/8,
aae_nextrebuild/1,
aae_put/7,
aae_close/1,
Expand All @@ -44,7 +45,8 @@
aae_bucketlist/1,
aae_loglevel/2,
aae_ping/3,
aae_runnerprompt/1]).
aae_runnerprompt/1
]).

-export([foldobjects_buildtrees/2,
hash_clocks/2,
Expand All @@ -68,7 +70,6 @@
% May depend on x2 underlying 30s timeout
-define(MAX_RUNNER_QUEUEDEPTH, 4).


-record(state, {key_store :: pid()|undefined,
tree_caches = [] :: tree_caches(),
index_ns = [] :: list(responsible_preflist()),
Expand All @@ -87,14 +88,19 @@
queue_backlog = false :: boolean(),
block_next_put = false :: boolean()}).

-record(options, {keystore_type :: keystore_type(),
store_isempty :: boolean(),
rebuild_schedule :: rebuild_schedule(),
index_ns :: list(responsible_preflist()),
object_splitfun,
root_path :: list(),
log_levels :: aae_util:log_levels()|undefined}).

-record(options,
{
keystore_type :: keystore_type(),
store_isempty :: boolean(),
rebuild_schedule :: rebuild_schedule(),
index_ns :: list(responsible_preflist()),
object_splitfun,
root_path :: list(),
log_levels :: aae_util:log_levels()|undefined,
leveled_options :: aae_keystore:leveled_options()
}
).

-type controller_state() :: #state{}.

-type responsible_preflist() :: {integer(), integer()}.
Expand Down Expand Up @@ -156,43 +162,48 @@
%%% API
%%%============================================================================

-spec aae_start(keystore_type(),
boolean(),
rebuild_schedule(),
list(responsible_preflist()),
list(),
object_splitter()) -> {ok, pid()}.
aae_start(KeyStoreT, IsEmpty, RS, PLs, Path, ObjSplitFun) ->
aae_start(
KeyStoreT, IsEmpty, RS, PLs, Path, ObjSplitFun, undefined
).

aae_start(KeyStoreT, IsEmpty, RS, PLs, Path, ObjSplitFun, LogLevels) ->
LeveledOpts = aae_keystore:store_generate_backendoptions(),
aae_start(
KeyStoreT, IsEmpty, RS, PLs, Path, ObjSplitFun, LogLevels, LeveledOpts
).

-spec aae_start(
keystore_type(),
boolean(),
rebuild_schedule(),
list(responsible_preflist()),
list(),
object_splitter(),
aae_util:log_levels()|undefined,
aae_keystore:leveled_options()) -> {ok, pid()}.
%% @doc
%% Start an AAE controller
%% The ObjectsplitFun must take a vnode object in a binary form and output
%% {Size, SibCount, IndexHash, LMD, MD}. If the SplitFun previously outputted
%% {Size, SibCount, IndexHash, null} that output will be converted
aae_start(KeyStoreT, IsEmpty, RebuildSch, Preflists, RootPath, ObjSplitFun) ->
aae_start(KeyStoreT, IsEmpty, RebuildSch,
Preflists, RootPath, ObjSplitFun, undefined).

-spec aae_start(keystore_type(),
boolean(),
rebuild_schedule(),
list(responsible_preflist()),
list(),
object_splitter(),
aae_util:log_levels()|undefined) -> {ok, pid()}.

aae_start(KeyStoreT, IsEmpty, RebuildSch,
Preflists, RootPath, ObjSplitFun, LogLevels) ->
aae_start(
KeyStoreT, IsEmpty, RS, PLs, Path, ObjSplitFun, LogLevels, LeveledOpts
) ->
WrapObjSplitFun = wrapped_splitobjfun(ObjSplitFun),
AAEopts =
#options{keystore_type = KeyStoreT,
store_isempty = IsEmpty,
rebuild_schedule = RebuildSch,
index_ns = Preflists,
root_path = RootPath,
object_splitfun = WrapObjSplitFun,
log_levels = LogLevels},
#options{
keystore_type = KeyStoreT,
store_isempty = IsEmpty,
rebuild_schedule = RS,
index_ns = PLs,
root_path = Path,
object_splitfun = WrapObjSplitFun,
log_levels = LogLevels,
leveled_options = LeveledOpts
},
gen_server:start(?MODULE, [AAEopts], []).


-spec aae_nextrebuild(pid()) -> erlang:timestamp().
%% @doc
%% When is the next keystore rebuild process scheduled for
Expand Down Expand Up @@ -463,6 +474,7 @@ aae_ping(Pid, RequestTime, From) ->
aae_runnerprompt(Pid) ->
gen_server:cast(Pid, runner_prompt).


%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
Expand All @@ -478,45 +490,56 @@ init([Opts]) ->
case Opts#options.keystore_type of
{parallel, StoreType} ->
StoreRP = filename:join([RootPath, StoreType, ?STORE_PATH]),
LeveledOpts = Opts#options.leveled_options,
{ok, {LastRebuild, IsEmpty}, Pid} =
aae_keystore:store_parallelstart(StoreRP,
StoreType,
LogLevels),
aae_keystore:store_parallelstart(
StoreRP, StoreType, LogLevels, LeveledOpts),
case Opts#options.store_isempty of
IsEmpty ->
RebuildTS =
schedule_rebuild(LastRebuild, RebuildSchedule),
{ok, #state{key_store = Pid,
next_rebuild = RebuildTS,
rebuild_schedule = RebuildSchedule,
reliable = true,
parallel_keystore = true}};
{
ok,
#state{
key_store = Pid,
next_rebuild = RebuildTS,
rebuild_schedule = RebuildSchedule,
reliable = true,
parallel_keystore = true
}
};
StoreState ->
aae_util:log("AAE01",
[StoreState, IsEmpty],
logs(),
LogLevels),
{ok, #state{key_store = Pid,
next_rebuild = os:timestamp(),
rebuild_schedule = RebuildSchedule,
reliable = false,
parallel_keystore = true}}
aae_util:log(
"AAE01", [StoreState, IsEmpty], logs(), LogLevels),
{
ok,
#state{
key_store = Pid,
next_rebuild = os:timestamp(),
rebuild_schedule = RebuildSchedule,
reliable = false,
parallel_keystore = true
}
}
end;
{native, StoreType, BackendPid} ->
aae_util:log("AAE02", [StoreType], logs(), LogLevels),
StoreRP = filename:join([RootPath, StoreType, ?STORE_PATH]),
{ok, {LastRebuild, _IsE}, KeyStorePid} =
aae_keystore:store_nativestart(StoreRP,
StoreType,
BackendPid,
LogLevels),
aae_keystore:store_nativestart(
StoreRP, StoreType, BackendPid, LogLevels),
RebuildTS =
schedule_rebuild(LastRebuild, RebuildSchedule),
{ok, #state{key_store = KeyStorePid,
next_rebuild = RebuildTS,
rebuild_schedule = RebuildSchedule,
reliable = true,
parallel_keystore = false}}
{
ok,
#state{
key_store = KeyStorePid,
next_rebuild = RebuildTS,
rebuild_schedule = RebuildSchedule,
reliable = true,
parallel_keystore = false
}
}
end,

% Start the TreeCaches
Expand Down
Loading

0 comments on commit 778ec09

Please sign in to comment.