From 235d5792bf6978e17cc323f01944d448e1969595 Mon Sep 17 00:00:00 2001
From: Fred Hebert
Date: Mon, 23 Jan 2023 00:45:48 +0000
Subject: [PATCH 1/2] Add rebar_parallel pool, use in DAG scans

This addresses https://github.com/erlang/rebar3/issues/2767 by creating
a pool mechanism in rebar_parallel that keeps an interface as similar as
possible to the queue mechanism, with the one caveat that it allows
tasks to be created asynchronously rather than requiring them all to be
known at start time.

The mechanism is not tested super deeply, which is probably a mistake,
but I wanted to get a reviewable PR first.

The mechanism is also added to the rebar_compiler_dag module to cover
use cases that were previously handled by spawning an unbounded number
of processes, which would cause problems when the file descriptor limit
was low and lots of files were used and kept open in parallel.

The pool mechanism puts an upper bound on processing but also on
resource usage. This PR may therefore also come with a performance
regression, and if so we'd want to override the default 1-per-scheduler
pool options to use a lot more workers and hit a middle ground in
performance vs. resource usage.
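To make the comparison concrete for reviewers, here is a rough editorial
sketch of how the DAG scan drives the pool after this change. It mirrors the
populate_sources/finalise_populate_sources code further down; scan_deps/1 and
the file names are invented for illustration, and pool_task/2 is renamed to
pool_task_async/2 in the follow-up commit:

    %% Editorial sketch only, not part of the patch. The pool runs closures
    %% through erlang:apply/2 and the handler keeps every result, exactly as
    %% populate_sources/5 sets it up below.
    Pool = rebar_parallel:pool(fun erlang:apply/2, [],
                               fun(Res, _) -> {ok, Res} end, []),
    %% Tasks are added as sources are discovered, instead of spawning one
    %% monitored process per file up front; scan_deps/1 is a placeholder
    %% standing in for the real prepopulate_deps/4 call.
    ok = rebar_parallel:pool_task(Pool, fun() -> scan_deps("src/a.erl") end),
    ok = rebar_parallel:pool_task(Pool, fun() -> scan_deps("src/b.erl") end),
    %% Worker count is bounded (one per scheduler by default), so only that
    %% many files are scanned and open at once. Draining the pool returns
    %% the accumulated results in no particular order.
    Deps = rebar_parallel:pool_terminate(Pool).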
---
 apps/rebar/src/rebar_compiler_dag.erl    | 96 +++++++++++-------------
 apps/rebar/src/rebar_parallel.erl        | 84 ++++++++++++++++++++-
 apps/rebar/test/rebar_parallel_SUITE.erl | 80 ++++++++++++++++++++
 3 files changed, 207 insertions(+), 53 deletions(-)
 create mode 100644 apps/rebar/test/rebar_parallel_SUITE.erl

diff --git a/apps/rebar/src/rebar_compiler_dag.erl b/apps/rebar/src/rebar_compiler_dag.erl
index 5daa7a973..43f902a9f 100644
--- a/apps/rebar/src/rebar_compiler_dag.erl
+++ b/apps/rebar/src/rebar_compiler_dag.erl
@@ -132,46 +132,41 @@ filter_prefix(G, [{App, Out} | AppTail] = AppPaths, [File | FTail]) ->
             filter_prefix(G, AppPaths, FTail)
     end.
 
-finalise_populate_sources(_G, _InDirs, Waiting) when Waiting =:= #{} ->
+finalise_populate_sources(G, InDirs, Pool) ->
+    Res = rebar_parallel:pool_terminate(Pool),
+    finalise_populate_sources_(G, InDirs, Res).
+
+finalise_populate_sources_(_G, _InDirs, []) ->
     ok;
-finalise_populate_sources(G, InDirs, Waiting) ->
-    %% wait for all deps to complete
-    receive
-        {deps, Pid, AbsIncls} ->
-            {Status, Source} = maps:get(Pid, Waiting),
-            %% the file hasn't been visited yet; set it to existing, but with
-            %% a last modified value that's null so it gets updated to something new.
-            [digraph:add_vertex(G, Src, 0) || Src <- AbsIncls,
-                                              digraph:vertex(G, Src) =:= false],
-            %% drop edges from deps that aren't included!
-            [digraph:del_edge(G, Edge) || Status == old,
-                                          Edge <- digraph:out_edges(G, Source),
-                                          {_, _Src, Path, _Label} <- [digraph:edge(G, Edge)],
-                                          not lists:member(Path, AbsIncls)],
-            %% Add the rest
-            [digraph:add_edge(G, Source, Incl) || Incl <- AbsIncls],
-            %% mark the digraph dirty when there is any change in
-            %% dependencies, for any application in the project
-            mark_dirty(G),
-            finalise_populate_sources(G, InDirs, Waiting);
-        {'DOWN', _MRef, process, Pid, normal} ->
-            finalise_populate_sources(G, InDirs, maps:remove(Pid, Waiting));
-        {'DOWN', _MRef, process, Pid, Reason} ->
-            {_Status, Source} = maps:get(Pid, Waiting),
-            ?ERROR("Failed to get dependencies for ~s~n~p", [Source, Reason]),
-            ?ABORT
-    end.
+finalise_populate_sources_(G, InDirs, [{Status, {deps, Source, AbsIncls}}|Acc]) ->
+    %% the file hasn't been visited yet; set it to existing, but with
+    %% a last modified value that's null so it gets updated to something new.
+    [digraph:add_vertex(G, Src, 0) || Src <- AbsIncls,
+                                      digraph:vertex(G, Src) =:= false],
+    %% drop edges from deps that aren't included!
+    [digraph:del_edge(G, Edge) || Status == old,
+                                  Edge <- digraph:out_edges(G, Source),
+                                  {_, _Src, Path, _Label} <- [digraph:edge(G, Edge)],
+                                  not lists:member(Path, AbsIncls)],
+    %% Add the rest
+    [digraph:add_edge(G, Source, Incl) || Incl <- AbsIncls],
+    %% mark the digraph dirty when there is any change in
+    %% dependencies, for any application in the project
+    mark_dirty(G),
+    finalise_populate_sources_(G, InDirs, Acc).
 
 %% @doc this function scans all the source files found and looks into
 %% all the `InDirs' for deps (other source files, or files that aren't source
 %% but still returned by the compiler module) that are related
 %% to them.
 populate_sources(G, Compiler, InDirs, Sources, DepOpts) ->
-    populate_sources(G, Compiler, InDirs, Sources, DepOpts, #{}).
+    Pool = rebar_parallel:pool(fun erlang:apply/2, [],
+                               fun(Res, _) -> {ok, Res} end, []),
+    populate_sources(G, Compiler, InDirs, Sources, DepOpts, Pool).
 
-populate_sources(G, _Compiler, InDirs, [], _DepOpts, Waiting) ->
-    finalise_populate_sources(G, InDirs, Waiting);
-populate_sources(G, Compiler, InDirs, [Source|Erls], DepOpts, Waiting) ->
+populate_sources(G, _Compiler, InDirs, [], _DepOpts, Pool) ->
+    finalise_populate_sources(G, InDirs, Pool);
+populate_sources(G, Compiler, InDirs, [Source|Erls], DepOpts, Pool) ->
     case digraph:vertex(G, Source) of
         {_, LastUpdated} ->
             case filelib:last_modified(Source) of
@@ -180,19 +175,21 @@ populate_sources(G, Compiler, InDirs, [Source|Erls], DepOpts, Waiting) ->
                     %% from the graph.
                     digraph:del_vertex(G, Source),
                     mark_dirty(G),
-                    populate_sources(G, Compiler, InDirs, Erls, DepOpts, Waiting);
+                    populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool);
                 LastModified when LastUpdated < LastModified ->
                     digraph:add_vertex(G, Source, LastModified),
-                    Worker = prepopulate_deps(Compiler, InDirs, Source, DepOpts, self()),
-                    populate_sources(G, Compiler, InDirs, Erls, DepOpts, Waiting#{Worker => {old, Source}});
+                    Work = fun() -> {old, prepopulate_deps(Compiler, InDirs, Source, DepOpts)} end,
+                    rebar_parallel:pool_task(Pool, Work),
+                    populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool);
                 _ -> % unchanged
-                    populate_sources(G, Compiler, InDirs, Erls, DepOpts, Waiting)
+                    populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool)
             end;
         false ->
            LastModified = filelib:last_modified(Source),
            digraph:add_vertex(G, Source, LastModified),
-           Worker = prepopulate_deps(Compiler, InDirs, Source, DepOpts, self()),
-           populate_sources(G, Compiler, InDirs, Erls, DepOpts, Waiting#{Worker => {new, Source}})
+           Work = fun() -> {new, prepopulate_deps(Compiler, InDirs, Source, DepOpts)} end,
+           rebar_parallel:pool_task(Pool, Work),
+           populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool)
     end.
 
 %% @doc Scan all files in the digraph that are seen as dependencies, but are
@@ -450,20 +447,15 @@ maybe_rm_vertex(G, Source) ->
 %% mark its timestamp to 0, which means we have no info on it.
 %% Source files will be covered at a later point in their own scan, and
 %% non-source files are going to be covered by `populate_deps/3'.
-prepopulate_deps(Compiler, InDirs, Source, DepOpts, Control) ->
-    {Worker, _MRef} = spawn_monitor(
-        fun () ->
-            SourceDir = filename:dirname(Source),
-            AbsIncls = case erlang:function_exported(Compiler, dependencies, 4) of
-                false ->
-                    Compiler:dependencies(Source, SourceDir, InDirs);
-                true ->
-                    Compiler:dependencies(Source, SourceDir, InDirs, DepOpts)
-            end,
-            Control ! {deps, self(), AbsIncls}
-        end
-    ),
-    Worker.
+prepopulate_deps(Compiler, InDirs, Source, DepOpts) ->
+    SourceDir = filename:dirname(Source),
+    AbsIncls = case erlang:function_exported(Compiler, dependencies, 4) of
+        false ->
+            Compiler:dependencies(Source, SourceDir, InDirs);
+        true ->
+            Compiler:dependencies(Source, SourceDir, InDirs, DepOpts)
+    end,
+    {deps, Source, AbsIncls}.
 
 %% check that a dep file is up to date
 refresh_dep(_G, {artifact, _}) ->
diff --git a/apps/rebar/src/rebar_parallel.erl b/apps/rebar/src/rebar_parallel.erl
index e1b621614..6118652c9 100644
--- a/apps/rebar/src/rebar_parallel.erl
+++ b/apps/rebar/src/rebar_parallel.erl
@@ -7,7 +7,8 @@
 %%% and extracted here to be reused in other circumstances.
 %%% @end
 -module(rebar_parallel).
--export([queue/5]).
+-export([queue/5,
+         pool/4, pool/5, pool_task/2, pool_terminate/1]).
 -include("rebar.hrl").
 
 queue(Tasks, WorkF, WArgs, Handler, HArgs) ->
@@ -18,6 +19,36 @@ queue(Tasks, WorkF, WArgs, Handler, HArgs) ->
     Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, Jobs)],
     parallel_dispatch(Tasks, Pids, Handler, HArgs).
 
+pool(WorkF, WArgs, Handler, HArgs) ->
+    pool(WorkF, WArgs, Handler, HArgs, erlang:system_info(schedulers)).
+
+pool(WorkF, WArgs, Handler, HArgs, Jobs) ->
+    Parent = self(),
+    Coordinator = spawn_link(fun() ->
+        Coord = self(),
+        Worker = fun() -> worker(Coord, WorkF, WArgs) end,
+        Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, Jobs)],
+        Parent ! pool_ready,
+        pool_coordinator([], [], Pids, Handler, HArgs, [], undefined)
+    end),
+    receive pool_ready -> ok end,
+    Coordinator.
+
+pool_task(Pid, Task) ->
+    Pid ! {task, Task},
+    ok.
+
+pool_terminate(Pid) ->
+    Ref = erlang:monitor(process, Pid),
+    Pid ! {self(), terminate},
+    receive
+        {Pid, Res} ->
+            erlang:demonitor(Ref, [flush]),
+            Res;
+        {'DOWN', Ref, process, Pid, Info} ->
+            error(Info)
+    end.
+
 parallel_dispatch([], [], _, _) ->
     [];
 parallel_dispatch(Targets, Pids, Handler, Args) ->
@@ -54,3 +85,54 @@ worker(QueuePid, F, Args) ->
             ok
     end.
 
+pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Report) ->
+    receive
+        {task, Task} when Report =/= undefined ->
+            ?ERROR("Task added to pool after being terminated: ~p", [Task]),
+            ?ABORT;
+        {task, Task} when FreePids =:= [] ->
+            pool_coordinator([Task|Tasks], FreePids, Pids, Handler, HArgs, Acc, Report);
+        {task, Task} ->
+            [Pid|NewFree] = FreePids,
+            Pid ! {task, Task},
+            pool_coordinator(Tasks, NewFree, Pids, Handler, HArgs, Acc, Report);
+        {ready, _Pid} when Tasks =:= [], Report =/= undefined ->
+            %% And we're done
+            %% terminate workers async, return results if done
+            [Pid ! empty || {Pid,_Ref} <- Pids],
+            pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Report);
+        {ready, Pid} when Tasks =:= [] ->
+            pool_coordinator(Tasks, [Pid|FreePids], Pids, Handler, HArgs, Acc, Report);
+        {ready, Pid} ->
+            [Task|NewTasks] = Tasks,
+            Pid ! {task, Task},
+            pool_coordinator(NewTasks, FreePids, Pids, Handler, HArgs, Acc, Report);
+        {'DOWN', Mref, _, Pid, normal} ->
+            NewPids = lists:delete({Pid, Mref}, Pids),
+            NewFree = lists:delete(Pid, FreePids),
+            case NewPids of
+                [] when is_pid(Report) ->
+                    Report ! {self(), Acc};
+                _ ->
+                    pool_coordinator(Tasks, NewFree, NewPids, Handler, HArgs, Acc, Report)
+            end;
+        {'DOWN', _Mref, _, _Pid, Info} ->
+            ?ERROR("Task failed: ~p", [Info]),
+            ?ABORT;
+        {result, Result} ->
+            case Handler(Result, HArgs) of
+                ok ->
+                    pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Report);
+                {ok, Res} ->
+                    pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, [Res|Acc], Report)
+            end;
+        {Caller, terminate} ->
+            if Pids =:= []; % no workers somehow
+               length(Pids) =:= length(FreePids), Tasks =:= [] -> % All Idle
+                   Caller ! {self(), Acc};
+               true ->
+                   %% Still work to do
+                   pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Caller)
+            end
+    end.
+
diff --git a/apps/rebar/test/rebar_parallel_SUITE.erl b/apps/rebar/test/rebar_parallel_SUITE.erl
new file mode 100644
index 000000000..6cbdaaa40
--- /dev/null
+++ b/apps/rebar/test/rebar_parallel_SUITE.erl
@@ -0,0 +1,80 @@
+-module(rebar_parallel_SUITE).
+-compile(export_all).
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+all() -> [empty_set, same_results, pool_fetcher, pool_misuse].
+
+empty_set() ->
+    [{doc, "Running with null tasks still works"}].
+empty_set(_) ->
+    ?assertEqual([],
+                 rebar_parallel:queue([],
+                                      fun(X,_) -> X end, [arg],
+                                      fun(_,_) -> ok end, [arg])),
+    ?assertEqual([],
+                 rebar_parallel:queue([],
+                                      fun(X,_) -> X end, [arg],
+                                      fun(X,_) -> {ok, X} end, [arg])),
+    P1 = rebar_parallel:pool(fun(X,_) -> X end, [arg],
+                             fun(_,_) -> ok end, [arg]),
+    ?assertEqual([],
+                 rebar_parallel:pool_terminate(P1)),
+    P2 = rebar_parallel:pool(fun(X,_) -> X end, [arg],
+                             fun(X,_) -> {ok,X} end, [arg]),
+    ?assertEqual([],
+                 rebar_parallel:pool_terminate(P2)),
+    ok.
+
+same_results() ->
+    [{doc, "The two parallel methods can be equivalent but the pool can "
+           "be used to do asynchronous task creation"}].
+same_results(_) ->
+    ?assertEqual([2,4,6,8,10,12,14],
+                 lists:sort(
+                   rebar_parallel:queue([1,2,3,4,5,6,7],
+                                        fun(X,_) -> X*2 end, [],
+                                        fun(X,_) -> {ok, X} end, []))),
+    P = rebar_parallel:pool(fun(X,_) -> X*2 end, [],
+                            fun(X,_) -> {ok, X} end, []),
+    _ = [rebar_parallel:pool_task(P, N) || N <- [1,2,3,4,5,6,7]],
+    ?assertEqual([2,4,6,8,10,12,14],
+                 lists:sort(rebar_parallel:pool_terminate(P))),
+    ok.
+
+pool_fetcher() ->
+    [{doc, "The fetcher from a pool can be from a different process "
+           "and the other one will get an error."}].
+pool_fetcher(_) ->
+    Parent = self(),
+    P = rebar_parallel:pool(fun(X,_) -> X*2 end, [],
+                            fun(X,_) -> {ok, X} end, []),
+    _ = [rebar_parallel:pool_task(P, N) || N <- [1,2,3,4,5,6,7]],
+    spawn_link(fun() -> Parent ! {res, lists:sort(rebar_parallel:pool_terminate(P))} end),
+    receive
+        {res, X} -> ?assertEqual([2,4,6,8,10,12,14], X)
+    after 500 -> error(timeout)
+    end,
+    ok.
+
+pool_misuse() ->
+    [{doc, "Using the pool for tasks after it is terminated but before "
+           "it returns, you get a crash even if it's async"}].
+pool_misuse(_) ->
+    Parent = self(),
+    P = rebar_parallel:pool(fun(_,_) -> timer:sleep(1000) end, [],
+                            fun(X,_) -> {ok, X} end, []),
+    _ = [rebar_parallel:pool_task(P, N) || N <- [1,2,3,4,5,6,7]],
+    spawn(fun() -> Parent ! ok, rebar_parallel:pool_terminate(P) end),
+    receive ok -> timer:sleep(100) end,
+    Old = process_flag(trap_exit, true),
+    rebar_parallel:pool_task(P, 0),
+    receive
+        {'EXIT', P, {{nocatch, rebar_abort}, _Stack}} ->
+            process_flag(trap_exit, Old)
+    after 1000 ->
+        process_flag(trap_exit, Old),
+        error(no_abort)
+    end,
+    ok.
+
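One small editorial note before the follow-up commit: the tests above pass
[arg] as placeholder static arguments. Those second parameters are what WArgs
and HArgs (as the next commit names them) carry into every worker and handler
call. A sketch of how they can be used, with values invented for the example:

    %% Editorial sketch, not part of the patch: WArgs carries a common factor
    %% into every worker call, HArgs a tag into every handler call.
    Pool = rebar_parallel:pool(fun(N, Factor) -> N * Factor end, 2,
                               fun(Res, Tag) -> {ok, {Tag, Res}} end, scaled),
    ok = rebar_parallel:pool_task(Pool, 2),
    ok = rebar_parallel:pool_task(Pool, 3),
    [{scaled, 4}, {scaled, 6}] = lists:sort(rebar_parallel:pool_terminate(Pool)).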

From 4583c25b845ebcd82ed793cf6198dfdd49cee8cb Mon Sep 17 00:00:00 2001
From: Fred Hebert
Date: Thu, 26 Jan 2023 14:55:55 +0000
Subject: [PATCH 2/2] Add types and docs to rebar_parallel

---
 apps/rebar/src/rebar_compiler_dag.erl    |   4 +-
 apps/rebar/src/rebar_parallel.erl        | 164 ++++++++++++++++++++---
 apps/rebar/test/rebar_parallel_SUITE.erl |   8 +-
 3 files changed, 151 insertions(+), 25 deletions(-)

diff --git a/apps/rebar/src/rebar_compiler_dag.erl b/apps/rebar/src/rebar_compiler_dag.erl
index 43f902a9f..9298ca053 100644
--- a/apps/rebar/src/rebar_compiler_dag.erl
+++ b/apps/rebar/src/rebar_compiler_dag.erl
@@ -179,7 +179,7 @@ populate_sources(G, Compiler, InDirs, [Source|Erls], DepOpts, Pool) ->
                 LastModified when LastUpdated < LastModified ->
                     digraph:add_vertex(G, Source, LastModified),
                     Work = fun() -> {old, prepopulate_deps(Compiler, InDirs, Source, DepOpts)} end,
-                    rebar_parallel:pool_task(Pool, Work),
+                    rebar_parallel:pool_task_async(Pool, Work),
                     populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool);
                 _ -> % unchanged
                     populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool)
@@ -188,7 +188,7 @@ populate_sources(G, Compiler, InDirs, [Source|Erls], DepOpts, Pool) ->
            LastModified = filelib:last_modified(Source),
            digraph:add_vertex(G, Source, LastModified),
            Work = fun() -> {new, prepopulate_deps(Compiler, InDirs, Source, DepOpts)} end,
-           rebar_parallel:pool_task(Pool, Work),
+           rebar_parallel:pool_task_async(Pool, Work),
            populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool)
     end.
diff --git a/apps/rebar/src/rebar_parallel.erl b/apps/rebar/src/rebar_parallel.erl
index 6118652c9..eb4992d0a 100644
--- a/apps/rebar/src/rebar_parallel.erl
+++ b/apps/rebar/src/rebar_parallel.erl
@@ -5,12 +5,38 @@
 %%%
 %%% Original design by Max Fedorov in the rebar compiler, then generalised
 %%% and extracted here to be reused in other circumstances.
+%%%
+%%% It also contains an asynchronous version of the queue built with a
+%%% naive pool, which allows similar semantics in worker definitions, but
+%%% without demanding to know all the tasks to run ahead of time.
 %%% @end
 -module(rebar_parallel).
 -export([queue/5,
-         pool/4, pool/5, pool_task/2, pool_terminate/1]).
+         pool/4, pool/5, pool_task_async/2, pool_terminate/1]).
 -include("rebar.hrl").
 
+%% @doc Create a queue using as many workers as there are schedulers,
+%% that will spread all `Task' entries among them based on whichever
+%% is available first.
+%%
+%% The value returned by the worker function `WorkF' for each task
+%% is then passed to a `Handler' which either discards its result
+%% after having done a side-effectful operation (by returning `ok')
+%% as in a `lists:foreach/2' call, or returns a value that gets
+%% added to an accumulator (by returning `{ok, Val}'). The handler
+%% can return both types as required.
+%%
+%% The accumulated list of values is in no specific order and depends
+%% on how tasks were scheduled and completed.
+-spec queue([Task], WorkF, WArgs, Handler, HArgs) -> [Ret] when
+      Task :: term(),
+      WorkF :: fun((Task, WArgs) -> TmpRet),
+      WArgs :: term(),
+      Handler :: fun((TmpRet, HArgs) -> NoRet | AccVal),
+      HArgs :: term(),
+      NoRet :: ok,
+      AccVal :: {ok, Ret},
+      Ret :: term().
 queue(Tasks, WorkF, WArgs, Handler, HArgs) ->
     Parent = self(),
     Worker = fun() -> worker(Parent, WorkF, WArgs) end,
@@ -19,25 +45,90 @@ queue(Tasks, WorkF, WArgs, Handler, HArgs) ->
     Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, Jobs)],
     parallel_dispatch(Tasks, Pids, Handler, HArgs).
 
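As a concrete illustration of the @doc block above (it mirrors the
same_results test case rather than adding any new behaviour; the input list
and the doubling function are invented for the example):

    %% Editorial sketch, not part of the patch.
    Doubled = rebar_parallel:queue(
                [1, 2, 3, 4],                            %% tasks known up front
                fun(N, _WArgs) -> N * 2 end, [],         %% worker function
                fun(Res, _HArgs) -> {ok, Res} end, []),  %% keep every result
    [2, 4, 6, 8] = lists:sort(Doubled).                  %% order not guaranteed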
+%% @doc Create a pool using as many workers as there are schedulers,
+%% and for which tasks can be added by calling `pool_task_async/2'.
+%%
+%% The value returned by the worker function `WorkF' for each task
+%% is then passed to a `Handler' which either discards its result
+%% after having done a side-effectful operation (by returning `ok')
+%% as in a `lists:foreach/2' call, or returns a value that gets
+%% added to an accumulator (by returning `{ok, Val}'). The handler
+%% can return both types as required.
+%%
+%% The accumulated list of values is in no specific order and depends
+%% on how tasks were scheduled and completed, and can only
+%% be obtained by calling `pool_terminate/1'.
+%%
+%% The pool process is linked to its initial caller and will error
+%% out via a link if any task crashes or other invalid states are found.
+-spec pool(WorkF, WArgs, Handler, HArgs) -> pid() when
+      WorkF :: fun((Task, WArgs) -> TmpRet),
+      Task :: term(),
+      WArgs :: term(),
+      Handler :: fun((TmpRet, HArgs) -> NoRet | AccVal),
+      HArgs :: term(),
+      NoRet :: ok,
+      AccVal :: {ok, term()}.
 pool(WorkF, WArgs, Handler, HArgs) ->
     pool(WorkF, WArgs, Handler, HArgs, erlang:system_info(schedulers)).
 
-pool(WorkF, WArgs, Handler, HArgs, Jobs) ->
+%% @doc Create a pool using `PoolSize' workers and for which tasks can be
+%% added by calling `pool_task_async/2'.
+%%
+%% The value returned by the worker function `WorkF' for each task
+%% is then passed to a `Handler' which either discards its result
+%% after having done a side-effectful operation (by returning `ok')
+%% as in a `lists:foreach/2' call, or returns a value that gets
+%% added to an accumulator (by returning `{ok, Val}'). The handler
+%% can return both types as required.
+%%
+%% The accumulated list of values is in no specific order and depends
+%% on how tasks were scheduled and completed, and can only
+%% be obtained by calling `pool_terminate/1'.
+%%
+%% The pool process is linked to its initial caller and will error
+%% out via a link if any task crashes or other invalid states are found.
+-spec pool(WorkF, WArgs, Handler, HArgs, PoolSize) -> pid() when
+      WorkF :: fun((Task, WArgs) -> TmpRet),
+      Task :: term(),
+      WArgs :: term(),
+      Handler :: fun((TmpRet, HArgs) -> NoRet | AccVal),
+      HArgs :: term(),
+      PoolSize :: pos_integer(),
+      NoRet :: ok,
+      AccVal :: {ok, term()}.
+pool(WorkF, WArgs, Handler, HArgs, PoolSize) when PoolSize > 0 ->
     Parent = self(),
     Coordinator = spawn_link(fun() ->
         Coord = self(),
         Worker = fun() -> worker(Coord, WorkF, WArgs) end,
-        Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, Jobs)],
+        Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, PoolSize)],
         Parent ! pool_ready,
         pool_coordinator([], [], Pids, Handler, HArgs, [], undefined)
     end),
     receive pool_ready -> ok end,
     Coordinator.
 
-pool_task(Pid, Task) ->
+%% @doc Add a task to a pool.
+%% This call is asynchronous and does no validation about whether the pool
+%% process exists or not. If the pool has already been terminated or is
+%% in the process of being terminated, the task may trigger the pool to
+%% abort and error out to point out invalid usage.
+-spec pool_task_async(pid(), term()) -> ok.
+pool_task_async(Pid, Task) ->
     Pid ! {task, Task},
     ok.
 
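To illustrate the point above that the handler may mix both return shapes,
here is a small editorial sketch (the worker function and the inputs are
invented; only even results are accumulated, odd ones are discarded):

    %% Editorial sketch, not part of the patch.
    Handler = fun(Res, _HArgs) ->
                  case Res rem 2 of
                      0 -> {ok, Res};   %% accumulate this result
                      1 -> ok           %% side effect only, discard
                  end
              end,
    Pool = rebar_parallel:pool(fun(N, _) -> N * 3 end, [], Handler, []),
    ok = rebar_parallel:pool_task_async(Pool, 1),   %% 3  -> discarded
    ok = rebar_parallel:pool_task_async(Pool, 2),   %% 6  -> kept
    ok = rebar_parallel:pool_task_async(Pool, 3),   %% 9  -> discarded
    ok = rebar_parallel:pool_task_async(Pool, 4),   %% 12 -> kept
    %% pool_terminate/1 waits for in-flight tasks and worker shutdown,
    %% then returns the unordered accumulator.
    [6, 12] = lists:sort(rebar_parallel:pool_terminate(Pool)).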
+%% @doc Mark the pool as terminated. At this point it will stop
+%% accepting new tasks but will keep processing those that have been
+%% scheduled.
+%%
+%% Once all tasks are complete and workers have shut down, the
+%% accumulated value will be returned.
+%%
+%% Any process may terminate the pool, and the pool may only be
+%% terminated once.
+-spec pool_terminate(pid()) -> [term()].
 pool_terminate(Pid) ->
     Ref = erlang:monitor(process, Pid),
     Pid ! {self(), terminate},
@@ -49,6 +140,15 @@ pool_terminate(Pid) ->
             error(Info)
     end.
 
+%%%%%%%%%%%%%%%%
+%%% INTERNAL %%%
+%%%%%%%%%%%%%%%%
+
+%%% Queue implementation %%%
+%% @private the queue is rather straightforward. `Targets' represents the tasks
+%% yet to be run, which are sent to workers in `Pids' as they mark themselves
+%% as free. When workers are empty but no tasks are left, they are shut down.
+%% Once the shutdown is complete, the result is returned.
 parallel_dispatch([], [], _, _) ->
     [];
 parallel_dispatch(Targets, Pids, Handler, Args) ->
@@ -75,45 +175,55 @@ parallel_dispatch(Targets, Pids, Handler, Args) ->
         end
     end.
 
-worker(QueuePid, F, Args) ->
-    QueuePid ! {ready, self()},
-    receive
-        {task, Task} ->
-            QueuePid ! {result, F(Task, Args)},
-            worker(QueuePid, F, Args);
-        empty ->
-            ok
-    end.
-
+%%% Pool implementation %%%
+%% @private The pool supports asynchronous task addition, which makes it
+%% significantly hairier than the task queue. It uses `Tasks' to track
+%% enqueued tasks, `FreePids' to track the workers that currently do not
+%% have work to do, `Pids' to track all workers (and know what remains to
+%% be shut down at the end), an accumulator (`Acc') for results that must
+%% be returned on-demand, and a `Report' value that is either `undefined'
+%% or marks the Pid of the process calling for a report.
+%%
+%% Tasks and free processes can grow individually at different times, and
+%% we only demand that workers start shutting down once a `Report' entry
+%% has been defined, which returns once tasks that were in flight have all
+%% terminated.
 pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Report) ->
     receive
         {task, Task} when Report =/= undefined ->
             ?ERROR("Task added to pool after being terminated: ~p", [Task]),
             ?ABORT;
         {task, Task} when FreePids =:= [] ->
+            %% no worker is free, enqueue.
             pool_coordinator([Task|Tasks], FreePids, Pids, Handler, HArgs, Acc, Report);
         {task, Task} ->
+            %% workers are free, assign right away
             [Pid|NewFree] = FreePids,
             Pid ! {task, Task},
             pool_coordinator(Tasks, NewFree, Pids, Handler, HArgs, Acc, Report);
         {ready, _Pid} when Tasks =:= [], Report =/= undefined ->
-            %% And we're done
-            %% terminate workers async, return results if done
+            %% And we're done!
+            %% terminate workers async, and wait for their shutdown
             [Pid ! empty || {Pid,_Ref} <- Pids],
             pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Report);
         {ready, Pid} when Tasks =:= [] ->
+            %% worker free, no tasks to run, put in the free list
            pool_coordinator(Tasks, [Pid|FreePids], Pids, Handler, HArgs, Acc, Report);
         {ready, Pid} ->
+            %% worker free, tasks are available, assign right away
            [Task|NewTasks] = Tasks,
            Pid ! {task, Task},
            pool_coordinator(NewTasks, FreePids, Pids, Handler, HArgs, Acc, Report);
         {'DOWN', Mref, _, Pid, normal} ->
+            %% worker terminated as expected
            NewPids = lists:delete({Pid, Mref}, Pids),
            NewFree = lists:delete(Pid, FreePids),
            case NewPids of
                [] when is_pid(Report) ->
+                    %% shutdown complete!
                    Report ! {self(), Acc};
                _ ->
+                    %% still shutting down
                    pool_coordinator(Tasks, NewFree, NewPids, Handler, HArgs, Acc, Report)
            end;
         {'DOWN', _Mref, _, _Pid, Info} ->
             ?ERROR("Task failed: ~p", [Info]),
             ?ABORT;
         {result, Result} ->
             case Handler(Result, HArgs) of
                 ok ->
                     pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Report);
                 {ok, Res} ->
                     pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, [Res|Acc], Report)
             end;
-        {Caller, terminate} ->
+        {Caller, terminate} when Report =:= undefined ->
+            %% We're being asked to return results!
             if Pids =:= []; % no workers somehow
-               length(Pids) =:= length(FreePids), Tasks =:= [] -> % All Idle
+               length(Pids) =:= length(FreePids), Tasks =:= [] -> % All Idle, no work to do
                    Caller ! {self(), Acc};
                true ->
                    %% Still work to do
                    pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Caller)
-            end
+            end;
+        {Caller, terminate} when is_pid(Report) ->
+            ?ERROR("Another process (~p) is already terminating the pool, request from ~p is invalid",
+                   [Report, Caller]),
+            ?ABORT
     end.
 
+%%% Shared components %%%
+
+worker(QueuePid, F, Args) ->
+    QueuePid ! {ready, self()},
+    receive
+        {task, Task} ->
+            QueuePid ! {result, F(Task, Args)},
+            worker(QueuePid, F, Args);
+        empty ->
+            ok
+    end.
diff --git a/apps/rebar/test/rebar_parallel_SUITE.erl b/apps/rebar/test/rebar_parallel_SUITE.erl
index 6cbdaaa40..ad0224870 100644
--- a/apps/rebar/test/rebar_parallel_SUITE.erl
+++ b/apps/rebar/test/rebar_parallel_SUITE.erl
@@ -37,7 +37,7 @@ same_results(_) ->
                                         fun(X,_) -> {ok, X} end, []))),
     P = rebar_parallel:pool(fun(X,_) -> X*2 end, [],
                             fun(X,_) -> {ok, X} end, []),
-    _ = [rebar_parallel:pool_task(P, N) || N <- [1,2,3,4,5,6,7]],
+    _ = [rebar_parallel:pool_task_async(P, N) || N <- [1,2,3,4,5,6,7]],
     ?assertEqual([2,4,6,8,10,12,14],
                  lists:sort(rebar_parallel:pool_terminate(P))),
     ok.
@@ -49,7 +49,7 @@ pool_fetcher(_) ->
     Parent = self(),
     P = rebar_parallel:pool(fun(X,_) -> X*2 end, [],
                             fun(X,_) -> {ok, X} end, []),
-    _ = [rebar_parallel:pool_task(P, N) || N <- [1,2,3,4,5,6,7]],
+    _ = [rebar_parallel:pool_task_async(P, N) || N <- [1,2,3,4,5,6,7]],
     spawn_link(fun() -> Parent ! {res, lists:sort(rebar_parallel:pool_terminate(P))} end),
     receive
         {res, X} -> ?assertEqual([2,4,6,8,10,12,14], X)
@@ -64,11 +64,11 @@ pool_misuse(_) ->
     Parent = self(),
     P = rebar_parallel:pool(fun(_,_) -> timer:sleep(1000) end, [],
                             fun(X,_) -> {ok, X} end, []),
-    _ = [rebar_parallel:pool_task(P, N) || N <- [1,2,3,4,5,6,7]],
+    _ = [rebar_parallel:pool_task_async(P, N) || N <- [1,2,3,4,5,6,7]],
     spawn(fun() -> Parent ! ok, rebar_parallel:pool_terminate(P) end),
     receive ok -> timer:sleep(100) end,
     Old = process_flag(trap_exit, true),
-    rebar_parallel:pool_task(P, 0),
+    rebar_parallel:pool_task_async(P, 0),
     receive
         {'EXIT', P, {{nocatch, rebar_abort}, _Stack}} ->
             process_flag(trap_exit, Old)