Skip to content

Commit

Permalink
Merge pull request #96 from basho-labs/dp-multiple-vsns
Browse files Browse the repository at this point in the history
Support of multiple Riak versions. Each cluster may be created with it is own version of Riak.
  • Loading branch information
sanmiguel authored Nov 17, 2016
2 parents 7f6c1cb + 4a6926f commit df51f30
Show file tree
Hide file tree
Showing 9 changed files with 407 additions and 277 deletions.
28 changes: 0 additions & 28 deletions rel/files/ermf-scheduler.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,13 @@ main() {
if [ -z "$RIAK_MESOS_PORT" ]; then
export RIAK_MESOS_PORT=$PORT0
fi
if [ -z "$RIAK_MESOS_SCHEDULER_PKG" ]; then
export RIAK_MESOS_SCHEDULER_PKG=riak_mesos_scheduler.tar.gz
fi
if [ -z "$RIAK_MESOS_EXECUTOR_PKG" ]; then
export RIAK_MESOS_EXECUTOR_PKG=riak_mesos_executor.tar.gz
fi
if [ -z "$RIAK_MESOS_EXPLORER_PKG" ]; then
export RIAK_MESOS_EXPLORER_PKG=riak_explorer.tar.gz
fi
if [ -z "$RIAK_MESOS_RIAK_PKG" ]; then
export RIAK_MESOS_RIAK_PKG=riak.tar.gz
fi
if [ -z "$RIAK_MESOS_PATCHES_PKG" ]; then
export RIAK_MESOS_PATCHES_PKG=riak_erlpmd_patches.tar.gz
fi

if [ -z "$RIAK_MESOS_NAME" ]; then
export RIAK_MESOS_NAME=riak
fi

mkdir -p artifacts
mv $RIAK_MESOS_RIAK_PKG artifacts/$RIAK_MESOS_RIAK_PKG &> /dev/null
mv $RIAK_MESOS_EXPLORER_PKG artifacts/$RIAK_MESOS_EXPLORER_PKG &> /dev/null
mv $RIAK_MESOS_EXECUTOR_PKG artifacts/$RIAK_MESOS_EXECUTOR_PKG &> /dev/null
mv $RIAK_MESOS_PATCHES_PKG artifacts/$RIAK_MESOS_PATCHES_PKG &> /dev/null
rm $RIAK_MESOS_SCHEDULER_PKG
rm -rf root
rm -rf riak_mesos_executor

RUN_SCHEDULER="riak_mesos_scheduler/bin/riak_mesos_scheduler -noinput"

if [ -n "$RIAK_MESOS_ATTACH" ]; then
RUN_SCHEDULER="$RUN_SCHEDULER -sname '$RIAK_MESOS_NAME' -setcookie '$RIAK_MESOS_NAME'"
fi

echo "Starting riak_mesos_scheduler..."
eval "$RUN_SCHEDULER"
}
Expand Down
36 changes: 18 additions & 18 deletions src/rms_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
-behaviour(gen_fsm).

%% API
-export([start_link/1]).
-export([start_link/2]).
-export([get/1,
get/2,
get_field_value/2,
Expand All @@ -46,20 +46,19 @@
terminate/3,
code_change/4]).

-export([
requested/2,
-export([requested/2,
requested/3,
running/2,
running/3,
restarting/2,
restarting/3,
shutdown/2,
shutdown/3
]).
shutdown/3]).

-record(cluster, {key :: rms_cluster:key(),
riak_config = <<>> :: binary(),
advanced_config = <<>> :: binary(),
riak_version = "" :: string(),
riak_config = undefined :: undefined | string(),
advanced_config = undefined :: undefined | string(),
generation = 1 :: pos_integer(),
to_restart = {[], []} :: {list(rms_node:key()), list(rms_node:key())}}).

Expand All @@ -78,10 +77,9 @@

%%% API

