Skip to content

Commit

Permalink
Merge pull request #188 from inaka/move_lists_to_maps
Browse files Browse the repository at this point in the history
Move lists to maps
  • Loading branch information
elbrujohalcon authored Jan 7, 2022
2 parents df43aad + 54ed6b5 commit 60a2ee5
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 80 deletions.
18 changes: 2 additions & 16 deletions src/wpool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,6 @@

-behaviour(application).

-define(DEFAULTS,
[{overrun_warning, infinity},
{max_overrun_warnings, infinity},
{overrun_handler, {error_logger, warning_report}},
{workers, 100},
{worker_opt, []},
{queue_type, fifo}]).

%% Copied from gen.erl
-type debug_flag() :: trace | log | statistics | debug | {logfile, string()}.
-type gen_option() ::
Expand Down Expand Up @@ -115,7 +107,7 @@ start_pool(Name) ->
-spec start_pool(name(), [option()]) ->
{ok, pid()} | {error, {already_started, pid()} | term()}.
start_pool(Name, Options) ->
wpool_pool:start_link(Name, all_opts(Options)).
wpool_pool:start_link(Name, wpool_utils:add_defaults(Options)).

%% @doc Stops the pool
-spec stop_pool(name()) -> true.
Expand All @@ -136,7 +128,7 @@ start_sup_pool(Name) ->
-spec start_sup_pool(name(), [option()]) ->
{ok, pid()} | {error, {already_started, pid()} | term()}.
start_sup_pool(Name, Options) ->
wpool_sup:start_pool(Name, all_opts(Options)).
wpool_sup:start_pool(Name, wpool_utils:add_defaults(Options)).

%% @doc Stops the pool
-spec stop_sup_pool(name()) -> ok.
Expand Down Expand Up @@ -243,9 +235,3 @@ get_workers(Sup) ->
-spec broadcast(wpool:name(), term()) -> ok.
broadcast(Sup, Cast) ->
wpool_pool:broadcast(Sup, Cast).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% PRIVATE
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
all_opts(Options) ->
Options ++ ?DEFAULTS.
58 changes: 25 additions & 33 deletions src/wpool_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@
{name :: atom(),
mod :: atom(),
state :: term(),
options :: [{time_checker | queue_manager, atom()} | wpool:option()]}).
options ::
#{time_checker := atom(),
queue_manager := atom(),
overrun_warning := timeout(),
_ => _}}).

-type state() :: #state{}.
-type from() :: {pid(), reference()}.
-type next_step() :: timeout() | hibernate | {continue, term()}.
-type options() :: [{time_checker | queue_manager, atom()} | wpool:option()].

%% api
-export([start_link/4, call/3, cast/2, send_request/2]).
Expand All @@ -38,13 +43,14 @@
%%% API
%%%===================================================================
%% @doc Starts a named process
-spec start_link(wpool:name(), module(), term(), [wpool:option()]) ->
-spec start_link(wpool:name(), module(), term(), options()) ->
{ok, pid()} | ignore | {error, {already_started, pid()} | term()}.
start_link(Name, Module, InitArgs, Options) ->
WorkerOpt = proplists:get_value(worker_opt, Options, []),
FullOpts = wpool_utils:add_defaults(Options),
WorkerOpt = proplists:get_value(worker_opt, FullOpts, []),
gen_server:start_link({local, Name},
?MODULE,
{Name, Module, InitArgs, Options},
{Name, Module, InitArgs, FullOpts},
WorkerOpt).

