Skip to content

Commit

Permalink
fix: don't deadlock when lots of async reactors are sharing a concurr…
Browse files Browse the repository at this point in the history
…ency pool.
  • Loading branch information
jimsynz committed Sep 27, 2023
1 parent 9220ba3 commit 2d48eee
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 60 deletions.
18 changes: 14 additions & 4 deletions lib/reactor/executor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ defmodule Reactor.Executor do
{:continue, ready_steps} <- find_ready_steps(reactor),
{:continue, reactor, state} <- start_ready_async_steps(reactor, state, ready_steps),
{:continue, reactor, state} <- run_ready_sync_step(reactor, state, ready_steps),
{:continue, reactor, state} <- maybe_run_any_step_sync(reactor, state, ready_steps),
{:continue, reactor} <- all_done(reactor) do
execute(reactor, subtract_iteration(state))
else
Expand Down Expand Up @@ -132,10 +133,6 @@ defmodule Reactor.Executor do

defp start_ready_async_steps(reactor, state, []), do: {:continue, reactor, state}

defp start_ready_async_steps(reactor, state, _steps)
when map_size(state.current_tasks) == state.max_concurrency,
do: {:continue, reactor, state}

defp start_ready_async_steps(reactor, state, steps) do
steps = Enum.filter(steps, &(&1.async? == true))

Expand All @@ -154,6 +151,19 @@ defmodule Reactor.Executor do
Executor.Sync.run(reactor, state, step)
end

defp maybe_run_any_step_sync(reactor, state, []), do: {:continue, reactor, state}

defp maybe_run_any_step_sync(reactor, state, [step | _]) do
:__reactor__
|> Process.get([])
|> Enum.any?(&(&1.concurrency_key == state.concurrency_key))
|> if do
Executor.Sync.run(reactor, state, step)
else
{:continue, reactor, state}
end
end

defp subtract_iteration(state) when state.max_iterations == :infinity, do: state

defp subtract_iteration(state) when state.max_iterations > 0,
Expand Down
27 changes: 9 additions & 18 deletions lib/reactor/executor/async.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,14 @@ defmodule Reactor.Executor.Async do
def start_steps(reactor, state, [], _supervisor), do: {:continue, reactor, state}

def start_steps(reactor, state, steps, supervisor) do
available_concurrency = state.max_concurrency - map_size(state.current_tasks)
available_steps = length(steps)

start_steps(reactor, state, steps, supervisor, available_concurrency)
end

defp start_steps(reactor, state, _steps, _supervisor, 0), do: {:continue, reactor, state}
locked_concurrency =
acquire_concurrency_resource_from_pool(state.concurrency_key, available_steps)

