Skip to content

Commit

Permalink
Store and use executor_id when stopping nodes
Browse files Browse the repository at this point in the history
Now that we give the nodes a more unique executor_id we can't just guess
it from node-key and persistence_id: store it in the node's metadata in
ZK and use it accordingly
  • Loading branch information
sanmiguel committed Jun 16, 2016
1 parent 107e38f commit cac94b9
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 6 deletions.
5 changes: 2 additions & 3 deletions src/rms_cluster_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,8 @@ executors_to_shutdown([NodeKey|Rest], Accum) ->
case rms_node_manager:node_can_be_shutdown(NodeKey) of
{ok, true} ->
{ok, AgentIdValue} = rms_node_manager:get_node_agent_id_value(NodeKey),
{ok, PersistenceId} = rms_node_manager:get_node_persistence_id(NodeKey),
ExecutorId = NodeKey ++ "-" ++ PersistenceId,
executors_to_shutdown(Rest, [{ExecutorId, AgentIdValue}|Accum]);
{ok, ExecutorIdValue} = rms_node_manager:get_node_executor_id_value(NodeKey),
executors_to_shutdown(Rest, [{ExecutorIdValue, AgentIdValue}|Accum]);
{ok, false} ->
executors_to_shutdown(Rest, Accum)
end.
Expand Down
12 changes: 10 additions & 2 deletions src/rms_node.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
can_be_shutdown/1,
set_reserve/5,
set_unreserve/1,
set_agent_info/8,
set_agent_info/9,
destroy/1,
destroy/2,
restart/1,
Expand Down Expand Up @@ -88,6 +88,7 @@
http_port :: pos_integer(),
pb_port :: pos_integer(),
disterl_port :: pos_integer(),
executor_id_value = "" :: string(),
agent_id_value = "" :: string(),
container_path = "" :: string(),
persistence_id = "" :: string(),
Expand Down Expand Up @@ -194,7 +195,7 @@ set_unreserve(Pid) ->
gen_fsm:sync_send_event(Pid, set_unreserve).

-spec set_agent_info(pid(), string(), string(), pos_integer(), pos_integer(),
pos_integer(), string(), string()) ->
pos_integer(), string(), string(), string()) ->
ok | {error, term()}.
set_agent_info(Pid,
NodeName,
Expand All @@ -203,6 +204,7 @@ set_agent_info(Pid,
PbPort,
DisterlPort,
AgentIdValue,
ExecutorIdValue,
ContainerPath) ->
gen_fsm:sync_send_event(
Pid, {set_agent_info,
Expand All @@ -212,6 +214,7 @@ set_agent_info(Pid,
PbPort,
DisterlPort,
AgentIdValue,
ExecutorIdValue,
ContainerPath}).

-spec destroy(pid()) -> ok | {error, term()}.
Expand Down Expand Up @@ -355,6 +358,7 @@ reserved({set_agent_info,
PbPort,
DisterlPort,
AgentIdValue,
ExecutorIdValue,
ContainerPath},
_From, Node) ->
Node1 = Node#node{hostname = Hostname,
Expand All @@ -363,6 +367,7 @@ reserved({set_agent_info,
pb_port = PbPort,
disterl_port = DisterlPort,
agent_id_value = AgentIdValue,
executor_id_value = ExecutorIdValue,
container_path = ContainerPath},
lager:info("Setting agent info for node to ~p", [Node1]),
update_and_reply({reserved, Node}, {reserved, Node1});
Expand Down Expand Up @@ -639,6 +644,7 @@ from_list(NodeList) ->
http_port = proplists:get_value(http_port, NodeList),
pb_port = proplists:get_value(pb_port, NodeList),
disterl_port = proplists:get_value(disterl_port, NodeList),
executor_id_value = proplists:get_value(executor_id_value, NodeList),
agent_id_value = proplists:get_value(agent_id_value, NodeList),
container_path = proplists:get_value(container_path, NodeList),
persistence_id = proplists:get_value(persistence_id, NodeList),
Expand All @@ -656,6 +662,7 @@ to_list(
pb_port = PbPort,
disterl_port = DisterlPort,
agent_id_value = AgentIdValue,
executor_id_value = ExecutorIdValue,
container_path = ContainerPath,
persistence_id = PersistenceId,
reconciled = Reconciled,
Expand All @@ -669,6 +676,7 @@ to_list(
{pb_port, PbPort},
{disterl_port, DisterlPort},
{agent_id_value, AgentIdValue},
{executor_id_value, ExecutorIdValue},
{container_path, ContainerPath},
{persistence_id, PersistenceId},
{reconciled, Reconciled},
Expand Down
10 changes: 9 additions & 1 deletion src/rms_node_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
get_node_http_url/1,
get_node_name/1,
get_node_agent_id_value/1,
get_node_executor_id_value/1,
get_node_persistence_id/1,
node_needs_to_be_reconciled/1,
node_can_be_scheduled/1,
Expand Down Expand Up @@ -164,6 +165,11 @@ get_node_http_url(Key) ->
get_node_agent_id_value(Key) ->
rms_node:get_field_value(agent_id_value, Key).

-spec get_node_executor_id_value(rms_node:key()) ->
{ok, string()} | {error, term()}.
get_node_executor_id_value(Key) ->
rms_node:get_field_value(executor_id_value, Key).

-spec get_node_persistence_id(rms_node:key()) ->
{ok, string()} | {error, term()}.
get_node_persistence_id(Key) ->
Expand Down Expand Up @@ -406,7 +412,8 @@ apply_reserved_offer(NodeKey, OfferHelper) ->
% Tack a new UUID onto ExecutorId - this way we don't clash with previous instances of this same node
% in places like mesos web-ui
% NB This is used as the executor's nodename so must be a valid erlang nodename
ExecutorId = erl_mesos_utils:executor_id(NodeKey ++ "-" ++ uuid:to_string(uuid:uuid4())),
ExecutorIdValue = NodeKey ++ "-" ++ uuid:to_string(uuid:uuid4()),
ExecutorId = erl_mesos_utils:executor_id(ExecutorIdValue),

Source = Name,
ExecutorInfo =
Expand All @@ -429,6 +436,7 @@ apply_reserved_offer(NodeKey, OfferHelper) ->
PBPort,
DisterlPort,
AgentIdValue,
ExecutorIdValue,
ContainerPath),

{ok, rms_offer_helper:add_task_to_launch(TaskInfo,
Expand Down

0 comments on commit cac94b9

Please sign in to comment.