%% @equiv gen_server:call(Process, Call, Timeout)
Expand All @@ -66,9 +72,10 @@ send_request(Name, Request) ->
%%% init, terminate, code_change, info callbacks
%%%===================================================================
%% @private
-spec init({atom(), atom(), term(), [wpool:option()]}) ->
-spec init({atom(), atom(), term(), options()}) ->
{ok, state()} | {ok, state(), next_step()} | {stop, can_not_ignore} | {stop, term()}.
init({Name, Mod, InitArgs, Options}) ->
init({Name, Mod, InitArgs, LOptions}) ->
Options = maps:from_list(LOptions),
wpool_process_callbacks:notify(handle_init_start, Options, [Name]),
case Mod:init(InitArgs) of
{ok, ModState} ->
Expand Down Expand Up @@ -180,15 +187,9 @@ format_status(Opt, [PDict, State]) ->
%% @private
-spec handle_cast(term(), state()) ->
{noreply, state()} | {noreply, state(), next_step()} | {stop, term(), state()}.
handle_cast(Cast, State) ->
Task =
wpool_utils:task_init({cast, Cast},
proplists:get_value(time_checker, State#state.options, undefined),
proplists:get_value(overrun_warning, State#state.options, infinity),
proplists:get_value(max_overrun_warnings,
State#state.options,
infinity)),
ok = notify_queue_manager(worker_busy, State#state.name, State#state.options),
handle_cast(Cast, #state{options = Options} = State) ->
Task = wpool_utils:task_init({cast, Cast}, Options),
ok = notify_queue_manager(worker_busy, State#state.name, Options),
Reply =
try (State#state.mod):handle_cast(Cast, State#state.state) of
{noreply, NewState} ->
Expand All @@ -206,7 +207,7 @@ handle_cast(Cast, State) ->
{stop, Reason, State#state{state = NewState}}
end,
wpool_utils:task_end(Task),
ok = notify_queue_manager(worker_ready, State#state.name, State#state.options),
ok = notify_queue_manager(worker_ready, State#state.name, Options),
Reply.

%% @private
Expand All @@ -217,15 +218,9 @@ handle_cast(Cast, State) ->
{noreply, state(), next_step()} |
{stop, term(), term(), state()} |
{stop, term(), state()}.
handle_call(Call, From, State) ->
Task =
wpool_utils:task_init({call, Call},
proplists:get_value(time_checker, State#state.options, undefined),
proplists:get_value(overrun_warning, State#state.options, infinity),
proplists:get_value(max_overrun_warnings,
State#state.options,
infinity)),
ok = notify_queue_manager(worker_busy, State#state.name, State#state.options),
handle_call(Call, From, #state{options = Options} = State) ->
Task = wpool_utils:task_init({call, Call}, Options),
ok = notify_queue_manager(worker_busy, State#state.name, Options),
Reply =
try (State#state.mod):handle_call(Call, From, State#state.state) of
{noreply, NewState} ->
Expand Down Expand Up @@ -255,13 +250,10 @@ handle_call(Call, From, State) ->
{stop, Reason, Response, State#state{state = NewState}}
end,
wpool_utils:task_end(Task),
ok = notify_queue_manager(worker_ready, State#state.name, State#state.options),
ok = notify_queue_manager(worker_ready, State#state.name, Options),
Reply.

notify_queue_manager(Function, Name, Options) ->
case proplists:get_value(queue_manager, Options) of
undefined ->
ok;
QueueManager ->
wpool_queue_manager:Function(QueueManager, Name)
end.
notify_queue_manager(Function, Name, #{queue_manager := QueueManager}) ->
wpool_queue_manager:Function(QueueManager, Name);
notify_queue_manager(_, _, _) ->
ok.
13 changes: 5 additions & 8 deletions src/wpool_process_callbacks.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,11 @@ handle_call(Msg, State) ->
{ok, {error, {unexpected_call, Msg}}, State}.

%% @doc Sends a notification to all registered callback modules.
-spec notify(event(), [wpool:option()], [any()]) -> ok.
notify(Event, Options, Args) ->
case lists:keyfind(event_manager, 1, Options) of
{event_manager, EventMgr} ->
gen_event:notify(EventMgr, {Event, Args});
_ ->
ok
end.
-spec notify(event(), #{event_manager := any(), _ => _}, [any()]) -> ok.
notify(Event, #{event_manager := EventMgr}, Args) ->
gen_event:notify(EventMgr, {Event, Args});
notify(_, _, _) ->
ok.

%% @doc Adds a callback module.
-spec add_callback_module(wpool:name(), module()) -> ok | {error, any()}.
Expand Down
24 changes: 11 additions & 13 deletions src/wpool_queue_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
{wpool :: wpool:name(),
clients :: queue:queue({cast | {pid(), _}, term()}),
workers :: gb_sets:set(atom()),
monitors :: gb_trees:tree(atom(), monitored_from()),
monitors :: #{atom() := monitored_from()},
queue_type :: queue_type()}).

-type state() :: #state{}.
Expand Down Expand Up @@ -128,7 +128,7 @@ init(Args) ->
#state{wpool = WPool,
clients = queue:new(),
workers = gb_sets:new(),
monitors = gb_trees:empty(),
monitors = #{},
queue_type = QueueType}}.

-type worker_event() :: new_worker | worker_dead | worker_busy | worker_ready.
Expand All @@ -149,12 +149,11 @@ handle_cast({worker_ready, Worker}, State0) ->
queue_type = QueueType} =
State0,
State =
case gb_trees:is_defined(Worker, Mons) of
true ->
{Ref, _Client} = gb_trees:get(Worker, Mons),
case Mons of
#{Worker := {Ref, _Client}} ->
demonitor(Ref, [flush]),
State0#state{monitors = gb_trees:delete(Worker, Mons)};
false ->
State0#state{monitors = maps:remove(Worker, Mons)};
_ ->
State0
end,
case queue_out(Clients, QueueType) of
Expand Down Expand Up @@ -218,12 +217,11 @@ handle_call(pending_task_count, _From, State) ->
handle_info({'DOWN', Ref, Type, {Worker, _Node}, Exit}, State) ->
handle_info({'DOWN', Ref, Type, Worker, Exit}, State);
handle_info({'DOWN', _, _, Worker, Exit}, State = #state{monitors = Mons}) ->
case gb_trees:is_defined(Worker, Mons) of
true ->
{_Ref, Client} = gb_trees:get(Worker, Mons),
case Mons of
#{Worker := {_Ref, Client}} ->
gen_server:reply(Client, {'EXIT', Worker, Exit}),
{noreply, State#state{monitors = gb_trees:delete(Worker, Mons)}};
false ->
{noreply, State#state{monitors = maps:remove(Worker, Mons)}};
_ ->
{noreply, State}
end;
handle_info(_Info, State) ->
Expand Down Expand Up @@ -286,7 +284,7 @@ now_in_milliseconds() ->

monitor_worker(Worker, Client, State = #state{monitors = Mons}) ->
Ref = monitor(process, Worker),
State#state{monitors = gb_trees:enter(Worker, {Ref, Client}, Mons)}.
State#state{monitors = maps:put(Worker, {Ref, Client}, Mons)}.

queue_out(Clients, fifo) ->
queue:out(Clients);
Expand Down
32 changes: 22 additions & 10 deletions src/wpool_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,25 @@
-author('[email protected]').

%% API
-export([task_init/4, task_end/1]).
-export([task_init/2, task_end/1, add_defaults/1]).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Api
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @doc Marks Task as started in this worker
-spec task_init(term(), atom(), infinity | pos_integer(), infinity | pos_integer()) ->
-spec task_init(term(), #{overrun_warning := timeout(), _ => _}) ->
undefined | reference().
task_init(Task, _TimeChecker, infinity, _MaxWarnings) ->
Time =
calendar:datetime_to_gregorian_seconds(
calendar:universal_time()),
task_init(Task, #{overrun_warning := infinity}) ->
Time = erlang:system_time(second),
erlang:put(wpool_task, {undefined, Time, Task}),
undefined;
task_init(Task, TimeChecker, OverrunTime, MaxWarnings) ->
task_init(Task,
#{overrun_warning := OverrunTime,
time_checker := TimeChecker,
max_overrun_warnings := MaxWarnings}) ->
TaskId = erlang:make_ref(),
Time =
calendar:datetime_to_gregorian_seconds(
calendar:universal_time()),
Time = erlang:system_time(second),
erlang:put(wpool_task, {TaskId, Time, Task}),
erlang:send_after(OverrunTime,
TimeChecker,
Expand All @@ -50,3 +49,16 @@ task_end(undefined) ->
task_end(TimerRef) ->
_ = erlang:cancel_timer(TimerRef),
erlang:erase(wpool_task).

-spec add_defaults([wpool:option()]) -> [wpool:option()].
add_defaults(Opts) ->
lists:ukeymerge(1, lists:sort(Opts), defaults()).

-spec defaults() -> [wpool:option()].
defaults() ->
[{max_overrun_warnings, infinity},
{overrun_handler, {error_logger, warning_report}},
{overrun_warning, infinity},
{queue_type, fifo},
{worker_opt, []},
{workers, 100}].

0 comments on commit 60a2ee5

Please sign in to comment.