Skip to content

Commit

Permalink
fix: don't deadlock when lots of async reactors are sharing a concurr…
Browse files Browse the repository at this point in the history
…ency pool.
  • Loading branch information
jimsynz committed Jul 17, 2023
1 parent 5b9d4a6 commit 16d75db
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 1 deletion.
14 changes: 14 additions & 0 deletions lib/reactor/executor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ defmodule Reactor.Executor do
{:continue, ready_steps} <- find_ready_steps(reactor),
{:continue, reactor, state} <- start_ready_async_steps(reactor, state, ready_steps),
{:continue, reactor, state} <- run_ready_sync_step(reactor, state, ready_steps),
{:continue, reactor, state} <- maybe_run_any_step_sync(reactor, state, ready_steps),
{:continue, reactor} <- all_done(reactor) do
execute(reactor, subtract_iteration(state))
else
Expand Down Expand Up @@ -154,6 +155,19 @@ defmodule Reactor.Executor do
Executor.Sync.run(reactor, state, step)
end

defp maybe_run_any_step_sync(reactor, state, []), do: {:continue, reactor, state}

defp maybe_run_any_step_sync(reactor, state, [step | _]) do
:__reactor__
|> Process.get([])
|> Enum.any?(&(&1.concurrency_key == state.concurrency_key))
|> if do
Executor.Sync.run(reactor, state, step)
else
{:continue, reactor, state}
end
end

defp subtract_iteration(state) when state.max_iterations == :infinity, do: state

defp subtract_iteration(state) when state.max_iterations > 0,
Expand Down
14 changes: 13 additions & 1 deletion lib/reactor/executor/step_runner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,20 @@ defmodule Reactor.Executor.StepRunner do
{module, options} <- module_and_opts(step),
{:ok, context} <- build_context(reactor, step, concurrency_key),
{:ok, arguments} <- maybe_replace_arguments(arguments, context) do
do_run(module, options, arguments, context)
metadata = %{
current_step: step,
pid: self(),
reactor: reactor,
concurrency_key: concurrency_key
}

metadata_stack = Process.get(:__reactor__, [])
Process.put(:__reactor__, [metadata | metadata_stack])
result = do_run(module, options, arguments, context)
Process.put(:__reactor__, metadata_stack)
result
end
after
end

@doc """
Expand Down
58 changes: 58 additions & 0 deletions test/reactor/executor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -424,5 +424,63 @@ defmodule Reactor.ExecutorTest do
assert elapsed >= 200
assert elapsed < 300
end

test "reactors running inside async steps with shared concurrency don't cause a deadlock" do
defmodule MaybeDeadlockedReactor.Inner do
@moduledoc false
use Reactor

step :sleep do
run(fn _ ->
Process.sleep(100)
{:ok, nil}
end)
end
end

defmodule MaybeDeadlockedReactor do
@moduledoc false
use Reactor

for i <- 0..16 do
step :"sleep_#{i}" do
run(fn _, context ->
Reactor.run(
MaybeDeadlockedReactor.Inner,
%{},
%{},
concurrency_key: context.concurrency_key
)
end)
end
end
end

Reactor.run(MaybeDeadlockedReactor, %{}, %{}, max_concurrency: 8)
end

test "lots of reactors sharing a concurrency key do not deadlock" do
defmodule OversharingReactor do
@moduledoc false
use Reactor

step :sleep do
run fn _ ->
Process.sleep(100)
{:ok, nil}
end
end
end

pool = ConcurrencyTracker.allocate_pool(8)

0..16
|> Enum.map(fn _ ->
Task.async(fn ->
Reactor.run(OversharingReactor, %{}, %{}, concurrency_key: pool)
end)
end)
|> Enum.map(&Task.await/1)
end
end
end

0 comments on commit 16d75db

Please sign in to comment.