diff --git a/rel/files/ermf-scheduler.sh b/rel/files/ermf-scheduler.sh index 9ee89b1..618b5c7 100755 --- a/rel/files/ermf-scheduler.sh +++ b/rel/files/ermf-scheduler.sh @@ -18,41 +18,13 @@ main() { if [ -z "$RIAK_MESOS_PORT" ]; then export RIAK_MESOS_PORT=$PORT0 fi - if [ -z "$RIAK_MESOS_SCHEDULER_PKG" ]; then - export RIAK_MESOS_SCHEDULER_PKG=riak_mesos_scheduler.tar.gz - fi - if [ -z "$RIAK_MESOS_EXECUTOR_PKG" ]; then - export RIAK_MESOS_EXECUTOR_PKG=riak_mesos_executor.tar.gz - fi - if [ -z "$RIAK_MESOS_EXPLORER_PKG" ]; then - export RIAK_MESOS_EXPLORER_PKG=riak_explorer.tar.gz - fi - if [ -z "$RIAK_MESOS_RIAK_PKG" ]; then - export RIAK_MESOS_RIAK_PKG=riak.tar.gz - fi - if [ -z "$RIAK_MESOS_PATCHES_PKG" ]; then - export RIAK_MESOS_PATCHES_PKG=riak_erlpmd_patches.tar.gz - fi - if [ -z "$RIAK_MESOS_NAME" ]; then export RIAK_MESOS_NAME=riak fi - - mkdir -p artifacts - mv $RIAK_MESOS_RIAK_PKG artifacts/$RIAK_MESOS_RIAK_PKG &> /dev/null - mv $RIAK_MESOS_EXPLORER_PKG artifacts/$RIAK_MESOS_EXPLORER_PKG &> /dev/null - mv $RIAK_MESOS_EXECUTOR_PKG artifacts/$RIAK_MESOS_EXECUTOR_PKG &> /dev/null - mv $RIAK_MESOS_PATCHES_PKG artifacts/$RIAK_MESOS_PATCHES_PKG &> /dev/null - rm $RIAK_MESOS_SCHEDULER_PKG - rm -rf root - rm -rf riak_mesos_executor - RUN_SCHEDULER="riak_mesos_scheduler/bin/riak_mesos_scheduler -noinput" - if [ -n "$RIAK_MESOS_ATTACH" ]; then RUN_SCHEDULER="$RUN_SCHEDULER -sname '$RIAK_MESOS_NAME' -setcookie '$RIAK_MESOS_NAME'" fi - echo "Starting riak_mesos_scheduler..." eval "$RUN_SCHEDULER" } diff --git a/src/rms_cluster.erl b/src/rms_cluster.erl index 70532a8..f066bbd 100644 --- a/src/rms_cluster.erl +++ b/src/rms_cluster.erl @@ -23,7 +23,7 @@ -behaviour(gen_fsm). %% API --export([start_link/1]). +-export([start_link/2]). -export([get/1, get/2, get_field_value/2, @@ -46,20 +46,19 @@ terminate/3, code_change/4]). --export([ - requested/2, +-export([requested/2, requested/3, running/2, running/3, restarting/2, restarting/3, shutdown/2, - shutdown/3 - ]). + shutdown/3]). -record(cluster, {key :: rms_cluster:key(), - riak_config = <<>> :: binary(), - advanced_config = <<>> :: binary(), + riak_version = "" :: string(), + riak_config = undefined :: undefined | string(), + advanced_config = undefined :: undefined | string(), generation = 1 :: pos_integer(), to_restart = {[], []} :: {list(rms_node:key()), list(rms_node:key())}}). @@ -78,10 +77,9 @@ %%% API --spec start_link(key()) -> - {ok, pid()} | {error, term()}. -start_link(Key) -> - gen_fsm:start_link(?MODULE, Key, []). +-spec start_link(key(), string()) -> {ok, pid()} | {error, term()}. +start_link(Key, RiakVersion) -> + gen_fsm:start_link(?MODULE, {Key, RiakVersion}, []). -spec get(key()) -> {ok, rms_metadata:cluster_state()} | {error, term()}. get(Key) -> @@ -103,8 +101,8 @@ get_field_value(Field, Key) -> case rms_metadata:get_cluster(Key) of {ok, Cluster} -> case proplists:get_value(Field, Cluster, field_not_found) of - field_not_found -> - {error, field_not_found}; + field_not_found = Reason -> + {error, Reason}; Value -> {ok, Value} end; @@ -169,15 +167,14 @@ node_stopped(Pid, NodeKey) -> | {reply, reply(), Next::state(), New::cluster_state()} | {reply, reply(), Next::state(), New::cluster_state(), state_timeout()}. --spec init(key()) -> - {ok, state(), cluster_state()} - | {stop, reason()}. -init(Key) -> +-spec init({key(), string()}) -> + {ok, state(), cluster_state()} | {stop, reason()}. +init({Key, RiakVersion}) -> case get_cluster(Key) of {ok, {State, Cluster}} -> {ok, State, Cluster}; {error, not_found} -> - Cluster = #cluster{key = Key}, + Cluster = #cluster{key = Key, riak_version = RiakVersion}, case add_cluster({running, Cluster}) of ok -> {ok, running, Cluster}; @@ -507,6 +504,7 @@ next_timeout(shutdown, #cluster{}=_Cluster) -> from_list(ClusterList) -> {proplists:get_value(status, ClusterList), #cluster{key = proplists:get_value(key, ClusterList), + riak_version = proplists:get_value(riak_version, ClusterList), riak_config = proplists:get_value(riak_config, ClusterList), advanced_config = proplists:get_value(advanced_config, ClusterList), @@ -516,12 +514,14 @@ from_list(ClusterList) -> -spec to_list({atom(), cluster_state()}) -> rms_metadata:cluster_state(). to_list({State, #cluster{key = Key, + riak_version = RiakVersion, riak_config = RiakConf, advanced_config = AdvancedConfig, to_restart = ToRestart, generation = Generation}}) -> [{key, Key}, {status, State}, + {riak_version, RiakVersion}, {riak_config, RiakConf}, {advanced_config, AdvancedConfig}, {to_restart, ToRestart}, diff --git a/src/rms_cluster_manager.erl b/src/rms_cluster_manager.erl index 9364fe8..b8ab89e 100644 --- a/src/rms_cluster_manager.erl +++ b/src/rms_cluster_manager.erl @@ -27,10 +27,11 @@ -export([get_cluster_keys/0, get_cluster/1, get_cluster/2, + get_cluster_riak_version/1, get_cluster_riak_config/1, get_cluster_advanced_config/1, - add_cluster/1, add_cluster/2, + add_cluster_with_nodes/2, set_cluster_riak_config/2, set_cluster_advanced_config/2, restart_cluster/1, @@ -78,6 +79,11 @@ get_cluster(Key) -> get_cluster(Key, Fields) -> rms_cluster:get(Key, Fields). +-spec get_cluster_riak_version(rms_cluster:key()) -> + {ok, string()} | {error, term()}. +get_cluster_riak_version(Key) -> + rms_cluster:get_field_value(riak_version, Key). + -spec get_cluster_riak_config(rms_cluster:key()) -> {ok, binary()} | {error, term()}. get_cluster_riak_config(Key) -> @@ -88,13 +94,13 @@ get_cluster_riak_config(Key) -> get_cluster_advanced_config(Key) -> rms_cluster:get_field_value(advanced_config, Key). --spec add_cluster(rms_cluster:key()) -> ok | {error, term()}. -add_cluster(Key) -> +-spec add_cluster(rms_cluster:key(), string()) -> ok | {error, term()}. +add_cluster(Key, RiakVersion) -> case get_cluster(Key) of {ok, _Cluster} -> {error, exists}; {error, not_found} -> - ClusterSpec = cluster_spec(Key), + ClusterSpec = cluster_spec(Key, RiakVersion), case supervisor:start_child(?MODULE, ClusterSpec) of {ok, _Pid} -> ok; @@ -110,19 +116,20 @@ add_cluster(Key) -> end end. --spec add_cluster(rms_metadata:cluster_state(), [rms_metadata:node_state()]) -> +-spec add_cluster_with_nodes(rms_metadata:cluster_state(), + [rms_metadata:node_state()]) -> ok | {error, term()}. -add_cluster(Cluster, Nodes) -> +add_cluster_with_nodes(Cluster, Nodes) -> Key = proplists:get_value(key, Cluster), - case rms_cluster_manager:add_cluster(Key) of + RiakVersion = proplists:get_value(riak_version, Cluster), + case add_cluster(Key, RiakVersion) of ok -> {ok, Pid} = get_cluster_pid(Key), RiakConfig = proplists:get_value(riak_config, Cluster), AdvancedConfig = proplists:get_value(advanced_config, Cluster), Generation = proplists:get_value(generation, Cluster), - ok = rms_cluster_manager:set_cluster_riak_config(Key, RiakConfig), - ok = rms_cluster_manager:set_cluster_advanced_config(Key, - AdvancedConfig), + ok = set_cluster_riak_config(Key, RiakConfig), + ok = set_cluster_advanced_config(Key, AdvancedConfig), ok = rms_cluster:set_generation(Pid, Generation), [begin NodeKey = proplists:get_value(key, Node), @@ -215,7 +222,7 @@ executors_to_shutdown([NodeKey|Rest], Accum) -> {ok, true} -> {ok, AgentIdValue} = rms_node_manager:get_node_agent_id_value(NodeKey), {ok, ExecutorIdValue} = rms_node_manager:get_node_executor_id_value(NodeKey), - executors_to_shutdown(Rest, [{ExecutorIdValue, AgentIdValue}|Accum]); + executors_to_shutdown(Rest, [{ExecutorIdValue, AgentIdValue} | Accum]); {ok, false} -> executors_to_shutdown(Rest, Accum) end. @@ -285,8 +292,8 @@ apply_offer(OfferHelper) -> -spec init({}) -> {ok, {{supervisor:strategy(), 1, 1}, [supervisor:child_spec()]}}. init({}) -> - Specs = [cluster_spec(Key) || - {Key, _Cluster} <- rms_metadata:get_clusters()], + Specs = [cluster_spec(Key, proplists:get_value(riak_version, Cluster)) || + {Key, Cluster} <- rms_metadata:get_clusters()], {ok, {{one_for_one, 1, 1}, Specs}}. %% Internal functions. @@ -423,10 +430,10 @@ unreserve_volumes(OfferHelper) -> OfferHelper end. --spec cluster_spec(rms_cluster:key()) -> supervisor:child_spec(). -cluster_spec(Key) -> +-spec cluster_spec(rms_cluster:key(), string()) -> supervisor:child_spec(). +cluster_spec(Key, RiakVersion) -> {Key, - {rms_cluster, start_link, [Key]}, + {rms_cluster, start_link, [Key, RiakVersion]}, transient, 5000, worker, [rms_cluster]}. -spec get_cluster_pid(rms_cluster:key()) -> {ok, pid()} | {error, not_found}. diff --git a/src/rms_config.erl b/src/rms_config.erl index 63eb2e9..1165e04 100644 --- a/src/rms_config.erl +++ b/src/rms_config.erl @@ -20,70 +20,43 @@ -module(rms_config). --export([master_hosts/0, +-export([root/0, static_root/0, - constraints/0, + master_hosts/0, zk/0, + constraints/0, framework_name/0, framework_role/0, - webui_url/0, - artifacts/0, - artifact_urls/0, + webui_url/0, + resource_urls/0, persistent_path/0, - riak_root_path/0, framework_hostname/0]). -export([get_value/2, get_value/3]). --define(DEFAULT_NAME, "riak"). --define(DEFAULT_HOSTNAME_SUFFIX, ".marathon.mesos"). +-define(ROOT, "../"). +-define(STATIC_ROOT, "../artifacts/"). -define(DEFAULT_MASTER, "master.mesos:5050"). -define(DEFAULT_ZK, "master.mesos:2181"). +-define(DEFAULT_NAME, "riak"). +-define(DEFAULT_HOSTNAME_SUFFIX, ".marathon.mesos"). -define(DEFAULT_CONSTRAINTS, "[]"). --define(STATIC_ROOT, "../artifacts/"). --define(DEFAULT_RIAK_ROOT_PATH, "root"). - - % The path-tail in a Riak archive, for which we search --define(RIAK_BIN, "riak/bin/riak"). %% Helper functions. +-spec root() -> string(). +root() -> + ?ROOT. + +-spec static_root() -> string(). +static_root() -> + ?STATIC_ROOT. + -spec master_hosts() -> [string()]. master_hosts() -> {Hosts, _} = split_hosts(get_value(master, ?DEFAULT_MASTER, string)), Hosts. --spec static_root() -> string(). -static_root() -> ?STATIC_ROOT. - --spec constraints() -> rms_offer_helper:constraints(). -constraints() -> - ConstraintsRaw = get_value(constraints, ?DEFAULT_CONSTRAINTS), - %% constraints might be double-string-encoded - ConstraintsStr = - case {re:run(ConstraintsRaw, "\\\\"), re:run(ConstraintsRaw, """)} of - {nomatch, nomatch} -> % plain JSON-as-a-string "[[ \"hostname\", \"UNIQUE\" ]]" - convert_value(ConstraintsRaw, string); - {nomatch, {match, _}} -> % plain JSON as an html-encoded string e.g. "[["hostname", "UNIQUE"]]" - convert_value(ConstraintsRaw, html_string); - {{match, _}, nomatch} -> % double-encoded string e.g. "\"[[\\\"hostname\\\", \\\"UNIQUE\\\"]]\"" - mochijson2:decode(convert_value(ConstraintsRaw, string)) - end, - ConstraintsBin = case mochijson2:decode(ConstraintsStr) of - [] -> []; - [[]] -> []; - [[F|_]|_]=C when is_binary(F) -> C; - [F|_]=C when is_binary(F) -> [C]; - _ -> [] - end, - lists:foldr( - fun(X1, Accum1) -> - [lists:foldr( - fun(X2, Accum2) -> - [binary_to_list(X2)|Accum2] - end, [], X1)|Accum1] - end, [], ConstraintsBin). - -spec zk() -> string(). zk() -> split_hosts(get_value(zk, ?DEFAULT_ZK, string)). @@ -111,62 +84,46 @@ webui_url() -> Port = rms_config:get_value(port, 9090, integer), "http://" ++ Hostname ++ ":" ++ integer_to_list(Port) ++ "/". --spec artifacts() -> [string()]. -artifacts() -> - [ - get_value(riak_pkg, "riak.tar.gz", string), - get_value(explorer_pkg, "riak_explorer.tar.gz", string), - get_value(patches_pkg, "riak_erlpmd_patches.tar.gz", string), - get_value(executor_pkg, "riak_mesos_executor.tar.gz", string) - ]. - --spec riak_root_path() -> string(). -riak_root_path() -> - RiakPkg = get_value(riak_pkg, "riak.tar.gz", string), - ArtifactDir = "../artifacts", - Filename = filename:join([ArtifactDir, RiakPkg]), - {ok, TarTable} = erl_tar:table(Filename, [compressed]), - find_root_path(TarTable). - -%% TODO Should we log something in this case? --spec find_root_path(list(string())) -> string(). -find_root_path([]) -> ?DEFAULT_RIAK_ROOT_PATH; -find_root_path([P | Paths]) -> - case lists:suffix(?RIAK_BIN, P) of - true -> - %% Strip the known tail, leave only the prefix - find_prefix(P, ?RIAK_BIN); - false -> find_root_path(Paths) - end. - --spec find_prefix(string(), string()) -> string(). -find_prefix(FullPath, Tail) -> - % We know that FullPath = Prefix ++ Tail - % How to find Prefix? - SplitPath = filename:split(FullPath), - SplitTail = filename:split(Tail), - % Reverse the path components - LiatTilps = lists:reverse(SplitTail), - HtapTilps = lists:reverse(SplitPath), - % Find the common path-tail (list-head), reverse and join - filename:join(lists:reverse(drop_common_prefix(HtapTilps, LiatTilps))). - -% Drops from A the leading elements common to A and B. --spec drop_common_prefix(A::list(), B::list()) -> list(). -drop_common_prefix([], _) -> []; -drop_common_prefix([X | Rest1], [X | Rest2]) -> drop_common_prefix(Rest1, Rest2); -drop_common_prefix(Rest, _) -> Rest. - --spec artifact_urls() -> [string()]. -artifact_urls() -> - %% TODO "static" is magic - Base = webui_url() ++ "static/", - [ Base ++ Artifact || Artifact <- artifacts() ]. +-spec resource_urls() -> [{string(), string()}]. +resource_urls() -> + Resources = mochijson2:decode(get_value(resource_urls, undefined, string), + [{format, proplist}]), + [{binary_to_list(Key), binary_to_list(Url)} || + {Key, Url} <- Resources]. -spec persistent_path() -> string(). persistent_path() -> get_value(persistent_path, "data", string). +-spec constraints() -> rms_offer_helper:constraints(). +constraints() -> + ConstraintsRaw = get_value(constraints, ?DEFAULT_CONSTRAINTS), + %% constraints might be double-string-encoded + ConstraintsStr = + case {re:run(ConstraintsRaw, "\\\\"), re:run(ConstraintsRaw, """)} of + {nomatch, nomatch} -> + %% plain JSON-as-a-string "[[ \"hostname\", \"UNIQUE\" ]]" + convert_value(ConstraintsRaw, string); + {nomatch, {match, _}} -> + %% plain JSON as an html-encoded string e.g. "[["hostname", "UNIQUE"]]" + convert_value(ConstraintsRaw, html_string); + {{match, _}, nomatch} -> + %% double-encoded string e.g. "\"[[\\\"hostname\\\", \\\"UNIQUE\\\"]]\"" + mochijson2:decode(convert_value(ConstraintsRaw, string)) + end, + ConstraintsBin = case mochijson2:decode(ConstraintsStr) of + [] -> []; + [[]] -> []; + [[F | _] | _] = C when is_binary(F) -> C; + [F | _] = C when is_binary(F) -> [C]; + _ -> [] + end, + lists:foldr(fun(X1, Accum1) -> + [lists:foldr(fun(X2, Accum2) -> + [binary_to_list(X2)|Accum2] + end, [], X1)|Accum1] + end, [], ConstraintsBin). + %% External functions. -spec get_value(atom(), term()) -> term(). @@ -219,7 +176,7 @@ unescape_html("""++Rest) -> "\"" ++ unescape_html(Rest); unescape_html("<" ++ Rest) -> "<" ++ unescape_html(Rest); unescape_html(">" ++ Rest) -> ">" ++ unescape_html(Rest); unescape_html("&"++ Rest) -> "&" ++ unescape_html(Rest); -unescape_html([C | Rest]) -> [ C | unescape_html(Rest) ]. +unescape_html([C | Rest]) -> [C | unescape_html(Rest)]. -spec get_env_value(atom()) -> string() | false. get_env_value(Key) -> diff --git a/src/rms_node_manager.erl b/src/rms_node_manager.erl index 30fafd9..7e3cc31 100644 --- a/src/rms_node_manager.erl +++ b/src/rms_node_manager.erl @@ -145,26 +145,26 @@ get_node_hostname(Key) -> -spec get_node_hosts() -> {ok, rms_offer_helper:hostnames()}. get_node_hosts() -> {ok, lists:foldl(fun({_, Node}, Accum) -> - case {proplists:get_value(status, Node), - proplists:get_value(hostname, Node)} of - {shutdown, _} -> Accum; - {_,{error, _}} -> Accum; - {_,undefined} -> Accum; - {_,Host} -> [Host|Accum] - end - end, [], rms_metadata:get_nodes())}. + case {proplists:get_value(status, Node), + proplists:get_value(hostname, Node)} of + {shutdown, _} -> Accum; + {_, {error, _}} -> Accum; + {_, undefined} -> Accum; + {_, Host} -> [Host|Accum] + end + end, [], rms_metadata:get_nodes())}. -spec get_node_attributes() -> {ok, [rms_offer_helper:attributes_group()]}. get_node_attributes() -> {ok, lists:foldl(fun({_, Node}, Accum) -> - case {proplists:get_value(status, Node), + case {proplists:get_value(status, Node), proplists:get_value(attributes, Node)} of - {shutdown, _} -> Accum; - {_,{error, _}} -> Accum; - {_,undefined} -> Accum; - {_,Attributes} -> [Attributes|Accum] - end - end, [], rms_metadata:get_nodes())}. + {shutdown, _} -> Accum; + {_, {error, _}} -> Accum; + {_, undefined} -> Accum; + {_, Attributes} -> [Attributes | Accum] + end + end, [], rms_metadata:get_nodes())}. -spec get_node_http_port(rms_node:key()) -> {ok, pos_integer()} | {error, term()}. get_node_http_port(Key) -> @@ -176,25 +176,25 @@ get_node_name(Key) -> -spec get_node_http_url(rms_node:key()) -> {ok, string()} | {error, term()}. get_node_http_url(Key) -> - case {get_node_hostname(Key), - get_node_http_port(Key)} of - {{ok, H},{ok, P}} when is_list(H) and is_integer(P) -> + case {get_node_hostname(Key), get_node_http_port(Key)} of + {{ok, H}, {ok, P}} when is_list(H) and is_integer(P) -> {ok, H ++ ":" ++ integer_to_list(P)}; - _ -> {error, not_found} + _ -> + {error, not_found} end. -spec get_node_agent_id_value(rms_node:key()) -> - {ok, string()} | {error, term()}. + {ok, string()} | {error, term()}. get_node_agent_id_value(Key) -> rms_node:get_field_value(agent_id_value, Key). -spec get_node_executor_id_value(rms_node:key()) -> - {ok, string()} | {error, term()}. + {ok, string()} | {error, term()}. get_node_executor_id_value(Key) -> rms_node:get_field_value(executor_id_value, Key). -spec get_node_persistence_id(rms_node:key()) -> - {ok, string()} | {error, term()}. + {ok, string()} | {error, term()}. get_node_persistence_id(Key) -> rms_node:get_field_value(persistence_id, Key). @@ -329,8 +329,7 @@ apply_unreserved_offer(NodeKey, OfferHelper) -> end. -spec apply_reserved_offer(rms_node:key(), rms_offer_helper:offer_helper()) -> - {ok, rms_offer_helper:offer_helper()} | - {error, not_enough_resources | term()}. + {ok, rms_offer_helper:offer_helper()} | {error, term()}. apply_reserved_offer(NodeKey, OfferHelper) -> case get_node_pid(NodeKey) of {ok, _Pid} -> @@ -343,8 +342,9 @@ apply_reserved_offer(NodeKey, OfferHelper) -> {ok, NodeMem} = rms_metadata:get_option(node_mem), {ok, NodeDisk} = rms_metadata:get_option(node_disk), {ok, ArtifactUrls} = rms_metadata:get_option(artifact_urls), + {ok, ArtifactDir} = rms_metadata:get_option(artifact_dir), {ok, PersistentPath} = rms_metadata:get_option(persistent_path), - {ok, RiakRootPath} = rms_metadata:get_option(riak_root_path), + NodeNumPorts = ?NODE_NUM_PORTS, UnfitForReserved = rms_offer_helper:unfit_for_reserved( @@ -363,6 +363,10 @@ apply_reserved_offer(NodeKey, OfferHelper) -> {ok, NodeHostname} = get_node_hostname(NodeKey), {ok, AgentIdValue} = get_node_agent_id_value(NodeKey), + {ok, RiakVersion} = rms_cluster_manager:get_cluster_riak_version(ClusterKey), + RiakUrlStr = proplists:get_value(RiakVersion, ArtifactUrls), + RiakRootPath = rms_resources:riak_root_path(ArtifactDir, RiakUrlStr), + %% Apply reserved resources for task. OfferHelper0 = rms_offer_helper:clean_applied_resources(OfferHelper), @@ -399,8 +403,9 @@ apply_reserved_offer(NodeKey, OfferHelper) -> AgentId = erl_mesos_utils:agent_id(AgentIdValue), - [RiakUrlStr, RiakExplorerUrlStr, RiakPatchesStr, ExecutorUrlStr] = - ArtifactUrls, + ExecutorUrlStr = proplists:get_value("executor", ArtifactUrls), + RiakPatchesStr = proplists:get_value("patches", ArtifactUrls), + RiakExplorerUrlStr = proplists:get_value("explorer", ArtifactUrls), ExecutorUrl = erl_mesos_utils:command_info_uri(ExecutorUrlStr, false, true), RiakExplorerUrl = erl_mesos_utils:command_info_uri(RiakExplorerUrlStr, false, true), @@ -414,10 +419,11 @@ apply_reserved_offer(NodeKey, OfferHelper) -> {ok, RiakIface} = rms_metadata:get_option(node_iface), CommandInfo = case RiakIface of - "" -> CommandInfo0; + "" -> + CommandInfo0; Iface -> - NodeIfaceEnv = rms_erl_mesos_utils:environment_variable( - "RIAK_MESOS_NODE_IFACE", Iface), + NodeIfaceEnv = + rms_erl_mesos_utils:environment_variable("RIAK_MESOS_NODE_IFACE", Iface), CmdEnv = rms_erl_mesos_utils:environment([NodeIfaceEnv]), rms_erl_mesos_utils:set_command_info_environment(CommandInfo0, CmdEnv) end, @@ -449,7 +455,7 @@ apply_reserved_offer(NodeKey, OfferHelper) -> % Tack a new UUID onto ExecutorId - this way we don't clash with previous instances of this same node % in places like mesos web-ui - % NB This is used as the executor's nodename so must be a valid erlang nodename + % NB This is used as the executor's node name so must be a valid erlang node name ExecutorIdValue = NodeKey ++ "-" ++ uuid:to_string(uuid:uuid4()), ExecutorId = erl_mesos_utils:executor_id(ExecutorIdValue), diff --git a/src/rms_resources.erl b/src/rms_resources.erl new file mode 100644 index 0000000..63c3d86 --- /dev/null +++ b/src/rms_resources.erl @@ -0,0 +1,159 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Basho Technologies Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(rms_resources). + +-export([init_artifacts/4, + artifact_urls/2, + riak_urls/1, + riak_root_path/2]). + +-define(EXCLUDE_ARTIFACT_KEYS, ["scheduler"]). + +-define(DEFAULT_RIAK_ROOT_PATH, "root"). + +-define(RIAK_BIN, "riak/bin/riak"). + +%% External functions. + +init_artifacts(RootDir, ArtifactDir, ResourceUrls, PersistentPath) -> + ok = filelib:ensure_dir(ArtifactDir), + Artifacts = artifacts(ResourceUrls), + ok = move_artifacts(RootDir, ArtifactDir, Artifacts), + case is_persistent_volume_safe(PersistentPath, ArtifactDir, Artifacts) of + {error, {path_clash, Package}} -> + lager:error("Unable to start scheduler: a path in ~p will overwrite" + " the persistent volume path (~p) in executors. " + "Refusing to start.", [Package, PersistentPath]), + {error, path_clash}; + {error, Reason} = Error -> + lager:error("Unable to validate archives against persistent volume " + "path, because: ~p", [Reason]), + Error; + ok -> + ok + end. + +-spec artifact_urls(string(), string()) -> [{string(), string()}]. +artifact_urls(BaseUrl, ResourceUrls) -> + Artifacts = artifacts(ResourceUrls), + [{Key, BaseUrl ++ Package} || {Key, Package} <- Artifacts]. + +-spec riak_urls([{string(), string()}]) -> [{string(), string()}]. +riak_urls(ResourceUrls) -> + [ResourceUrl || {Key, _Url} = ResourceUrl <- ResourceUrls, + case Key of + "riak-" ++ _Version -> + true; + _Key -> + false + end]. + +-spec riak_root_path(string(), string()) -> string(). +riak_root_path(ArtifactDir, RiakUrl) -> + Package = get_package_from_url(RiakUrl), + Filename = filename:join([ArtifactDir, Package]), + {ok, Tar} = erl_tar:table(Filename, [compressed]), + find_riak_root_path(Tar). + +%% Internal functions. + +-spec artifacts([{string(), string()}]) -> [{string(), string()}]. +artifacts(ResourceUrls) -> + [{Key, get_package_from_url(Url)} || {Key, Url} <- ResourceUrls]. + +-spec get_package_from_url(string()) -> string(). +get_package_from_url(Url) -> + lists:last(string:tokens(Url, "/")). + +-spec move_artifacts(string(), string(), [{string(), string()}]) -> + ok | {error, term()}. +move_artifacts(RootDir, ArtifactDir, [{Key, Package} | Artifacts]) -> + PackagePath = filename:join([RootDir, Package]), + case lists:member(Key, ?EXCLUDE_ARTIFACT_KEYS) of + true -> + file:delete(PackagePath), + move_artifacts(RootDir, ArtifactDir, Artifacts); + false -> + ArtifactPath = filename:join([ArtifactDir, Package]), + file:rename(PackagePath, ArtifactPath), + move_artifacts(RootDir, ArtifactDir, Artifacts) + end; +move_artifacts(_RootDir, _ArtifactDir, []) -> + ok. + +-spec is_persistent_volume_safe(string(), string(), [{string(), string()}]) -> + ok | {error, term()}. +is_persistent_volume_safe(Path, ArtifactDir, [{Key, Package} | Artifacts]) -> + case lists:member(Key, ?EXCLUDE_ARTIFACT_KEYS) of + true -> + is_persistent_volume_safe(Path, ArtifactDir, Artifacts); + false -> + ArtifactPath = filename:join([ArtifactDir, Package]), + case erl_tar:table(ArtifactPath, [compressed]) of + {ok, Content} -> + case lists:member(Path, Content) orelse + lists:member(Path ++ "/", Content) of + true -> + {error, {path_clash, Package}}; + false -> + is_persistent_volume_safe(Path, ArtifactDir, + Artifacts) + end; + {error, _Reason} = Error -> + Error + end + end; +is_persistent_volume_safe(_Path, _ArtifactDir, []) -> + ok. + +-spec find_riak_root_path([string()]) -> string(). +find_riak_root_path([]) -> + ?DEFAULT_RIAK_ROOT_PATH; +find_riak_root_path([Path | Paths]) -> + case lists:suffix(?RIAK_BIN, Path) of + true -> + %% Strip the known tail, leave only the prefix. + find_prefix(Path, ?RIAK_BIN); + false -> + find_riak_root_path(Paths) + end. + +-spec find_prefix(string(), string()) -> string(). +find_prefix(FullPath, Tail) -> + % We know that FullPath = Prefix ++ Tail + % How to find Prefix? + SplitPath = filename:split(FullPath), + SplitTail = filename:split(Tail), + % Reverse the path components + ReverseSplitTail = lists:reverse(SplitTail), + ReverseSplitPath = lists:reverse(SplitPath), + % Find the common path-tail (list-head), reverse and join + filename:join(lists:reverse(drop_common_prefix(ReverseSplitPath, + ReverseSplitTail))). + +% Drops from A the leading elements common to A and B. +-spec drop_common_prefix([string()], [string()]) -> [string()]. +drop_common_prefix([Component | Path1], [Component | Path2]) -> + drop_common_prefix(Path1, Path2); +drop_common_prefix([], _) -> + []; +drop_common_prefix(Path1, _) -> + Path1. diff --git a/src/rms_sup.erl b/src/rms_sup.erl index 33119a6..4360de8 100644 --- a/src/rms_sup.erl +++ b/src/rms_sup.erl @@ -37,51 +37,19 @@ start_link() -> -spec init([]) -> {ok, {{supervisor:strategy(), 1, 1}, [supervisor:child_spec()]}}. init([]) -> - PersistentVolPath = rms_config:persistent_path(), - ArtifactsDir = rms_config:static_root(), - Artifacts = rms_config:artifacts(), - %% For each artifact, make sure it does not contain PersistentVolPath: - %% if it does, DO NOT START, print something very, very clear - case persistent_vol_failsafe(PersistentVolPath, ArtifactsDir, Artifacts) of - {error, {path_clash, Artifact}} -> - lager:error("Unable to start scheduler: a path in ~p will overwrite" - " the persistent volume path (~p) in executors. Refusing to start.", - %% TODO Maybe add a link to some info in the logline? Is that naive? - [Artifact, PersistentVolPath]), - {error, path_clash}; - {error, _} = Error -> - lager:error("Unable to validate archives against persistent volume path, because: ~p", - [Error]), - Error; - ok -> init_rest() - end. - -persistent_vol_failsafe(_, _, []) -> ok; -persistent_vol_failsafe(PVPath, ArtsDir, [Art | Artifacts]) -> - Artifact = filename:join(ArtsDir, Art), - case erl_tar:table(Artifact, [compressed]) of - {error, _}=Error -> Error ; - {ok, Contents} -> - case lists:member(PVPath, Contents) orelse - lists:member(PVPath++"/", Contents) of - true -> {error, {path_clash, Art}}; - false -> - persistent_vol_failsafe(PVPath, ArtsDir, Artifacts) - end - end. - --spec init_rest() -> - {ok, {{supervisor:strategy(), 1, 1}, [supervisor:child_spec()]}}. -init_rest() -> + RootDir = rms_config:root(), + ArtifactDir = rms_config:static_root(), + ResourceUrls = rms_config:resource_urls(), + PersistentPath = rms_config:persistent_path(), + ok = rms_resources:init_artifacts(RootDir, ArtifactDir, ResourceUrls, + PersistentPath), Ip = rms_config:get_value(ip, "0.0.0.0"), Port = rms_config:get_value(port, 9090, integer), - WebConfig = - [{ip, Ip}, - {port, Port}, - {nodelay, true}, - {log_dir, "log"}, - {dispatch, rms_wm_resource:dispatch()}], - + WebConfig = [{ip, Ip}, + {port, Port}, + {nodelay, true}, + {log_dir, "log"}, + {dispatch, rms_wm_resource:dispatch()}], ZooKeeper = rms_config:zk(), %% TODO: may be use path {ZkNodes, _ZkPath} = ZooKeeper, @@ -111,9 +79,11 @@ init_rest() -> ExecutorCpus = rms_config:get_value(executor_cpus, 0.1, number), ExecutorMem = rms_config:get_value(executor_mem, 512.0, number), - PersistentVolPath = rms_config:persistent_path(), - ArtifactUrls = rms_config:artifact_urls(), - RiakRootPath = rms_config:riak_root_path(), + %% TODO "static" is magic + BaseUrl = FrameworkWebUIURL ++ "static/", + ArtifactUrls = rms_resources:artifact_urls(BaseUrl, ResourceUrls), + %%RiakRootPath = rms_config:riak_root_path(), + Ref = riak_mesos_scheduler, Scheduler = rms_scheduler, @@ -129,11 +99,11 @@ init_rest() -> {node_mem, NodeMem}, {node_disk, NodeDisk}, {node_iface, NodeIface}, - {persistent_path, PersistentVolPath}, + {persistent_path, PersistentPath}, {executor_cpus, ExecutorCpus}, {executor_mem, ExecutorMem}, {artifact_urls, ArtifactUrls}, - {riak_root_path, RiakRootPath}], + {artifact_dir, ArtifactDir}], MasterHosts = rms_config:master_hosts(), ResubscribeInterval = rms_config:get_value(master_election_timeout, 60000, diff --git a/src/rms_wm_helper.erl b/src/rms_wm_helper.erl index 87ee234..651e69a 100644 --- a/src/rms_wm_helper.erl +++ b/src/rms_wm_helper.erl @@ -20,7 +20,10 @@ -module(rms_wm_helper). --export([cluster_exists/1, +-export([riak_urls/0, + cluster_exists/1, + cluster_riak_config_exists/1, + cluster_advanced_config_exists/1, get_cluster_with_nodes_list/1, get_cluster_with_nodes_list/3, get_clusters_list_with_nodes_list/0, @@ -33,11 +36,12 @@ get_nodes_list/1, get_nodes_list/2, add_clusters_list_with_nodes_list/1, - add_cluster_with_nodes_list/1]). + add_cluster_with_nodes_list/1, + add_cluster/1]). -export([to_json/1, to_json/2, from_json/1, from_json/2]). --define(CLUSTER_FIELDS, [key, riak_config, advanced_config, generation]). +-define(CLUSTER_FIELDS, [key, riak_version, riak_config, advanced_config, generation]). -define(CLUSTER_NODE_FIELDS, [key, status, container_path, persistence_id]). @@ -52,11 +56,38 @@ %% External functions. +-spec riak_urls() -> [{string(), string()}]. +riak_urls() -> + ResourceUrls = rms_config:resource_urls(), + rms_resources:riak_urls(ResourceUrls). + -spec cluster_exists(rms_cluster:key()) -> boolean(). cluster_exists(ClusterKey) -> {ok, [{key, ClusterKey}]} == rms_cluster_manager:get_cluster(ClusterKey, [key]). +-spec cluster_riak_config_exists(rms_cluster:key()) -> boolean(). +cluster_riak_config_exists(ClusterKey) -> + case rms_cluster_manager:get_cluster_riak_config(ClusterKey) of + {ok, undefined} -> + false; + {ok, _RiakConfig} -> + true; + {error, not_found} -> + false + end. + +-spec cluster_advanced_config_exists(rms_cluster:key()) -> boolean(). +cluster_advanced_config_exists(ClusterKey) -> + case rms_cluster_manager:get_cluster_advanced_config(ClusterKey) of + {ok, undefined} -> + false; + {ok, _RiakConfig} -> + true; + {error, not_found} -> + false + end. + -spec get_cluster_with_nodes_list(rms_cluster:key()) -> {ok, [rms_metadata:cluster_state() | {nodes, {list, [rms_metadata:node_state()]}}]} | {error, term()}. @@ -132,8 +163,24 @@ add_clusters_list_with_nodes_list({list, Clusters}) -> add_clusters_list_with_nodes_list(Clusters, []). add_cluster_with_nodes_list(Cluster) -> - {list, Nodes} = proplists:get_value(nodes, Cluster), - rms_cluster_manager:add_cluster(Cluster, Nodes). + RiakVersion = proplists:get_value(riak_version, Cluster), + case validate_riak_version(RiakVersion) of + ok -> + {list, Nodes} = proplists:get_value(nodes, Cluster), + rms_cluster_manager:add_cluster_with_nodes(Cluster, Nodes); + {error, _Reason} = Reason -> + Reason + end. + +add_cluster(Cluster) -> + Key = proplists:get_value(key, Cluster), + RiakVersion = proplists:get_value(riak_version, Cluster), + case validate_riak_version(RiakVersion) of + ok -> + rms_cluster_manager:add_cluster(Key, RiakVersion); + {error, _Reason} = Reason -> + Reason + end. to_json(Value) -> to_json(Value, []). @@ -165,6 +212,15 @@ from_json(Value, _Options) -> %% Internal functions. +-spec validate_riak_version(string()) -> boolean(). +validate_riak_version(RiakVersion) -> + case lists:keymember(RiakVersion, 1, riak_urls()) of + true -> + ok; + false -> + {error, invalid_riak_version} + end. + -spec get_clusters_list([rms_cluster:key()], [atom()], [atom()], [rms_metadata:cluster_state() | {nodes, {list, [rms_metadata:node_state()]}}]) -> @@ -200,17 +256,24 @@ get_nodes_list([], _NodeFields, Nodes) -> {list, lists:reverse(Nodes)}. add_clusters_list_with_nodes_list([Cluster | Clusters], Results) -> - {list, Nodes} = proplists:get_value(nodes, Cluster), Key = proplists:get_value(key, Cluster), - Result = case rms_cluster_manager:add_cluster(Cluster, Nodes) of + RiakVersion = proplists:get_value(riak_version, Cluster), + Result = case validate_riak_version(RiakVersion) of ok -> - [{key, Key}, {success, true}]; - {error, Reason} -> - [{key, Key}, - {success, false}, - {reason, io_lib:format("~p", [Reason])}] + {list, Nodes} = proplists:get_value(nodes, Cluster), + rms_cluster_manager:add_cluster_with_nodes(Cluster, Nodes); + {error, _Reason} = Error -> + Error end, - add_clusters_list_with_nodes_list(Clusters, [Result | Results]); + Result1 = case Result of + ok -> + [{key, Key}, {success, true}]; + {error, Reason} -> + [{key, Key}, + {success, false}, + {reason, io_lib:format("~p", [Reason])}] + end, + add_clusters_list_with_nodes_list(Clusters, [Result1 | Results]); add_clusters_list_with_nodes_list([], Results) -> {list, lists:reverse(Results)}. diff --git a/src/rms_wm_resource.erl b/src/rms_wm_resource.erl index fb545cd..16f698d 100644 --- a/src/rms_wm_resource.erl +++ b/src/rms_wm_resource.erl @@ -7,6 +7,8 @@ static_last_modified/1, static_file/1]). +-export([riak_versions/1]). + -export([clusters/1, set_clusters/1, cluster_exists/1, @@ -73,13 +75,13 @@ -define(CLUSTER_TO_JSON_OPTIONS, [{rename_keys, [{key, name}]}, - {replace_values, [{riak_config, <<>>, null}, - {advanced_config, <<>>, null}]}]). + {replace_values, [{riak_config, undefined, null}, + {advanced_config, undefined, null}]}]). -define(CLUSTER_FROM_JSON_OPTIONS, [{rename_keys, [{name, key}]}, - {replace_values, [{riak_config, null, <<>>}, - {advanced_config, null, <<>>}]}]). + {replace_values, [{riak_config, null, undefined}, + {advanced_config, null, undefined}]}]). -record(route, {base = ?API_ROUTE :: [string()], path :: [string() | atom()], @@ -113,6 +115,10 @@ routes() -> provides = {?MODULE, static_types}, content = {?MODULE, static_file}, last_modified = {?MODULE, static_last_modified}}, + %% Riak versions. + #route{path = ["riak", "versions"], + methods = ['GET'], + content = {?MODULE, riak_versions}}, %% Clusters. #route{path = ["clusters"], methods = ['GET', 'PUT'], @@ -228,6 +234,12 @@ static_file(ReqData) -> ReqData1 = wrq:set_resp_header("ETag", webmachine_util:quoted_string(ET), ReqData), {Response, ReqData1}. +%% Riak versions. + +riak_versions(ReqData) -> + JsonRiakUrls = rms_wm_helper:to_json(rms_wm_helper:riak_urls()), + {[{riak_versions, JsonRiakUrls}], ReqData}. + %% Clusters. clusters(ReqData) -> @@ -259,7 +271,9 @@ get_cluster(ReqData) -> add_cluster(ReqData) -> Key = wrq:path_info(key, ReqData), - Response = build_response(rms_cluster_manager:add_cluster(Key)), + Json = mochijson2:decode(wrq:req_body(ReqData)), + Cluster = [{key, Key} | rms_wm_helper:from_json(Json)], + Response = build_response(rms_wm_helper:add_cluster(Cluster)), {true, wrq:append_to_response_body(mochijson2:encode(Response), ReqData)}. set_cluster(ReqData) -> @@ -284,16 +298,7 @@ restart_cluster(ReqData) -> cluster_riak_config_exists(ReqData) -> Key = wrq:path_info(key, ReqData), - Result = case rms_cluster_manager:get_cluster(Key) of - {ok, _} -> - case rms_cluster_manager:get_cluster_riak_config(Key) of - {ok, <<>>} -> false; - {ok, _} -> true - end; - {error, not_found} -> - false - end, - {Result, ReqData}. + {rms_wm_helper:cluster_riak_config_exists(Key), ReqData}. get_cluster_riak_config(ReqData) -> Key = wrq:path_info(key, ReqData), @@ -318,21 +323,12 @@ set_cluster_riak_config(ReqData) -> delete_cluster_riak_config(ReqData) -> Key = wrq:path_info(key, ReqData), - Response = rms_cluster_manager:set_cluster_riak_config(Key, <<>>), + Response = rms_cluster_manager:set_cluster_riak_config(Key, undefined), {true, wrq:append_to_response_body(mochijson2:encode(Response), ReqData)}. cluster_advanced_config_exists(ReqData) -> Key = wrq:path_info(key, ReqData), - Result = case rms_cluster_manager:get_cluster(Key) of - {ok, _} -> - case rms_cluster_manager:get_cluster_advanced_config(Key) of - {ok, <<>>} -> false; - {ok, _} -> true - end; - {error, not_found} -> - false - end, - {Result, ReqData}. + {rms_wm_helper:cluster_advanced_config_exists(Key), ReqData}. get_cluster_advanced_config(ReqData) -> Key = wrq:path_info(key, ReqData), @@ -357,7 +353,7 @@ set_cluster_advanced_config(ReqData) -> delete_cluster_advanced_config(ReqData) -> Key = wrq:path_info(key, ReqData), - Response = rms_cluster_manager:set_cluster_advanced_config(Key, <<>>), + Response = rms_cluster_manager:set_cluster_advanced_config(Key, undefined), {true, wrq:append_to_response_body(mochijson2:encode(Response), ReqData)}. %% Nodes.