diff --git a/apps/rebar/src/rebar_compiler_dag.erl b/apps/rebar/src/rebar_compiler_dag.erl
index 5daa7a973..9298ca053 100644
--- a/apps/rebar/src/rebar_compiler_dag.erl
+++ b/apps/rebar/src/rebar_compiler_dag.erl
@@ -132,46 +132,41 @@ filter_prefix(G, [{App, Out} | AppTail] = AppPaths, [File | FTail]) ->
             filter_prefix(G, AppPaths, FTail)
     end.
 
-finalise_populate_sources(_G, _InDirs, Waiting) when Waiting =:= #{} ->
+finalise_populate_sources(G, InDirs, Pool) ->
+    Res = rebar_parallel:pool_terminate(Pool),
+    finalise_populate_sources_(G, InDirs, Res).
+
+finalise_populate_sources_(_G, _InDirs, []) ->
     ok;
-finalise_populate_sources(G, InDirs, Waiting) ->
-    %% wait for all deps to complete
-    receive
-        {deps, Pid, AbsIncls} ->
-            {Status, Source} = maps:get(Pid, Waiting),
-            %% the file hasn't been visited yet; set it to existing, but with
-            %% a last modified value that's null so it gets updated to something new.
-            [digraph:add_vertex(G, Src, 0) || Src <- AbsIncls,
-                                              digraph:vertex(G, Src) =:= false],
-            %% drop edges from deps that aren't included!
-            [digraph:del_edge(G, Edge) || Status == old,
-                                          Edge <- digraph:out_edges(G, Source),
-                                          {_, _Src, Path, _Label} <- [digraph:edge(G, Edge)],
-                                          not lists:member(Path, AbsIncls)],
-            %% Add the rest
-            [digraph:add_edge(G, Source, Incl) || Incl <- AbsIncls],
-            %% mark the digraph dirty when there is any change in
-            %% dependencies, for any application in the project
-            mark_dirty(G),
-            finalise_populate_sources(G, InDirs, Waiting);
-        {'DOWN', _MRef, process, Pid, normal} ->
-            finalise_populate_sources(G, InDirs, maps:remove(Pid, Waiting));
-        {'DOWN', _MRef, process, Pid, Reason} ->
-            {_Status, Source} = maps:get(Pid, Waiting),
-            ?ERROR("Failed to get dependencies for ~s~n~p", [Source, Reason]),
-            ?ABORT
-    end.
+finalise_populate_sources_(G, InDirs, [{Status, {deps, Source, AbsIncls}}|Acc]) ->
+    %% the file hasn't been visited yet; set it to existing, but with
+    %% a last modified value that's null so it gets updated to something new.
+    [digraph:add_vertex(G, Src, 0) || Src <- AbsIncls,
+                                      digraph:vertex(G, Src) =:= false],
+    %% drop edges from deps that aren't included!
+    [digraph:del_edge(G, Edge) || Status == old,
+                                  Edge <- digraph:out_edges(G, Source),
+                                  {_, _Src, Path, _Label} <- [digraph:edge(G, Edge)],
+                                  not lists:member(Path, AbsIncls)],
+    %% Add the rest
+    [digraph:add_edge(G, Source, Incl) || Incl <- AbsIncls],
+    %% mark the digraph dirty when there is any change in
+    %% dependencies, for any application in the project
+    mark_dirty(G),
+    finalise_populate_sources_(G, InDirs, Acc).
 
 %% @doc this function scans all the source files found and looks into
 %% all the `InDirs' for deps (other source files, or files that aren't source
 %% but still returned by the compiler module) that are related
 %% to them.
 populate_sources(G, Compiler, InDirs, Sources, DepOpts) ->
-    populate_sources(G, Compiler, InDirs, Sources, DepOpts, #{}).
+    Pool = rebar_parallel:pool(fun erlang:apply/2, [],
+                               fun(Res, _) -> {ok, Res} end, []),
+    populate_sources(G, Compiler, InDirs, Sources, DepOpts, Pool).
-populate_sources(G, _Compiler, InDirs, [], _DepOpts, Waiting) ->
-    finalise_populate_sources(G, InDirs, Waiting);
-populate_sources(G, Compiler, InDirs, [Source|Erls], DepOpts, Waiting) ->
+populate_sources(G, _Compiler, InDirs, [], _DepOpts, Pool) ->
+    finalise_populate_sources(G, InDirs, Pool);
+populate_sources(G, Compiler, InDirs, [Source|Erls], DepOpts, Pool) ->
     case digraph:vertex(G, Source) of
         {_, LastUpdated} ->
             case filelib:last_modified(Source) of
@@ -180,19 +175,21 @@ populate_sources(G, Compiler, InDirs, [Source|Erls], DepOpts, Waiting) ->
                     %% from the graph.
                     digraph:del_vertex(G, Source),
                     mark_dirty(G),
-                    populate_sources(G, Compiler, InDirs, Erls, DepOpts, Waiting);
+                    populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool);
                 LastModified when LastUpdated < LastModified ->
                     digraph:add_vertex(G, Source, LastModified),
-                    Worker = prepopulate_deps(Compiler, InDirs, Source, DepOpts, self()),
-                    populate_sources(G, Compiler, InDirs, Erls, DepOpts, Waiting#{Worker => {old, Source}});
+                    Work = fun() -> {old, prepopulate_deps(Compiler, InDirs, Source, DepOpts)} end,
+                    rebar_parallel:pool_task_async(Pool, Work),
+                    populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool);
                 _ -> % unchanged
-                    populate_sources(G, Compiler, InDirs, Erls, DepOpts, Waiting)
+                    populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool)
             end;
         false ->
             LastModified = filelib:last_modified(Source),
             digraph:add_vertex(G, Source, LastModified),
-            Worker = prepopulate_deps(Compiler, InDirs, Source, DepOpts, self()),
-            populate_sources(G, Compiler, InDirs, Erls, DepOpts, Waiting#{Worker => {new, Source}})
+            Work = fun() -> {new, prepopulate_deps(Compiler, InDirs, Source, DepOpts)} end,
+            rebar_parallel:pool_task_async(Pool, Work),
+            populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool)
     end.
 
 %% @doc Scan all files in the digraph that are seen as dependencies, but are
@@ -450,20 +447,15 @@ maybe_rm_vertex(G, Source) ->
 %% mark its timestamp to 0, which means we have no info on it.
 %% Source files will be covered at a later point in their own scan, and
 %% non-source files are going to be covered by `populate_deps/3'.
-prepopulate_deps(Compiler, InDirs, Source, DepOpts, Control) ->
-    {Worker, _MRef} = spawn_monitor(
-        fun () ->
-            SourceDir = filename:dirname(Source),
-            AbsIncls = case erlang:function_exported(Compiler, dependencies, 4) of
-                false ->
-                    Compiler:dependencies(Source, SourceDir, InDirs);
-                true ->
-                    Compiler:dependencies(Source, SourceDir, InDirs, DepOpts)
-            end,
-            Control ! {deps, self(), AbsIncls}
-        end
-    ),
-    Worker.
+prepopulate_deps(Compiler, InDirs, Source, DepOpts) ->
+    SourceDir = filename:dirname(Source),
+    AbsIncls = case erlang:function_exported(Compiler, dependencies, 4) of
+        false ->
+            Compiler:dependencies(Source, SourceDir, InDirs);
+        true ->
+            Compiler:dependencies(Source, SourceDir, InDirs, DepOpts)
+    end,
+    {deps, Source, AbsIncls}.
 
 %% check that a dep file is up to date
 refresh_dep(_G, {artifact, _}) ->
diff --git a/apps/rebar/src/rebar_parallel.erl b/apps/rebar/src/rebar_parallel.erl
index e1b621614..eb4992d0a 100644
--- a/apps/rebar/src/rebar_parallel.erl
+++ b/apps/rebar/src/rebar_parallel.erl
@@ -5,11 +5,38 @@
 %%%
 %%% Original design by Max Fedorov in the rebar compiler, then generalised
 %%% and extracted here to be reused in other circumstances.
+%%%
+%%% It also contains an asynchronous version of the queue built with a
+%%% naive pool, which allows similar semantics in worker definitions, but
+%%% without requiring all the tasks to be known ahead of time.
 %%% @end
 -module(rebar_parallel).
 
--export([queue/5]).
+-export([queue/5,
+         pool/4, pool/5, pool_task_async/2, pool_terminate/1]).
 
 -include("rebar.hrl").
 
+%% @doc Create a queue using as many workers as there are schedulers,
+%% that will spread all `Task' entries among them based on whichever
+%% is available first.
+%%
+%% The value returned by the worker function `WorkF' for each task
+%% is then passed to a `Handler' which either discards its result
+%% after having done a side-effectful operation (by returning `ok')
+%% as in a `lists:foreach/2' call, or returns a value that gets
+%% added to an accumulator (by returning `{ok, Val}'). The handler
+%% can return both types as required.
+%%
+%% The accumulated list of values is in no specific order and depends
+%% on how tasks were scheduled and completed.
+-spec queue([Task], WorkF, WArgs, Handler, HArgs) -> [Ret] when
+      Task :: term(),
+      WorkF :: fun((Task, WArgs) -> TmpRet),
+      WArgs :: term(),
+      Handler :: fun((TmpRet, HArgs) -> NoRet | AccVal),
+      HArgs :: term(),
+      NoRet :: ok,
+      AccVal :: {ok, Ret},
+      Ret :: term().
 queue(Tasks, WorkF, WArgs, Handler, HArgs) ->
     Parent = self(),
     Worker = fun() -> worker(Parent, WorkF, WArgs) end,
@@ -18,6 +45,110 @@ queue(Tasks, WorkF, WArgs, Handler, HArgs) ->
     Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, Jobs)],
     parallel_dispatch(Tasks, Pids, Handler, HArgs).
 
+%% @doc Create a pool using as many workers as there are schedulers,
+%% and for which tasks can be added by calling `pool_task_async/2'.
+%%
+%% The value returned by the worker function `WorkF' for each task
+%% is then passed to a `Handler' which either discards its result
+%% after having done a side-effectful operation (by returning `ok')
+%% as in a `lists:foreach/2' call, or returns a value that gets
+%% added to an accumulator (by returning `{ok, Val}'). The handler
+%% can return both types as required.
+%%
+%% The accumulated list of values is in no specific order and depends
+%% on how tasks were scheduled and completed, and can only
+%% be obtained by calling `pool_terminate/1'.
+%%
+%% The pool process is linked to its initial caller and will error
+%% out via a link if any task crashes or other invalid states are found.
+-spec pool(WorkF, WArgs, Handler, HArgs) -> pid() when
+      WorkF :: fun((Task, WArgs) -> TmpRet),
+      Task :: term(),
+      WArgs :: term(),
+      Handler :: fun((TmpRet, HArgs) -> NoRet | AccVal),
+      HArgs :: term(),
+      NoRet :: ok,
+      AccVal :: {ok, term()}.
+pool(WorkF, WArgs, Handler, HArgs) ->
+    pool(WorkF, WArgs, Handler, HArgs, erlang:system_info(schedulers)).
+
+%% @doc Create a pool using `PoolSize' workers and for which tasks can be
+%% added by calling `pool_task_async/2'.
+%%
+%% The value returned by the worker function `WorkF' for each task
+%% is then passed to a `Handler' which either discards its result
+%% after having done a side-effectful operation (by returning `ok')
+%% as in a `lists:foreach/2' call, or returns a value that gets
+%% added to an accumulator (by returning `{ok, Val}'). The handler
+%% can return both types as required.
+%%
+%% The accumulated list of values is in no specific order and depends
+%% on how tasks were scheduled and completed, and can only
+%% be obtained by calling `pool_terminate/1'.
+%%
+%% The pool process is linked to its initial caller and will error
+%% out via a link if any task crashes or other invalid states are found.
+-spec pool(WorkF, WArgs, Handler, HArgs, PoolSize) -> pid() when
+      WorkF :: fun((Task, WArgs) -> TmpRet),
+      Task :: term(),
+      WArgs :: term(),
+      Handler :: fun((TmpRet, HArgs) -> NoRet | AccVal),
+      HArgs :: term(),
+      PoolSize :: pos_integer(),
+      NoRet :: ok,
+      AccVal :: {ok, term()}.
+pool(WorkF, WArgs, Handler, HArgs, PoolSize) when PoolSize > 0 ->
+    Parent = self(),
+    Coordinator = spawn_link(fun() ->
+        Coord = self(),
+        Worker = fun() -> worker(Coord, WorkF, WArgs) end,
+        Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, PoolSize)],
+        Parent ! pool_ready,
+        pool_coordinator([], [], Pids, Handler, HArgs, [], undefined)
+    end),
+    receive pool_ready -> ok end,
+    Coordinator.
+
+%% @doc Add a task to a pool.
+%% This call is asynchronous and does not validate whether the pool
+%% process exists. If the pool has already been terminated or is
+%% in the process of being terminated, the task may trigger the pool to
+%% abort and error out, flagging the invalid usage.
+-spec pool_task_async(pid(), term()) -> ok.
+pool_task_async(Pid, Task) ->
+    Pid ! {task, Task},
+    ok.
+
+%% @doc Mark the pool as terminated. At this point it will stop
+%% accepting new tasks but will keep processing those that have been
+%% scheduled.
+%%
+%% Once all tasks are complete and workers have shut down, the
+%% accumulated value will be returned.
+%%
+%% Any process may terminate the pool, and the pool may only be
+%% terminated once.
+-spec pool_terminate(pid()) -> [term()].
+pool_terminate(Pid) ->
+    Ref = erlang:monitor(process, Pid),
+    Pid ! {self(), terminate},
+    receive
+        {Pid, Res} ->
+            erlang:demonitor(Ref, [flush]),
+            Res;
+        {'DOWN', Ref, process, Pid, Info} ->
+            error(Info)
+    end.
+
+%%%%%%%%%%%%%%%%
+%%% INTERNAL %%%
+%%%%%%%%%%%%%%%%
+
+%%% Queue implementation %%%
+%% @private the queue is rather straightforward. `Targets' represents the tasks
+%% yet to be run, which are sent to workers in `Pids' as they mark themselves
+%% as free. When workers are idle but no tasks are left, they are shut down.
+%% Once the shutdown is complete, the result is returned.
 parallel_dispatch([], [], _, _) ->
     [];
 parallel_dispatch(Targets, Pids, Handler, Args) ->
@@ -44,6 +175,86 @@
         end
     end.
 
+%%% Pool implementation %%%
+%% @private The pool supports asynchronous task addition, which makes it
+%% significantly hairier than the task queue. It uses `Tasks' to track
+%% enqueued tasks, `FreePids' to track the workers that currently do not
+%% have work to do, `Pids' to track all workers (and know what remains to
+%% be shut down at the end), an accumulator (`Acc') for results that must
+%% be returned on demand, and a `Report' value that is either `undefined'
+%% or the Pid of the process asking for the results.
+%%
+%% The task list and the free-worker list can each grow at different
+%% times; workers are only told to shut down once `Report' has been
+%% set, and the accumulated results are only returned once all tasks
+%% that were in flight have completed.
+pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Report) ->
+    receive
+        {task, Task} when Report =/= undefined ->
+            ?ERROR("Task added to pool after being terminated: ~p", [Task]),
+            ?ABORT;
+        {task, Task} when FreePids =:= [] ->
+            %% no worker is free, enqueue.
+            pool_coordinator([Task|Tasks], FreePids, Pids, Handler, HArgs, Acc, Report);
+        {task, Task} ->
+            %% workers are free, assign right away
+            [Pid|NewFree] = FreePids,
+            Pid ! {task, Task},
+            pool_coordinator(Tasks, NewFree, Pids, Handler, HArgs, Acc, Report);
+        {ready, _Pid} when Tasks =:= [], Report =/= undefined ->
+            %% And we're done!
+            %% terminate workers asynchronously, and wait for their shutdown
+            [Pid ! empty || {Pid,_Ref} <- Pids],
+            pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Report);
+        {ready, Pid} when Tasks =:= [] ->
+            %% worker free, no tasks to run, put in the free list
+            pool_coordinator(Tasks, [Pid|FreePids], Pids, Handler, HArgs, Acc, Report);
+        {ready, Pid} ->
+            %% worker free, tasks are available, assign right away
+            [Task|NewTasks] = Tasks,
+            Pid ! {task, Task},
+            pool_coordinator(NewTasks, FreePids, Pids, Handler, HArgs, Acc, Report);
+        {'DOWN', Mref, _, Pid, normal} ->
+            %% worker terminated as expected
+            NewPids = lists:delete({Pid, Mref}, Pids),
+            NewFree = lists:delete(Pid, FreePids),
+            case NewPids of
+                [] when is_pid(Report) ->
+                    %% shutdown complete!
+                    Report ! {self(), Acc};
+                _ ->
+                    %% still shutting down
+                    pool_coordinator(Tasks, NewFree, NewPids, Handler, HArgs, Acc, Report)
+            end;
+        {'DOWN', _Mref, _, _Pid, Info} ->
+            ?ERROR("Task failed: ~p", [Info]),
+            ?ABORT;
+        {result, Result} ->
+            case Handler(Result, HArgs) of
+                ok ->
+                    pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Report);
+                {ok, Res} ->
+                    pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, [Res|Acc], Report)
+            end;
+        {Caller, terminate} when Report =:= undefined ->
+            %% We're being asked to return results!
+            if Pids =:= []; % no workers somehow
+               length(Pids) =:= length(FreePids), Tasks =:= [] -> % all idle, no work to do
+                    %% shut the idle workers down before replying
+                    [Pid ! empty || {Pid, _Ref} <- Pids],
+                    Caller ! {self(), Acc};
+               true ->
+                    %% Still work to do
+                    pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Caller)
+            end;
+        {Caller, terminate} when is_pid(Report) ->
+            ?ERROR("Another process (~p) already terminated the pool, demand from ~p is invalid",
+                   [Report, Caller]),
+            ?ABORT
+    end.
+
+%%% Shared components %%%
+
 worker(QueuePid, F, Args) ->
     QueuePid ! {ready, self()},
     receive
@@ -53,4 +264,3 @@
         empty ->
             ok
     end.
-
diff --git a/apps/rebar/test/rebar_parallel_SUITE.erl b/apps/rebar/test/rebar_parallel_SUITE.erl
new file mode 100644
index 000000000..ad0224870
--- /dev/null
+++ b/apps/rebar/test/rebar_parallel_SUITE.erl
@@ -0,0 +1,80 @@
+-module(rebar_parallel_SUITE).
+-compile(export_all).
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+all() -> [empty_set, same_results, pool_fetcher, pool_misuse].
+
+empty_set() ->
+    [{doc, "Running with an empty task list still works"}].
+empty_set(_) ->
+    ?assertEqual([],
+                 rebar_parallel:queue([],
+                                      fun(X,_) -> X end, [arg],
+                                      fun(_,_) -> ok end, [arg])),
+    ?assertEqual([],
+                 rebar_parallel:queue([],
+                                      fun(X,_) -> X end, [arg],
+                                      fun(X,_) -> {ok, X} end, [arg])),
+    P1 = rebar_parallel:pool(fun(X,_) -> X end, [arg],
+                             fun(_,_) -> ok end, [arg]),
+    ?assertEqual([],
+                 rebar_parallel:pool_terminate(P1)),
+    P2 = rebar_parallel:pool(fun(X,_) -> X end, [arg],
+                             fun(X,_) -> {ok,X} end, [arg]),
+    ?assertEqual([],
+                 rebar_parallel:pool_terminate(P2)),
+    ok.
+
+same_results() ->
+    [{doc, "The two parallel mechanisms return the same results, but the pool "
+           "can be used to create tasks asynchronously"}].
+same_results(_) ->
+    ?assertEqual([2,4,6,8,10,12,14],
+                 lists:sort(
+                   rebar_parallel:queue([1,2,3,4,5,6,7],
+                                        fun(X,_) -> X*2 end, [],
+                                        fun(X,_) -> {ok, X} end, []))),
+    P = rebar_parallel:pool(fun(X,_) -> X*2 end, [],
+                            fun(X,_) -> {ok, X} end, []),
+    _ = [rebar_parallel:pool_task_async(P, N) || N <- [1,2,3,4,5,6,7]],
+    ?assertEqual([2,4,6,8,10,12,14],
+                 lists:sort(rebar_parallel:pool_terminate(P))),
+    ok.
+
+pool_fetcher() ->
+    [{doc, "The results of a pool can be fetched from a process other "
+           "than the one that created it."}].
+pool_fetcher(_) ->
+    Parent = self(),
+    P = rebar_parallel:pool(fun(X,_) -> X*2 end, [],
+                            fun(X,_) -> {ok, X} end, []),
+    _ = [rebar_parallel:pool_task_async(P, N) || N <- [1,2,3,4,5,6,7]],
+    spawn_link(fun() -> Parent ! {res, lists:sort(rebar_parallel:pool_terminate(P))} end),
+    receive
+        {res, X} -> ?assertEqual([2,4,6,8,10,12,14], X)
+    after 500 -> error(timeout)
+    end,
+    ok.
+
+pool_misuse() ->
+    [{doc, "Adding tasks to a pool after it has been terminated, but before "
+           "it has returned, crashes the pool even though the call is async"}].
+pool_misuse(_) ->
+    Parent = self(),
+    P = rebar_parallel:pool(fun(_,_) -> timer:sleep(1000) end, [],
+                            fun(X,_) -> {ok, X} end, []),
+    _ = [rebar_parallel:pool_task_async(P, N) || N <- [1,2,3,4,5,6,7]],
+    spawn(fun() -> Parent ! ok, rebar_parallel:pool_terminate(P) end),
+    receive ok -> timer:sleep(100) end,
+    Old = process_flag(trap_exit, true),
+    rebar_parallel:pool_task_async(P, 0),
+    receive
+        {'EXIT', P, {{nocatch, rebar_abort}, _Stack}} ->
+            process_flag(trap_exit, Old)
+    after 1000 ->
+        process_flag(trap_exit, Old),
+        error(no_abort)
+    end,
+    ok.
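
For reference, this is roughly how the new pool API composes end to end, as exercised by the suite above. A minimal sketch follows; the `double_all/1' wrapper and its worker and handler funs are illustrative stand-ins, while `pool/5', `pool_task_async/2', and `pool_terminate/1' are the functions introduced in rebar_parallel:

    %% Illustrative sketch (not part of the change): run one task per
    %% number on a 4-worker pool and collect the doubled values.
    double_all(Numbers) ->
        Pool = rebar_parallel:pool(fun(N, _WArgs) -> N * 2 end, [],
                                   fun(Res, _HArgs) -> {ok, Res} end, [], 4),
        %% tasks can be added asynchronously, at any point before termination
        _ = [rebar_parallel:pool_task_async(Pool, N) || N <- Numbers],
        %% pool_terminate/1 blocks until in-flight tasks finish, then
        %% returns the accumulated results in no particular order
        lists:sort(rebar_parallel:pool_terminate(Pool)).

Since the handler returns `{ok, Res}', every worker result is accumulated; a handler that returns `ok' instead would make the pool purely side-effectful, as the `empty_set' and `pool_fetcher' cases demonstrate.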