defp start_steps(reactor, state, steps, supervisor, available_concurrency) do
started =
steps
|> Enum.take(available_concurrency)
|> Enum.take_while(&acquire_concurrency_resource_from_pool(state.concurrency_key, &1))
|> Enum.take(locked_concurrency)
|> Enum.reduce_while(%{}, fn step, started ->
case start_task_for_step(reactor, state, step, supervisor, state.concurrency_key) do
{:ok, task} -> {:cont, Map.put(started, task, step)}
Expand Down Expand Up @@ -362,17 +358,12 @@ defmodule Reactor.Executor.Async do
%{reactor | steps: Enum.concat(steps, reactor.steps)}
end

defp release_concurrency_resources_to_pool(_pool_key, 0), do: :ok

defp release_concurrency_resources_to_pool(pool_key, n) when n > 0 do
ConcurrencyTracker.release(pool_key)
release_concurrency_resources_to_pool(pool_key, n - 1)
defp release_concurrency_resources_to_pool(pool_key, how_many) do
ConcurrencyTracker.release(pool_key, how_many)
end

defp acquire_concurrency_resource_from_pool(pool_key, _) do
case ConcurrencyTracker.acquire(pool_key) do
:ok -> true
:error -> false
end
defp acquire_concurrency_resource_from_pool(pool_key, requested) do
{:ok, actual} = ConcurrencyTracker.acquire(pool_key, requested)
actual
end
end
84 changes: 60 additions & 24 deletions lib/reactor/executor/concurrency_tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,20 @@ defmodule Reactor.Executor.ConcurrencyTracker do
This avoids nested Reactors spawning too many workers and thrashing the
system.
The process calling `allocate_pool/1` is monitored, and when it terminates
it's allocation is removed. Any processes which are using that pool will
not be able to allocate any new resources.
"""

use GenServer

@type pool_key :: reference()

@type record ::
{pool_key, concurrency_limit :: pos_integer, available_slots :: non_neg_integer(),
allocator :: pid}

@doc false
@spec start_link(any) :: GenServer.on_start()
def start_link(_), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
Expand Down Expand Up @@ -65,35 +73,63 @@ defmodule Reactor.Executor.ConcurrencyTracker do
end

@doc """
Release a concurrency allocation back to the pool.
Release concurrency allocation back to the pool.
"""
@spec release(pool_key) :: :ok
def release(key) do
:ets.select_replace(__MODULE__, [
{{:"$1", :"$2", :"$3", :"$4"},
[{:andalso, {:"=<", {:+, :"$2", 1}, :"$3"}, {:==, :"$1", key}}],
[{{:"$1", {:+, :"$2", 1}, :"$3", :"$4"}}]}
])

:ok
@spec release(pool_key, how_many :: pos_integer) :: :ok | :error
def release(key, how_many \\ 1) do
# generated using:
#
# :ets.fun2ms(fn {key, concurrency_limit, concurrency_available, owner}
# when key == :key and concurrency_available + 1 <= concurrency_limit ->
# {key, concurrency_limit, concurrency_available + 1, owner}
# end)
#
# and replacing `:key` with the provided key.

Enum.reduce_while(1..how_many, :ok, fn _, :ok ->
case :ets.select_replace(__MODULE__, [
{{:"$1", :"$2", :"$3", :"$4"},
[{:andalso, {:==, :"$1", key}, {:"=<", {:+, :"$3", 1}, :"$2"}}],
[{{:"$1", :"$2", {:+, :"$3", 1}, :"$4"}}]}
]) do
0 -> {:halt, :error}
1 -> {:cont, :ok}
end
end)
end

@doc """
Attempt to acquire a concurrency allocation from the pool.
Attempt to acquire a number of concurrency allocations from the pool.
Returns `:ok` if the allocation was successful, otherwise `:error`.
Returns `{:ok, n}` where `n` was the number of slots that were actually
allocated. It's important to note that whilst you may request `16` slots, if
there is only `3` available, then this function will return `{:ok, 3}` and you
must abide by it.
It is possible for this function to return `{:ok, 0}` if there is no slots
available.
"""
@spec acquire(pool_key) :: :ok | :error
def acquire(key) do
__MODULE__
|> :ets.select_replace([
{{:"$1", :"$2", :"$3", :"$4"}, [{:andalso, {:>=, {:-, :"$2", 1}, 0}, {:==, :"$1", key}}],
[{{:"$1", {:-, :"$2", 1}, :"$3", :"$4"}}]}
])
|> case do
0 -> :error
1 -> :ok
end
@spec acquire(pool_key, how_many :: pos_integer()) :: {:ok, non_neg_integer()}
def acquire(key, how_many \\ 1) do
# generated using:
#
# :ets.fun2ms(fn {key, concurrency_limit, concurrency_available, owner}
# when key == :key and concurrency_available - 1 >= 0 ->
# {key, concurrency_limit, concurrency_available - 1, owner}
# end)
#
# and replacing `:key` with the provided key.

Enum.reduce_while(1..how_many, {:ok, 0}, fn _, {:ok, n} ->
case :ets.select_replace(__MODULE__, [
{{:"$1", :"$2", :"$3", :"$4"},
[{:andalso, {:==, :"$1", key}, {:>=, {:-, :"$3", 1}, 0}}],
[{{:"$1", :"$2", {:-, :"$3", 1}, :"$4"}}]}
]) do
0 -> {:halt, {:ok, n}}
1 -> {:cont, {:ok, n + 1}}
end
end)
end

@doc """
Expand All @@ -105,7 +141,7 @@ defmodule Reactor.Executor.ConcurrencyTracker do
__MODULE__
|> :ets.lookup(key)
|> case do
[{_, available, limit, _}] -> {:ok, available, limit}
[{_, limit, available, _}] -> {:ok, available, limit}
[] -> {:error, "Unknown concurrency pool"}
end
end
Expand Down
14 changes: 13 additions & 1 deletion lib/reactor/executor/step_runner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,20 @@ defmodule Reactor.Executor.StepRunner do
{module, options} <- module_and_opts(step),
{:ok, context} <- build_context(reactor, state, step, concurrency_key),
{:ok, arguments} <- maybe_replace_arguments(arguments, context) do
do_run(module, options, arguments, context)
metadata = %{
current_step: step,
pid: self(),
reactor: reactor,
concurrency_key: concurrency_key
}

metadata_stack = Process.get(:__reactor__, [])
Process.put(:__reactor__, [metadata | metadata_stack])
result = do_run(module, options, arguments, context)
Process.put(:__reactor__, metadata_stack)
result
end
after
end

@doc """
Expand Down
7 changes: 0 additions & 7 deletions test/reactor/executor/async_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ defmodule Reactor.Executor.AsyncTest do
assert {:continue, ^reactor, ^state} = start_steps(reactor, state, [])
end

test "when there is no available concurrency slots, it tells the reactor to continue",
%{reactor: reactor, state: state, doable: doable} do
state = %{state | max_concurrency: 1, current_tasks: %{nil => nil}}

assert {:continue, ^reactor, ^state} = start_steps(reactor, state, [doable])
end

test "when steps are started, it stores them in the state",
%{reactor: reactor, state: state, doable: doable, supervisor: supervisor} do
assert {_, _reactor, state} = start_steps(reactor, state, [doable], supervisor)
Expand Down
12 changes: 6 additions & 6 deletions test/reactor/executor/concurrency_tracker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,26 @@ defmodule Reactor.Executor.ConcurrencyTrackerTest do
describe "acquire/1" do
test "when there is available concurrency in the pool, it returns ok" do
pool = allocate_pool(16)
assert :ok = acquire(pool)
assert {:ok, 1} = acquire(pool)
assert {:ok, 15, 16} = status(pool)
end

test "when there is no available concurrency in the pool, it returns error" do
test "when there is no available concurrency in the pool, it returns zero" do
pool = allocate_pool(0)
assert :error = acquire(pool)
assert {:ok, 0} = acquire(pool)
end

test "when there is 1 slot left, it can be acquired" do
pool = allocate_pool(1)
assert :ok = acquire(pool)
assert {:ok, 1} = acquire(pool)
assert {:ok, 0, 1} = status(pool)
end
end

describe "release/1" do
test "it increments the available concurrency in the pool when possible" do
pool = allocate_pool(16)
:ok = acquire(pool)
{:ok, 1} = acquire(pool)
assert {:ok, 15, 16} = status(pool)
assert :ok = release(pool)
assert {:ok, 16, 16} = status(pool)
Expand All @@ -57,7 +57,7 @@ defmodule Reactor.Executor.ConcurrencyTrackerTest do
test "it doesn't allow the pool to grow" do
pool = allocate_pool(16)
assert {:ok, 16, 16} = status(pool)
assert :ok = release(pool)
assert :error = release(pool)
assert {:ok, 16, 16} = status(pool)
end
end
Expand Down
112 changes: 112 additions & 0 deletions test/reactor/executor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -424,5 +424,117 @@ defmodule Reactor.ExecutorTest do
assert elapsed >= 200
assert elapsed < 300
end

test "reactors running inside async steps with shared concurrency don't cause a deadlock" do
defmodule MaybeDeadlockedReactor.Inner do
@moduledoc false
use Reactor

step :sleep do
run(fn _ ->
Process.sleep(100)
{:ok, nil}
end)
end
end

defmodule MaybeDeadlockedReactor do
@moduledoc false
use Reactor

for i <- 0..16 do
step :"sleep_#{i}" do
run(fn _, context ->
Reactor.run(
MaybeDeadlockedReactor.Inner,
%{},
%{},
concurrency_key: context.concurrency_key
)
end)
end
end
end

Reactor.run(MaybeDeadlockedReactor, %{}, %{}, max_concurrency: 8)
end

test "lots of reactors sharing a concurrency key do not deadlock" do
defmodule OversharingReactor do
@moduledoc false
use Reactor

step :sleep do
run fn _ ->
Process.sleep(100)
{:ok, nil}
end
end
end

pool = ConcurrencyTracker.allocate_pool(8)

0..16
|> Enum.map(fn _ ->
Task.async(fn ->
Reactor.run(OversharingReactor, %{}, %{}, concurrency_key: pool)
end)
end)
|> Enum.map(&Task.await/1)
end

test "Zach's hunch" do
defmodule GrandchildReactor do
@moduledoc false
use Reactor

step :sleep do
run fn _ ->
Process.sleep(1000)
{:ok, nil}
end
end
end

defmodule ChildReactor do
@moduledoc false
use Reactor

step :splode do
run fn _, context ->
0..16
|> Enum.map(fn _ ->
Task.async(fn ->
Reactor.run(GrandchildReactor, %{}, %{}, concurrency_key: context.concurrency_key)
end)
end)
|> Enum.map(&Task.await/1)

{:ok, nil}
end
end
end

defmodule ParentReactor do
@moduledoc false
use Reactor

step :splode do
run fn _ ->
pool = ConcurrencyTracker.allocate_pool(16)

0..16
|> Enum.map(fn _ ->
Task.async(fn -> Reactor.run(ChildReactor, %{}, %{}, concurrency_key: pool) end)
end)
|> Enum.map(&Task.await/1)

{:ok, nil}
end
end
end
end

Reactor.run(ParentReactor)
end
end

0 comments on commit 2d48eee

Please sign in to comment.