Merge pull request #2768 from ferd/pooling-dag-operations
Add rebar_parallel pool, use in DAG scans
ferd authored May 15, 2023
2 parents e8956a1 + 4583c25 commit cc0c1a9
Showing 3 changed files with 334 additions and 54 deletions.
96 changes: 44 additions & 52 deletions apps/rebar/src/rebar_compiler_dag.erl
@@ -132,46 +132,41 @@ filter_prefix(G, [{App, Out} | AppTail] = AppPaths, [File | FTail]) ->
filter_prefix(G, AppPaths, FTail)
end.

finalise_populate_sources(_G, _InDirs, Waiting) when Waiting =:= #{} ->
finalise_populate_sources(G, InDirs, Pool) ->
Res = rebar_parallel:pool_terminate(Pool),
finalise_populate_sources_(G, InDirs, Res).

finalise_populate_sources_(_G, _InDirs, []) ->
ok;
finalise_populate_sources(G, InDirs, Waiting) ->
%% wait for all deps to complete
receive
{deps, Pid, AbsIncls} ->
{Status, Source} = maps:get(Pid, Waiting),
%% the file hasn't been visited yet; set it to existing, but with
%% a last modified value that's null so it gets updated to something new.
[digraph:add_vertex(G, Src, 0) || Src <- AbsIncls,
digraph:vertex(G, Src) =:= false],
%% drop edges from deps that aren't included!
[digraph:del_edge(G, Edge) || Status == old,
Edge <- digraph:out_edges(G, Source),
{_, _Src, Path, _Label} <- [digraph:edge(G, Edge)],
not lists:member(Path, AbsIncls)],
%% Add the rest
[digraph:add_edge(G, Source, Incl) || Incl <- AbsIncls],
%% mark the digraph dirty when there is any change in
%% dependencies, for any application in the project
mark_dirty(G),
finalise_populate_sources(G, InDirs, Waiting);
{'DOWN', _MRef, process, Pid, normal} ->
finalise_populate_sources(G, InDirs, maps:remove(Pid, Waiting));
{'DOWN', _MRef, process, Pid, Reason} ->
{_Status, Source} = maps:get(Pid, Waiting),
?ERROR("Failed to get dependencies for ~s~n~p", [Source, Reason]),
?ABORT
end.
finalise_populate_sources_(G, InDirs, [{Status, {deps, Source, AbsIncls}}|Acc]) ->
%% the file hasn't been visited yet; set it to existing, but with
%% a last modified value that's null so it gets updated to something new.
[digraph:add_vertex(G, Src, 0) || Src <- AbsIncls,
digraph:vertex(G, Src) =:= false],
%% drop edges from deps that aren't included!
[digraph:del_edge(G, Edge) || Status == old,
Edge <- digraph:out_edges(G, Source),
{_, _Src, Path, _Label} <- [digraph:edge(G, Edge)],
not lists:member(Path, AbsIncls)],
%% Add the rest
[digraph:add_edge(G, Source, Incl) || Incl <- AbsIncls],
%% mark the digraph dirty when there is any change in
%% dependencies, for any application in the project
mark_dirty(G),
finalise_populate_sources_(G, InDirs, Acc).

%% @doc this function scans all the source files found and looks into
%% all the `InDirs' for deps (other source files, or files that aren't source
%% but still returned by the compiler module) that are related
%% to them.
populate_sources(G, Compiler, InDirs, Sources, DepOpts) ->
populate_sources(G, Compiler, InDirs, Sources, DepOpts, #{}).
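%% Each scan is submitted to the pool as a zero-arity closure, so
%% `erlang:apply/2' with an empty argument list simply runs the closure;
%% the handler keeps every `{Status, {deps, Source, AbsIncls}}' result so
%% `finalise_populate_sources/3' can fold them back into the graph.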
Pool = rebar_parallel:pool(fun erlang:apply/2, [],
fun(Res, _) -> {ok, Res} end, []),
populate_sources(G, Compiler, InDirs, Sources, DepOpts, Pool).

populate_sources(G, _Compiler, InDirs, [], _DepOpts, Waiting) ->
finalise_populate_sources(G, InDirs, Waiting);
populate_sources(G, Compiler, InDirs, [Source|Erls], DepOpts, Waiting) ->
populate_sources(G, _Compiler, InDirs, [], _DepOpts, Pool) ->
finalise_populate_sources(G, InDirs, Pool);
populate_sources(G, Compiler, InDirs, [Source|Erls], DepOpts, Pool) ->
case digraph:vertex(G, Source) of
{_, LastUpdated} ->
case filelib:last_modified(Source) of
@@ -180,19 +175,21 @@ populate_sources(G, Compiler, InDirs, [Source|Erls], DepOpts, Waiting) ->
%% from the graph.
digraph:del_vertex(G, Source),
mark_dirty(G),
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Waiting);
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool);
LastModified when LastUpdated < LastModified ->
digraph:add_vertex(G, Source, LastModified),
Worker = prepopulate_deps(Compiler, InDirs, Source, DepOpts, self()),
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Waiting#{Worker => {old, Source}});
Work = fun() -> {old, prepopulate_deps(Compiler, InDirs, Source, DepOpts)} end,
rebar_parallel:pool_task_async(Pool, Work),
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool);
_ -> % unchanged
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Waiting)
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool)
end;
false ->
LastModified = filelib:last_modified(Source),
digraph:add_vertex(G, Source, LastModified),
Worker = prepopulate_deps(Compiler, InDirs, Source, DepOpts, self()),
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Waiting#{Worker => {new, Source}})
Work = fun() -> {new, prepopulate_deps(Compiler, InDirs, Source, DepOpts)} end,
rebar_parallel:pool_task_async(Pool, Work),
populate_sources(G, Compiler, InDirs, Erls, DepOpts, Pool)
end.

%% @doc Scan all files in the digraph that are seen as dependencies, but are
@@ -450,20 +447,15 @@ maybe_rm_vertex(G, Source) ->
%% mark its timestamp to 0, which means we have no info on it.
%% Source files will be covered at a later point in their own scan, and
%% non-source files are going to be covered by `populate_deps/3'.
prepopulate_deps(Compiler, InDirs, Source, DepOpts, Control) ->
{Worker, _MRef} = spawn_monitor(
fun () ->
SourceDir = filename:dirname(Source),
AbsIncls = case erlang:function_exported(Compiler, dependencies, 4) of
false ->
Compiler:dependencies(Source, SourceDir, InDirs);
true ->
Compiler:dependencies(Source, SourceDir, InDirs, DepOpts)
end,
Control ! {deps, self(), AbsIncls}
end
),
Worker.
prepopulate_deps(Compiler, InDirs, Source, DepOpts) ->
SourceDir = filename:dirname(Source),
AbsIncls = case erlang:function_exported(Compiler, dependencies, 4) of
false ->
Compiler:dependencies(Source, SourceDir, InDirs);
true ->
Compiler:dependencies(Source, SourceDir, InDirs, DepOpts)
end,
{deps, Source, AbsIncls}.

%% check that a dep file is up to date
refresh_dep(_G, {artifact, _}) ->
212 changes: 210 additions & 2 deletions apps/rebar/src/rebar_parallel.erl
@@ -5,11 +5,38 @@
%%%
%%% Original design by Max Fedorov in the rebar compiler, then generalised
%%% and extracted here to be reused in other circumstances.
%%%
%%% It also contains an asynchronous version of the queue built with a
%%% naive pool, which allows similar semantics in worker definitions, but
%%% without requiring all the tasks to be known ahead of time.
%%% @end
-module(rebar_parallel).
-export([queue/5]).
-export([queue/5,
pool/4, pool/5, pool_task_async/2, pool_terminate/1]).
-include("rebar.hrl").

%% @doc Create a queue using as many workers as there are schedulers,
%% spreading all `Task' entries among them, each task going to whichever
%% worker is available first.
%%
%% The value returned by the worker function `WorkF' for each task
%% is then passed to a `Handler', which either discards its result
%% after having done a side-effectful operation (by returning `ok')
%% as in a `lists:foreach/2' call, or returns a value that gets
%% added to an accumulator (by returning `{ok, Val}'). The handler
%% can return both types as required.
%%
%% The accumulated list of values is in no specific order and depends
%% on how tasks were scheduled and completed.
-spec queue([Task], WorkF, WArgs, Handler, HArgs) -> [Ret] when
Task :: term(),
WorkF :: fun((Task, WArgs) -> TmpRet),
WArgs :: term(),
Handler :: fun((TmpRet, HArgs) -> NoRet | AccVal),
HArgs :: term(),
NoRet :: ok,
AccVal :: {ok, Ret},
Ret :: term().
queue(Tasks, WorkF, WArgs, Handler, HArgs) ->
Parent = self(),
Worker = fun() -> worker(Parent, WorkF, WArgs) end,
@@ -18,6 +45,110 @@ queue(Tasks, WorkF, WArgs, Handler, HArgs) ->
Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, Jobs)],
parallel_dispatch(Tasks, Pids, Handler, HArgs).
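%% For illustration only, a minimal sketch of the `queue/5' contract
%% described above: square a list of numbers in parallel and collect the
%% results (in no particular order).
%%
%%   Squares = rebar_parallel:queue(
%%       lists:seq(1, 10),                    %% Tasks
%%       fun(N, _WArgs) -> N * N end, [],     %% WorkF and WArgs
%%       fun(Sq, _HArgs) -> {ok, Sq} end, []  %% Handler accumulates each result
%%   ),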

%% @doc Create a pool using as many workers as there are schedulers,
%% and to which tasks can be added by calling `pool_task_async/2'.
%%
%% The value returned by the worker function `WorkF' for each task
%% is then passed to a `Handler', which either discards its result
%% after having done a side-effectful operation (by returning `ok')
%% as in a `lists:foreach/2' call, or returns a value that gets
%% added to an accumulator (by returning `{ok, Val}'). The handler
%% can return both types as required.
%%
%% The accumulated list of values is in no specific order and depends
%% on how tasks were scheduled and completed, and can only
%% be obtained by calling `pool_terminate/1'.
%%
%% The pool process is linked to its initial caller and will error
%% out via a link if any task crashes or other invalid states are found.
-spec pool(WorkF, WArgs, Handler, HArgs) -> pid() when
WorkF :: fun((Task, WArgs) -> TmpRet),
Task :: term(),
WArgs :: term(),
Handler :: fun((TmpRet, HArgs) -> NoRet | AccVal),
HArgs :: term(),
NoRet :: ok,
AccVal :: {ok, term()}.
pool(WorkF, WArgs, Handler, HArgs) ->
pool(WorkF, WArgs, Handler, HArgs, erlang:system_info(schedulers)).

%% @doc Create a pool using `PoolSize' workers, to which tasks can be
%% added by calling `pool_task_async/2'.
%%
%% The value returned by the worker function `WorkF' for each task
%% is then passed to a `Handler', which either discards its result
%% after having done a side-effectful operation (by returning `ok')
%% as in a `lists:foreach/2' call, or returns a value that gets
%% added to an accumulator (by returning `{ok, Val}'). The handler
%% can return both types as required.
%%
%% The accumulated list of values is in no specific order and depends
%% on how tasks were scheduled and completed, and can only
%% be obtained by calling `pool_terminate/1'.
%%
%% The pool process is linked to its initial caller and will error
%% out via a link if any task crashes or other invalid states are found.
-spec pool(WorkF, WArgs, Handler, HArgs, PoolSize) -> pid() when
WorkF :: fun((Task, WArgs) -> TmpRet),
Task :: term(),
WArgs :: term(),
Handler :: fun((TmpRet, HArgs) -> NoRet | AccVal),
HArgs :: term(),
PoolSize :: pos_integer(),
NoRet :: ok,
AccVal :: {ok, term()}.
pool(WorkF, WArgs, Handler, HArgs, PoolSize) when PoolSize > 0 ->
Parent = self(),
Coordinator = spawn_link(fun() ->
Coord = self(),
Worker = fun() -> worker(Coord, WorkF, WArgs) end,
Pids = [spawn_monitor(Worker) || _ <- lists:seq(1, PoolSize)],
Parent ! pool_ready,
pool_coordinator([], [], Pids, Handler, HArgs, [], undefined)
end),
receive pool_ready -> ok end,
Coordinator.

%% @doc Add a task to a pool.
%% This call is asynchronous and does not check whether the pool
%% process exists. If the pool has already been terminated or is
%% in the process of being terminated, the task may trigger the pool to
%% abort and error out to point out invalid usage.
-spec pool_task_async(pid(), term()) -> ok.
pool_task_async(Pid, Task) ->
Pid ! {task, Task},
ok.

%% @doc Mark the pool as terminated. At this point it will stop
%% accepting new tasks but will keep processing those that have been
%% scheduled.
%%
%% Once all tasks are complete and workers have shut down, the
%% accumulated value will be returned.
%%
%% Any process may terminate the pool, and the pool may only be
%% terminated once.
-spec pool_terminate(pid()) -> [term()].
pool_terminate(Pid) ->
Ref = erlang:monitor(process, Pid),
Pid ! {self(), terminate},
receive
{Pid, Res} ->
erlang:demonitor(Ref, [flush]),
Res;
{'DOWN', Ref, process, Pid, Info} ->
error(Info)
end.
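%% For illustration only, a minimal sketch of the pool lifecycle described
%% above: start a pool, add tasks as they are discovered, then collect the
%% accumulated results (in no particular order) at termination.
%%
%%   Pool = rebar_parallel:pool(fun(N, _WArgs) -> N * N end, [],
%%                              fun(Sq, _HArgs) -> {ok, Sq} end, []),
%%   ok = rebar_parallel:pool_task_async(Pool, 1),
%%   ok = rebar_parallel:pool_task_async(Pool, 2),
%%   ok = rebar_parallel:pool_task_async(Pool, 3),
%%   [_, _, _] = rebar_parallel:pool_terminate(Pool).  %% e.g. [9, 1, 4]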

%%%%%%%%%%%%%%%%
%%% INTERNAL %%%
%%%%%%%%%%%%%%%%

%%% Queue implementation %%%
%% @private the queue is rather straightforward. `Targets' represents the tasks
%% yet to be run, which are sent to workers in `Pids' as they mark themselves
%% as free. When workers become free but no tasks are left, they are shut down.
%% Once the shutdown is complete, the result is returned.
parallel_dispatch([], [], _, _) ->
[];
parallel_dispatch(Targets, Pids, Handler, Args) ->
@@ -44,6 +175,84 @@ parallel_dispatch(Targets, Pids, Handler, Args) ->
end
end.

%%% Pool implementation %%%
%% @private The pool supports asynchronous task addition, which makes it
%% significantly hairier than the task queue. It uses `Tasks' to track
%% enqueued tasks, `FreePids' to track the workers that currently do not
%% have work to do, `Pids' to track all workers (and know what remains to
%% be shut down at the end), an accumulator (`Acc') for results that must
%% be returned on-demand, and a `Report' value that is either `undefined'
%% or marks the Pid of the process calling for a report.
%%
%% The task list and the free-worker list can each grow at different times;
%% workers are only told to shut down once a `Report' entry has been
%% defined, and the report is sent back once all tasks that were in
%% flight have terminated.
pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Report) ->
receive
{task, Task} when Report =/= undefined ->
?ERROR("Task added to pool after being terminated: ~p", [Task]),
?ABORT;
{task, Task} when FreePids =:= [] ->
%% no worker is free, enqueue.
pool_coordinator([Task|Tasks], FreePids, Pids, Handler, HArgs, Acc, Report);
{task, Task} ->
%% workers are free, assign right away
[Pid|NewFree] = FreePids,
Pid ! {task, Task},
pool_coordinator(Tasks, NewFree, Pids, Handler, HArgs, Acc, Report);
{ready, _Pid} when Tasks =:= [], Report =/= undefined ->
%% And we're done!
%% terminate workers async, and wait for their shutdown
[Pid ! empty || {Pid,_Ref} <- Pids],
pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Report);
{ready, Pid} when Tasks =:= [] ->
%% worker free, no tasks to run, put in the free list
pool_coordinator(Tasks, [Pid|FreePids], Pids, Handler, HArgs, Acc, Report);
{ready, Pid} ->
%% worker free, tasks are available, assign right away
[Task|NewTasks] = Tasks,
Pid ! {task, Task},
pool_coordinator(NewTasks, FreePids, Pids, Handler, HArgs, Acc, Report);
{'DOWN', Mref, _, Pid, normal} ->
%% worker terminated as expected
NewPids = lists:delete({Pid, Mref}, Pids),
NewFree = lists:delete(Pid, FreePids),
case NewPids of
[] when is_pid(Report) ->
%% shutdown complete!
Report ! {self(), Acc};
_ ->
%% still shutting down
pool_coordinator(Tasks, NewFree, NewPids, Handler, HArgs, Acc, Report)
end;
{'DOWN', _Mref, _, _Pid, Info} ->
?ERROR("Task failed: ~p", [Info]),
?ABORT;
{result, Result} ->
case Handler(Result, HArgs) of
ok ->
pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Report);
{ok, Res} ->
pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, [Res|Acc], Report)
end;
{Caller, terminate} when Report =:= undefined ->
%% We're being asked to return results!
if Pids =:= []; % no workers somehow
length(Pids) =:= length(FreePids), Tasks =:= [] -> % All Idle, no work to do
Caller ! {self(), Acc};
true ->
%% Still work to do
pool_coordinator(Tasks, FreePids, Pids, Handler, HArgs, Acc, Caller)
end;
{Caller, terminate} when is_pid(Report) ->
?ERROR("Another process (~p) already terminates the pool, demand from ~p is invalid",
[Report, Caller]),
?ABORT
end.

%%% Shared components %%%

worker(QueuePid, F, Args) ->
QueuePid ! {ready, self()},
receive
@@ -53,4 +262,3 @@ worker(QueuePid, F, Args) ->
empty ->
ok
end.

