Skip to content

Commit

Permalink
Merge pull request #7 from nhs-riak/nhse-d32-nhscore.i6
Browse files Browse the repository at this point in the history
Simplify worker_pool
  • Loading branch information
martinsumner authored Mar 28, 2024
2 parents 0c19baf + 0426602 commit 3654131
Showing 1 changed file with 76 additions and 110 deletions.
186 changes: 76 additions & 110 deletions src/riak_core_worker_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@
%% gen_fsm states
-export([queueing/2, ready/2, ready/3, queueing/3, shutdown/2, shutdown/3]).

-export([monitor_worker/4]).

-include_lib("kernel/include/logger.hrl").

-ifdef(PULSE).
Expand All @@ -66,19 +64,19 @@
-compile({pulse_replace_module, [{gen_fsm, pulse_gen_fsm}]}).
-endif.

-define(SHUTDOWN_WAIT, 60000).
-define(LOG_LOOP_MS, 60000).

-record(state, {queue = queue:new(),
pool :: pid(),
monitors = [] :: list(),
monitors = [] :: list(monitor()),
shutdown :: undefined | {pid(), reference()},
callback_mod :: atom(),
pool_name :: atom(),
checkouts = [] :: list(checkout())
}).

-type checkout() :: {pid(), erlang:timestamp()}.
-type monitor() :: {pid(), reference(), term(), term()}.

-callback handle_work(Pid::pid(), Work::term(), From::term()) -> any().

Expand Down Expand Up @@ -113,9 +111,7 @@ init([PoolBoyArgs, CallbackMod, PoolName]) ->
erlang:send_after(?LOG_LOOP_MS, self(), log_timer),
{ok,
ready,
#state{pool=Pid,
callback_mod = CallbackMod,
pool_name = PoolName}}.
#state{pool=Pid, callback_mod = CallbackMod, pool_name = PoolName}}.

ready(_Event, _From, State) ->
{reply, ok, ready, State}.
Expand All @@ -126,22 +122,17 @@ queueing(_Event, _From, State) ->
shutdown(_Event, _From, State) ->
{reply, ok, shutdown, State}.

