From 10ed1abd6edadf12b253ad30c21cd12809978e10 Mon Sep 17 00:00:00 2001 From: Drew Kerrigan Date: Mon, 4 Apr 2016 15:38:27 -0400 Subject: [PATCH] draining queue more often, cleanup --- src/rms_scheduler.erl | 77 +++++++++++++++++-------------------------- 1 file changed, 30 insertions(+), 47 deletions(-) diff --git a/src/rms_scheduler.erl b/src/rms_scheduler.erl index 7a1e631..047ad89 100644 --- a/src/rms_scheduler.erl +++ b/src/rms_scheduler.erl @@ -122,41 +122,20 @@ disconnected(_SchedulerInfo, State) -> -spec resource_offers(erl_mesos_scheduler:scheduler_info(), erl_mesos:'Event.Offers'(), state()) -> {ok, state()} | {stop, state()}. -resource_offers(SchedulerInfo, #'Event.Offers'{offers = Offers}, - #state{scheduler = #scheduler{options = Options}}=State) -> - Constraints = proplists:get_value(constraints, Options), - Filters = #'Filters'{refuse_seconds = ?OFFER_INTERVAL}, - OpsState = lists:foldl( - fun(Offer, S1) -> - case S1 of - #state{} -> - {OfferIds, Operations} = apply_offers([Offer], Constraints), - case length(Operations) of - 0 -> - ok; - _Len -> - lager:info("Scheduler accept operations: ~p.", [Operations]) - end, - case call(accept, [SchedulerInfo, OfferIds, Operations, Filters], S1) of - {ok, S2} -> S2; - Error -> Error - end; - _ -> S1 - end - end, State, Offers), - - case OpsState of - #state{} -> +resource_offers(SchedulerInfo, #'Event.Offers'{offers = Offers}, State) -> + case apply_offers(SchedulerInfo, Offers, State) of + {ok, State1} -> ExecsToShutdown = rms_cluster_manager:executors_to_shutdown(), - shutdown_executors(SchedulerInfo, ExecsToShutdown, OpsState); - R -> R + case shutdown_executors(SchedulerInfo, ExecsToShutdown, State1) of + {ok, State2} -> + %% Attempt to drain queue + exec_calls(State2); + Response2 -> Response2 + end; + Response1 -> + Response1 end. - - - - - -spec offer_rescinded(erl_mesos_scheduler:scheduler_info(), erl_mesos:'Event.Rescind'(), state()) -> {ok, state()}. @@ -369,21 +348,25 @@ exec_calls(#state{calls_queue = CallsQueue} = State) -> {ok, State1} end. --spec apply_offers([erl_mesos:'Offer'()], rms_offer_helper:constraints()) -> - {[erl_mesos:'OfferID'()], [erl_mesos:'Offer.Operation'()]}. -apply_offers(Offers, Constraints) -> - apply_offers(Offers, Constraints, [], []). - --spec apply_offers([erl_mesos:'Offer'()], - rms_offer_helper:constraints(), - [erl_mesos:'OfferID'()], [erl_mesos:'Offer.Operation'()]) -> - {[erl_mesos:'OfferID'()], [erl_mesos:'Offer.Operation'()]}. -apply_offers([Offer | Offers], Constraints, OfferIds, Operations) -> - {OfferId, Operations1} = apply_offer(Offer, Constraints), - apply_offers(Offers, Constraints, [OfferId | OfferIds], - Operations ++ Operations1); -apply_offers([], _, OfferIds, Operations) -> - {OfferIds, Operations}. +-spec apply_offers(erl_mesos_scheduler:scheduler_info(), [erl_mesos:'Offer'()], state()) -> + {ok, state()} | {stop, state()}. +apply_offers(_, [], State) -> + {ok, State}; +apply_offers(SchedulerInfo, [Offer|Offers], #state{scheduler = #scheduler{options = Options}}=State) -> + Constraints = proplists:get_value(constraints, Options), + Filters = #'Filters'{refuse_seconds = ?OFFER_INTERVAL}, + {OfferId, Operations} = apply_offer(Offer, Constraints), + case length(Operations) of + 0 -> + ok; + _Len -> + lager:info("Scheduler accept operations: ~p.", [Operations]) + end, + case call(accept, [SchedulerInfo, [OfferId], Operations, Filters], State) of + {ok, State1} -> + apply_offers(SchedulerInfo, Offers, State1); + _ -> {stop, State} + end. -spec apply_offer(erl_mesos:'Offer'(), rms_offer_helper:constraints()) -> {erl_mesos:'OfferID'(), [erl_mesos:'Offer.Operation'()]}.