diff --git a/src/riak_core_worker_pool.erl b/src/riak_core_worker_pool.erl index be78cc3d8..4c9e4918f 100644 --- a/src/riak_core_worker_pool.erl +++ b/src/riak_core_worker_pool.erl @@ -56,8 +56,6 @@ %% gen_fsm states -export([queueing/2, ready/2, ready/3, queueing/3, shutdown/2, shutdown/3]). --export([monitor_worker/4]). - -include_lib("kernel/include/logger.hrl"). -ifdef(PULSE). @@ -66,12 +64,11 @@ -compile({pulse_replace_module, [{gen_fsm, pulse_gen_fsm}]}). -endif. --define(SHUTDOWN_WAIT, 60000). -define(LOG_LOOP_MS, 60000). -record(state, {queue = queue:new(), pool :: pid(), - monitors = [] :: list(), + monitors = [] :: list(monitor()), shutdown :: undefined | {pid(), reference()}, callback_mod :: atom(), pool_name :: atom(), @@ -79,6 +76,7 @@ }). -type checkout() :: {pid(), erlang:timestamp()}. +-type monitor() :: {pid(), reference(), term(), term()}. -callback handle_work(Pid::pid(), Work::term(), From::term()) -> any(). @@ -113,9 +111,7 @@ init([PoolBoyArgs, CallbackMod, PoolName]) -> erlang:send_after(?LOG_LOOP_MS, self(), log_timer), {ok, ready, - #state{pool=Pid, - callback_mod = CallbackMod, - pool_name = PoolName}}. + #state{pool=Pid, callback_mod = CallbackMod, pool_name = PoolName}}. ready(_Event, _From, State) -> {reply, ok, ready, State}. @@ -126,22 +122,17 @@ queueing(_Event, _From, State) -> shutdown(_Event, _From, State) -> {reply, ok, shutdown, State}. -ready({work, Work, From} = Msg, - #state{pool=Pool, - queue=Q, - pool_name=PoolName, - checkouts=Checkouts0, - monitors=Monitors0} = State) -> - case poolboy_checkout(Pool, PoolName, Checkouts0) of +ready({work, Work, From} = Msg, State) -> + PoolName = State#state.pool_name, + Pool = State#state.pool, + Mod = State#state.callback_mod, + case poolboy_checkout(Pool, State#state.checkouts) of full -> + Q = State#state.queue, {next_state, queueing, State#state{queue=push_to_queue(Msg, Q)}}; {Pid, Checkouts} when is_pid(Pid) -> - Monitors = - riak_core_worker_pool:monitor_worker(Pid, - From, - Work, - Monitors0), - do_work(Pid, Work, From, State#state.callback_mod), + Monitors = monitor_worker(Pid, From, Work, State#state.monitors), + do_work(Pid, Work, From, Mod, PoolName, direct), {next_state, ready, State#state{monitors=Monitors, checkouts = Checkouts}} @@ -162,68 +153,48 @@ shutdown({work, _Work, From}, State) -> shutdown(_Event, State) -> {next_state, shutdown, State}. -handle_event({checkin, Pid}, - shutdown, - #state{pool=Pool, - pool_name=PoolName, - monitors=Monitors0, - checkouts=Checkouts0} = State) -> - Monitors = demonitor_worker(Pid, Monitors0), +handle_event({checkin, Worker}, StateName, State) -> + Monitors = demonitor_worker(Worker, State#state.monitors), {ok, Checkouts} = - poolboy_checkin(Pool, Pid, PoolName, Checkouts0), - case Monitors of - [] -> %% work all done, time to exit! + poolboy_checkin( + State#state.pool, + Worker, + State#state.pool_name, + State#state.checkouts + ), + case {StateName, Monitors} of + {shutdown, []} -> {stop, shutdown, State}; - _ -> + {shutdown, _} -> {next_state, shutdown, - State#state{monitors=Monitors, checkouts=Checkouts}} - end; -handle_event({checkin, Worker}, - _, - #state{pool=Pool, - queue=Q, - pool_name=PoolName, - checkouts=Checkouts0, - monitors=Monitors0} = State) -> - case consume_from_queue(Q, State#state.pool_name) of - {{value, {work, Work, From}}, Rem} -> - %% there is outstanding work to do - instead of checking - %% the worker back in, just hand it more work to do - Monitors = monitor_worker(Worker, From, Work, Monitors0), - do_work(Worker, Work, From, State#state.callback_mod), - {next_state, - queueing, State#state{queue=Rem, monitors=Monitors}}; - {empty, Empty} -> - Monitors = demonitor_worker(Worker, Monitors0), - {ok, Checkouts} = - poolboy_checkin(Pool, Worker, PoolName, Checkouts0), - {next_state, - ready, - State#state{queue=Empty, - monitors=Monitors, - checkouts=Checkouts}} + State#state{monitors=Monitors, checkouts=Checkouts}}; + _ -> + handle_event( + worker_available, + StateName, + State#state{monitors=Monitors, checkouts=Checkouts}) end; -handle_event(worker_start, StateName, - #state{pool=Pool, - queue=Q, - pool_name=PoolName, - checkouts=Checkouts0, - monitors=Monitors0}=State) -> +handle_event(worker_start, StateName, State) -> %% a new worker just started - if we have work pending, try to do it - case consume_from_queue(Q, State#state.pool_name) of - {{value, {work, Work, From}}, Rem} -> - case poolboy_checkout(Pool, PoolName, Checkouts0) of + handle_event(worker_available, StateName, State); +handle_event(worker_available, StateName, State) -> + PoolName = State#state.pool_name, + Pool = State#state.pool, + Mod = State#state.callback_mod, + case queue:out(State#state.queue) of + {{value, {QT, {work, Work, From}}}, Rem} -> + case poolboy_checkout(Pool, State#state.checkouts) of full -> {next_state, queueing, State}; {Pid, Checkouts} when is_pid(Pid) -> - Monitors = monitor_worker(Pid, From, Work, Monitors0), - do_work(Pid, Work, From, State#state.callback_mod), + Monitors = + monitor_worker(Pid, From, Work, State#state.monitors), + do_work(Pid, Work, From, Mod, PoolName, QT), {next_state, queueing, - State#state{queue=Rem, - monitors=Monitors, - checkouts=Checkouts}} + State#state{ + queue=Rem, monitors=Monitors, checkouts=Checkouts}} end; {empty, _} -> {next_state, @@ -237,7 +208,6 @@ handle_event(worker_start, StateName, end, State} end; - handle_event(_Event, StateName, State) -> {next_state, StateName, State}. @@ -273,10 +243,10 @@ handle_info(log_timer, StateName, State) -> QL = queue:len(State#state.queue), _ = ?LOG_INFO( - "worker_pool=~w has qlen=~w with last_checkout=~w s ago", + "worker_pool=~w has qlen=~w with last_checkout=~w ms ago", [State#state.pool_name, QL, - LastCheckout div (1000 * 1000)]), + LastCheckout div 1000]), ok; {true, []} -> _ = @@ -328,16 +298,14 @@ code_change(_OldVsn, StateName, State, _Extra) -> %% Keep track of which worker we pair with what work/from and monitor the %% worker. Only active workers are tracked +-spec monitor_worker( + pid(), term(), term(), list(monitor())) -> list(monitor()). monitor_worker(Worker, From, Work, Monitors) -> - case lists:keyfind(Worker, 1, Monitors) of - {Worker, Ref, _OldFrom, _OldWork} -> - %% reuse old monitor and just update the from & work - lists:keyreplace(Worker, 1, Monitors, {Worker, Ref, From, Work}); - false -> - Ref = erlang:monitor(process, Worker), - [{Worker, Ref, From, Work} | Monitors] - end. + Ref = erlang:monitor(process, Worker), + %% ukeysort deletes all monitors from Monitors with same worker + lists:ukeysort(1, [{Worker, Ref, From, Work} | Monitors]). +-spec demonitor_worker(pid(), list(monitor())) -> list(monitor()). demonitor_worker(Worker, Monitors) -> case lists:keyfind(Worker, 1, Monitors) of {Worker, Ref, _From, _Work} -> @@ -348,6 +316,7 @@ demonitor_worker(Worker, Monitors) -> Monitors end. +-spec discard_queued_work(queue:queue(), atom()) -> ok. discard_queued_work(Q, Mod) -> case queue:out(Q) of {{value, {_QT, {work, _Work, From}}}, Rem} -> @@ -357,16 +326,17 @@ discard_queued_work(Q, Mod) -> ok end. --spec poolboy_checkin(pid(), pid(), atom(), list(checkout())) - -> {ok, list(checkout())}. +-spec poolboy_checkin( + pid(), pid(), atom(), list(checkout())) -> {ok, list(checkout())}. poolboy_checkin(Pool, Worker, PoolName, Checkouts) -> R = poolboy:checkin(Pool, Worker), case lists:keytake(Worker, 1, Checkouts) of {value, {Worker, WT}, Checkouts0} -> - riak_core_stat:update({worker_pool, - work_time, - PoolName, - timer:now_diff(os:timestamp(), WT)}), + riak_core_stat:update( + {worker_pool, + work_time, + PoolName, + timer:now_diff(os:timestamp(), WT)}), {R, Checkouts0}; _ -> ?LOG_WARNING( @@ -374,21 +344,31 @@ poolboy_checkin(Pool, Worker, PoolName, Checkouts) -> {R, Checkouts} end. --spec poolboy_checkout(pid(), atom(), list(checkout())) - -> full | {pid(), list(checkout())}. -poolboy_checkout(Pool, PoolName, Checkouts) -> +-spec poolboy_checkout( + pid(), list(checkout())) -> full | {pid(), list(checkout())}. +poolboy_checkout(Pool, Checkouts) -> case poolboy:checkout(Pool, false) of full -> full; P when is_pid(P) -> - riak_core_stat:update({worker_pool, - queue_time, - PoolName, - 0}), {P, [{P, os:timestamp()}|Checkouts]} end. -do_work(Pid, Work, From, Mod) -> +-spec do_work( + pid(), term(), term(), atom(), atom(), erlang:timestamp()|direct) -> any(). +do_work(Pid, Work, From, Mod, PoolName, QT) -> + QueueTime = + case QT of + direct -> + 0; + _ -> + timer:now_diff(os:timestamp(), QT) + end, + riak_core_stat:update( + {worker_pool, + queue_time, + PoolName, + QueueTime}), Mod:do_work(Pid, Work, From). -spec push_to_queue({work, term(), term()}, queue:queue()) -> queue:queue(). @@ -396,17 +376,3 @@ push_to_queue(Msg, Q) -> QT = os:timestamp(), queue:in({QT, Msg}, Q). --spec consume_from_queue(queue:queue(), atom()) -> - {empty | {value, {work, term(), term()}}, - queue:queue()}. -consume_from_queue(Q, PoolName) -> - case queue:out(Q) of - {empty, Empty} -> - {empty, Empty}; - {{value, {QT, {work, Work, From}}}, Rem} -> - riak_core_stat:update({worker_pool, - queue_time, - PoolName, - timer:now_diff(os:timestamp(), QT)}), - {{value, {work, Work, From}}, Rem} - end.