Skip to content

Commit

Permalink
Merge pull request #44 from basho-labs/ack-fix-offers
Browse files Browse the repository at this point in the history
draining queue more often, cleanup
  • Loading branch information
drewkerrigan committed Apr 4, 2016
2 parents 1e072e2 + 10ed1ab commit d613f96
Showing 1 changed file with 30 additions and 47 deletions.
77 changes: 30 additions & 47 deletions src/rms_scheduler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -122,41 +122,20 @@ disconnected(_SchedulerInfo, State) ->
-spec resource_offers(erl_mesos_scheduler:scheduler_info(),
erl_mesos:'Event.Offers'(), state()) ->
{ok, state()} | {stop, state()}.
resource_offers(SchedulerInfo, #'Event.Offers'{offers = Offers},
#state{scheduler = #scheduler{options = Options}}=State) ->
Constraints = proplists:get_value(constraints, Options),
Filters = #'Filters'{refuse_seconds = ?OFFER_INTERVAL},
OpsState = lists:foldl(
fun(Offer, S1) ->
case S1 of
#state{} ->
{OfferIds, Operations} = apply_offers([Offer], Constraints),
case length(Operations) of
0 ->
ok;
_Len ->
lager:info("Scheduler accept operations: ~p.", [Operations])
end,
case call(accept, [SchedulerInfo, OfferIds, Operations, Filters], S1) of
{ok, S2} -> S2;
Error -> Error
end;
_ -> S1
end
end, State, Offers),

case OpsState of
#state{} ->
resource_offers(SchedulerInfo, #'Event.Offers'{offers = Offers}, State) ->
case apply_offers(SchedulerInfo, Offers, State) of
{ok, State1} ->
ExecsToShutdown = rms_cluster_manager:executors_to_shutdown(),
shutdown_executors(SchedulerInfo, ExecsToShutdown, OpsState);
R -> R
case shutdown_executors(SchedulerInfo, ExecsToShutdown, State1) of
{ok, State2} ->
%% Attempt to drain queue
exec_calls(State2);
Response2 -> Response2
end;
Response1 ->
Response1
end.






-spec offer_rescinded(erl_mesos_scheduler:scheduler_info(),
erl_mesos:'Event.Rescind'(), state()) ->
{ok, state()}.
Expand Down Expand Up @@ -369,21 +348,25 @@ exec_calls(#state{calls_queue = CallsQueue} = State) ->
{ok, State1}
end.

-spec apply_offers([erl_mesos:'Offer'()], rms_offer_helper:constraints()) ->
{[erl_mesos:'OfferID'()], [erl_mesos:'Offer.Operation'()]}.
apply_offers(Offers, Constraints) ->
apply_offers(Offers, Constraints, [], []).

-spec apply_offers([erl_mesos:'Offer'()],
rms_offer_helper:constraints(),
[erl_mesos:'OfferID'()], [erl_mesos:'Offer.Operation'()]) ->
{[erl_mesos:'OfferID'()], [erl_mesos:'Offer.Operation'()]}.
apply_offers([Offer | Offers], Constraints, OfferIds, Operations) ->
{OfferId, Operations1} = apply_offer(Offer, Constraints),
apply_offers(Offers, Constraints, [OfferId | OfferIds],
Operations ++ Operations1);
apply_offers([], _, OfferIds, Operations) ->
{OfferIds, Operations}.
-spec apply_offers(erl_mesos_scheduler:scheduler_info(), [erl_mesos:'Offer'()], state()) ->
{ok, state()} | {stop, state()}.
apply_offers(_, [], State) ->
{ok, State};
apply_offers(SchedulerInfo, [Offer|Offers], #state{scheduler = #scheduler{options = Options}}=State) ->
Constraints = proplists:get_value(constraints, Options),
Filters = #'Filters'{refuse_seconds = ?OFFER_INTERVAL},
{OfferId, Operations} = apply_offer(Offer, Constraints),
case length(Operations) of
0 ->
ok;
_Len ->
lager:info("Scheduler accept operations: ~p.", [Operations])
end,
case call(accept, [SchedulerInfo, [OfferId], Operations, Filters], State) of
{ok, State1} ->
apply_offers(SchedulerInfo, Offers, State1);
_ -> {stop, State}
end.

-spec apply_offer(erl_mesos:'Offer'(), rms_offer_helper:constraints()) ->
{erl_mesos:'OfferID'(), [erl_mesos:'Offer.Operation'()]}.
Expand Down

0 comments on commit d613f96

Please sign in to comment.