-spec start_link(key()) ->
{ok, pid()} | {error, term()}.
start_link(Key) ->
gen_fsm:start_link(?MODULE, Key, []).
-spec start_link(key(), string()) -> {ok, pid()} | {error, term()}.
start_link(Key, RiakVersion) ->
gen_fsm:start_link(?MODULE, {Key, RiakVersion}, []).

-spec get(key()) -> {ok, rms_metadata:cluster_state()} | {error, term()}.
get(Key) ->
Expand All @@ -103,8 +101,8 @@ get_field_value(Field, Key) ->
case rms_metadata:get_cluster(Key) of
{ok, Cluster} ->
case proplists:get_value(Field, Cluster, field_not_found) of
field_not_found ->
{error, field_not_found};
field_not_found = Reason ->
{error, Reason};
Value ->
{ok, Value}
end;
Expand Down Expand Up @@ -169,15 +167,14 @@ node_stopped(Pid, NodeKey) ->
| {reply, reply(), Next::state(), New::cluster_state()}
| {reply, reply(), Next::state(), New::cluster_state(), state_timeout()}.

-spec init(key()) ->
{ok, state(), cluster_state()}
| {stop, reason()}.
init(Key) ->
-spec init({key(), string()}) ->
{ok, state(), cluster_state()} | {stop, reason()}.
init({Key, RiakVersion}) ->
case get_cluster(Key) of
{ok, {State, Cluster}} ->
{ok, State, Cluster};
{error, not_found} ->
Cluster = #cluster{key = Key},
Cluster = #cluster{key = Key, riak_version = RiakVersion},
case add_cluster({running, Cluster}) of
ok ->
{ok, running, Cluster};
Expand Down Expand Up @@ -507,6 +504,7 @@ next_timeout(shutdown, #cluster{}=_Cluster) ->
from_list(ClusterList) ->
{proplists:get_value(status, ClusterList),
#cluster{key = proplists:get_value(key, ClusterList),
riak_version = proplists:get_value(riak_version, ClusterList),
riak_config = proplists:get_value(riak_config, ClusterList),
advanced_config = proplists:get_value(advanced_config,
ClusterList),
Expand All @@ -516,12 +514,14 @@ from_list(ClusterList) ->
-spec to_list({atom(), cluster_state()}) -> rms_metadata:cluster_state().
to_list({State,
#cluster{key = Key,
riak_version = RiakVersion,
riak_config = RiakConf,
advanced_config = AdvancedConfig,
to_restart = ToRestart,
generation = Generation}}) ->
[{key, Key},
{status, State},
{riak_version, RiakVersion},
{riak_config, RiakConf},
{advanced_config, AdvancedConfig},
{to_restart, ToRestart},
Expand Down
39 changes: 23 additions & 16 deletions src/rms_cluster_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@
-export([get_cluster_keys/0,
get_cluster/1,
get_cluster/2,
get_cluster_riak_version/1,
get_cluster_riak_config/1,
get_cluster_advanced_config/1,
add_cluster/1,
add_cluster/2,
add_cluster_with_nodes/2,
set_cluster_riak_config/2,
set_cluster_advanced_config/2,
restart_cluster/1,
Expand Down Expand Up @@ -78,6 +79,11 @@ get_cluster(Key) ->
get_cluster(Key, Fields) ->
rms_cluster:get(Key, Fields).

-spec get_cluster_riak_version(rms_cluster:key()) ->
{ok, string()} | {error, term()}.
get_cluster_riak_version(Key) ->
rms_cluster:get_field_value(riak_version, Key).

-spec get_cluster_riak_config(rms_cluster:key()) ->
{ok, binary()} | {error, term()}.
get_cluster_riak_config(Key) ->
Expand All @@ -88,13 +94,13 @@ get_cluster_riak_config(Key) ->
get_cluster_advanced_config(Key) ->
rms_cluster:get_field_value(advanced_config, Key).

-spec add_cluster(rms_cluster:key()) -> ok | {error, term()}.
add_cluster(Key) ->
-spec add_cluster(rms_cluster:key(), string()) -> ok | {error, term()}.
add_cluster(Key, RiakVersion) ->
case get_cluster(Key) of
{ok, _Cluster} ->
{error, exists};
{error, not_found} ->
ClusterSpec = cluster_spec(Key),
ClusterSpec = cluster_spec(Key, RiakVersion),
case supervisor:start_child(?MODULE, ClusterSpec) of
{ok, _Pid} ->
ok;
Expand All @@ -110,19 +116,20 @@ add_cluster(Key) ->
end
end.

-spec add_cluster(rms_metadata:cluster_state(), [rms_metadata:node_state()]) ->
-spec add_cluster_with_nodes(rms_metadata:cluster_state(),
[rms_metadata:node_state()]) ->
ok | {error, term()}.
add_cluster(Cluster, Nodes) ->
add_cluster_with_nodes(Cluster, Nodes) ->
Key = proplists:get_value(key, Cluster),
case rms_cluster_manager:add_cluster(Key) of
RiakVersion = proplists:get_value(riak_version, Cluster),
case add_cluster(Key, RiakVersion) of
ok ->
{ok, Pid} = get_cluster_pid(Key),
RiakConfig = proplists:get_value(riak_config, Cluster),
AdvancedConfig = proplists:get_value(advanced_config, Cluster),
Generation = proplists:get_value(generation, Cluster),
ok = rms_cluster_manager:set_cluster_riak_config(Key, RiakConfig),
ok = rms_cluster_manager:set_cluster_advanced_config(Key,
AdvancedConfig),
ok = set_cluster_riak_config(Key, RiakConfig),
ok = set_cluster_advanced_config(Key, AdvancedConfig),
ok = rms_cluster:set_generation(Pid, Generation),
[begin
NodeKey = proplists:get_value(key, Node),
Expand Down Expand Up @@ -215,7 +222,7 @@ executors_to_shutdown([NodeKey|Rest], Accum) ->
{ok, true} ->
{ok, AgentIdValue} = rms_node_manager:get_node_agent_id_value(NodeKey),
{ok, ExecutorIdValue} = rms_node_manager:get_node_executor_id_value(NodeKey),
executors_to_shutdown(Rest, [{ExecutorIdValue, AgentIdValue}|Accum]);
executors_to_shutdown(Rest, [{ExecutorIdValue, AgentIdValue} | Accum]);
{ok, false} ->
executors_to_shutdown(Rest, Accum)
end.
Expand Down Expand Up @@ -285,8 +292,8 @@ apply_offer(OfferHelper) ->
-spec init({}) ->
{ok, {{supervisor:strategy(), 1, 1}, [supervisor:child_spec()]}}.
init({}) ->
Specs = [cluster_spec(Key) ||
{Key, _Cluster} <- rms_metadata:get_clusters()],
Specs = [cluster_spec(Key, proplists:get_value(riak_version, Cluster)) ||
{Key, Cluster} <- rms_metadata:get_clusters()],
{ok, {{one_for_one, 1, 1}, Specs}}.

%% Internal functions.
Expand Down Expand Up @@ -423,10 +430,10 @@ unreserve_volumes(OfferHelper) ->
OfferHelper
end.

-spec cluster_spec(rms_cluster:key()) -> supervisor:child_spec().
cluster_spec(Key) ->
-spec cluster_spec(rms_cluster:key(), string()) -> supervisor:child_spec().
cluster_spec(Key, RiakVersion) ->
{Key,
{rms_cluster, start_link, [Key]},
{rms_cluster, start_link, [Key, RiakVersion]},
transient, 5000, worker, [rms_cluster]}.

-spec get_cluster_pid(rms_cluster:key()) -> {ok, pid()} | {error, not_found}.
Expand Down
Loading

0 comments on commit df51f30

Please sign in to comment.