ready({work, Work, From} = Msg,
#state{pool=Pool,
queue=Q,
pool_name=PoolName,
checkouts=Checkouts0,
monitors=Monitors0} = State) ->
case poolboy_checkout(Pool, PoolName, Checkouts0) of
ready({work, Work, From} = Msg, State) ->
PoolName = State#state.pool_name,
Pool = State#state.pool,
Mod = State#state.callback_mod,
case poolboy_checkout(Pool, State#state.checkouts) of
full ->
Q = State#state.queue,
{next_state, queueing, State#state{queue=push_to_queue(Msg, Q)}};
{Pid, Checkouts} when is_pid(Pid) ->
Monitors =
riak_core_worker_pool:monitor_worker(Pid,
From,
Work,
Monitors0),
do_work(Pid, Work, From, State#state.callback_mod),
Monitors = monitor_worker(Pid, From, Work, State#state.monitors),
do_work(Pid, Work, From, Mod, PoolName, direct),
{next_state,
ready,
State#state{monitors=Monitors, checkouts = Checkouts}}
Expand All @@ -162,68 +153,48 @@ shutdown({work, _Work, From}, State) ->
shutdown(_Event, State) ->
{next_state, shutdown, State}.

handle_event({checkin, Pid},
shutdown,
#state{pool=Pool,
pool_name=PoolName,
monitors=Monitors0,
checkouts=Checkouts0} = State) ->
Monitors = demonitor_worker(Pid, Monitors0),
handle_event({checkin, Worker}, StateName, State) ->
Monitors = demonitor_worker(Worker, State#state.monitors),
{ok, Checkouts} =
poolboy_checkin(Pool, Pid, PoolName, Checkouts0),
case Monitors of
[] -> %% work all done, time to exit!
poolboy_checkin(
State#state.pool,
Worker,
State#state.pool_name,
State#state.checkouts
),
case {StateName, Monitors} of
{shutdown, []} ->
{stop, shutdown, State};
_ ->
{shutdown, _} ->
{next_state,
shutdown,
State#state{monitors=Monitors, checkouts=Checkouts}}
end;
handle_event({checkin, Worker},
_,
#state{pool=Pool,
queue=Q,
pool_name=PoolName,
checkouts=Checkouts0,
monitors=Monitors0} = State) ->
case consume_from_queue(Q, State#state.pool_name) of
{{value, {work, Work, From}}, Rem} ->
%% there is outstanding work to do - instead of checking
%% the worker back in, just hand it more work to do
Monitors = monitor_worker(Worker, From, Work, Monitors0),
do_work(Worker, Work, From, State#state.callback_mod),
{next_state,
queueing, State#state{queue=Rem, monitors=Monitors}};
{empty, Empty} ->
Monitors = demonitor_worker(Worker, Monitors0),
{ok, Checkouts} =
poolboy_checkin(Pool, Worker, PoolName, Checkouts0),
{next_state,
ready,
State#state{queue=Empty,
monitors=Monitors,
checkouts=Checkouts}}
State#state{monitors=Monitors, checkouts=Checkouts}};
_ ->
handle_event(
worker_available,
StateName,
State#state{monitors=Monitors, checkouts=Checkouts})
end;
handle_event(worker_start, StateName,
#state{pool=Pool,
queue=Q,
pool_name=PoolName,
checkouts=Checkouts0,
monitors=Monitors0}=State) ->
handle_event(worker_start, StateName, State) ->
%% a new worker just started - if we have work pending, try to do it
case consume_from_queue(Q, State#state.pool_name) of
{{value, {work, Work, From}}, Rem} ->
case poolboy_checkout(Pool, PoolName, Checkouts0) of
handle_event(worker_available, StateName, State);
handle_event(worker_available, StateName, State) ->
PoolName = State#state.pool_name,
Pool = State#state.pool,
Mod = State#state.callback_mod,
case queue:out(State#state.queue) of
{{value, {QT, {work, Work, From}}}, Rem} ->
case poolboy_checkout(Pool, State#state.checkouts) of
full ->
{next_state, queueing, State};
{Pid, Checkouts} when is_pid(Pid) ->
Monitors = monitor_worker(Pid, From, Work, Monitors0),
do_work(Pid, Work, From, State#state.callback_mod),
Monitors =
monitor_worker(Pid, From, Work, State#state.monitors),
do_work(Pid, Work, From, Mod, PoolName, QT),
{next_state,
queueing,
State#state{queue=Rem,
monitors=Monitors,
checkouts=Checkouts}}
State#state{
queue=Rem, monitors=Monitors, checkouts=Checkouts}}
end;
{empty, _} ->
{next_state,
Expand All @@ -237,7 +208,6 @@ handle_event(worker_start, StateName,
end,
State}
end;

handle_event(_Event, StateName, State) ->
{next_state, StateName, State}.

Expand Down Expand Up @@ -273,10 +243,10 @@ handle_info(log_timer, StateName, State) ->
QL = queue:len(State#state.queue),
_ =
?LOG_INFO(
"worker_pool=~w has qlen=~w with last_checkout=~w s ago",
"worker_pool=~w has qlen=~w with last_checkout=~w ms ago",
[State#state.pool_name,
QL,
LastCheckout div (1000 * 1000)]),
LastCheckout div 1000]),
ok;
{true, []} ->
_ =
Expand Down Expand Up @@ -328,16 +298,14 @@ code_change(_OldVsn, StateName, State, _Extra) ->

%% Keep track of which worker we pair with what work/from and monitor the
%% worker. Only active workers are tracked
-spec monitor_worker(
pid(), term(), term(), list(monitor())) -> list(monitor()).
monitor_worker(Worker, From, Work, Monitors) ->
case lists:keyfind(Worker, 1, Monitors) of
{Worker, Ref, _OldFrom, _OldWork} ->
%% reuse old monitor and just update the from & work
lists:keyreplace(Worker, 1, Monitors, {Worker, Ref, From, Work});
false ->
Ref = erlang:monitor(process, Worker),
[{Worker, Ref, From, Work} | Monitors]
end.
Ref = erlang:monitor(process, Worker),
%% ukeysort deletes all monitors from Monitors with same worker
lists:ukeysort(1, [{Worker, Ref, From, Work} | Monitors]).

-spec demonitor_worker(pid(), list(monitor())) -> list(monitor()).
demonitor_worker(Worker, Monitors) ->
case lists:keyfind(Worker, 1, Monitors) of
{Worker, Ref, _From, _Work} ->
Expand All @@ -348,6 +316,7 @@ demonitor_worker(Worker, Monitors) ->
Monitors
end.

-spec discard_queued_work(queue:queue(), atom()) -> ok.
discard_queued_work(Q, Mod) ->
case queue:out(Q) of
{{value, {_QT, {work, _Work, From}}}, Rem} ->
Expand All @@ -357,56 +326,53 @@ discard_queued_work(Q, Mod) ->
ok
end.

-spec poolboy_checkin(pid(), pid(), atom(), list(checkout()))
-> {ok, list(checkout())}.
-spec poolboy_checkin(
pid(), pid(), atom(), list(checkout())) -> {ok, list(checkout())}.
poolboy_checkin(Pool, Worker, PoolName, Checkouts) ->
R = poolboy:checkin(Pool, Worker),
case lists:keytake(Worker, 1, Checkouts) of
{value, {Worker, WT}, Checkouts0} ->
riak_core_stat:update({worker_pool,
work_time,
PoolName,
timer:now_diff(os:timestamp(), WT)}),
riak_core_stat:update(
{worker_pool,
work_time,
PoolName,
timer:now_diff(os:timestamp(), WT)}),
{R, Checkouts0};
_ ->
?LOG_WARNING(
"Unexplained poolboy behaviour - failure to track checkouts"),
{R, Checkouts}
end.

-spec poolboy_checkout(pid(), atom(), list(checkout()))
-> full | {pid(), list(checkout())}.
poolboy_checkout(Pool, PoolName, Checkouts) ->
-spec poolboy_checkout(
pid(), list(checkout())) -> full | {pid(), list(checkout())}.
poolboy_checkout(Pool, Checkouts) ->
case poolboy:checkout(Pool, false) of
full ->
full;
P when is_pid(P) ->
riak_core_stat:update({worker_pool,
queue_time,
PoolName,
0}),
{P, [{P, os:timestamp()}|Checkouts]}
end.

do_work(Pid, Work, From, Mod) ->
-spec do_work(
pid(), term(), term(), atom(), atom(), erlang:timestamp()|direct) -> any().
do_work(Pid, Work, From, Mod, PoolName, QT) ->
QueueTime =
case QT of
direct ->
0;
_ ->
timer:now_diff(os:timestamp(), QT)
end,
riak_core_stat:update(
{worker_pool,
queue_time,
PoolName,
QueueTime}),
Mod:do_work(Pid, Work, From).

-spec push_to_queue({work, term(), term()}, queue:queue()) -> queue:queue().
push_to_queue(Msg, Q) ->
QT = os:timestamp(),
queue:in({QT, Msg}, Q).

-spec consume_from_queue(queue:queue(), atom()) ->
{empty | {value, {work, term(), term()}},
queue:queue()}.
consume_from_queue(Q, PoolName) ->
case queue:out(Q) of
{empty, Empty} ->
{empty, Empty};
{{value, {QT, {work, Work, From}}}, Rem} ->
riak_core_stat:update({worker_pool,
queue_time,
PoolName,
timer:now_diff(os:timestamp(), QT)}),
{{value, {work, Work, From}}, Rem}
end.

0 comments on commit 3654131

Please sign in to comment.