Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't deadlock when lots of async reactors are sharing a concurrency pool. #36

Merged
merged 1 commit into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 48 additions & 4 deletions lib/reactor/executor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ defmodule Reactor.Executor do
{:continue, ready_steps} <- find_ready_steps(reactor),
{:continue, reactor, state} <- start_ready_async_steps(reactor, state, ready_steps),
{:continue, reactor, state} <- run_ready_sync_step(reactor, state, ready_steps),
{:continue, reactor, state} <- maybe_run_any_step_sync(reactor, state, ready_steps),
{:continue, reactor} <- all_done(reactor) do
execute(reactor, subtract_iteration(state))
else
Expand Down Expand Up @@ -132,10 +133,6 @@ defmodule Reactor.Executor do

defp start_ready_async_steps(reactor, state, []), do: {:continue, reactor, state}

defp start_ready_async_steps(reactor, state, _steps)
when map_size(state.current_tasks) == state.max_concurrency,
do: {:continue, reactor, state}

defp start_ready_async_steps(reactor, state, steps) do
steps = Enum.filter(steps, &(&1.async? == true))

Expand All @@ -154,6 +151,53 @@ defmodule Reactor.Executor do
Executor.Sync.run(reactor, state, step)
end

# This seems a little unintuitive, but this is what allows reactors which are
# sharing a concurrency pool to move forward even when there's no concurrency
# left without deadlocking.
#
# It's a complicated scenario, so let's lay out the pieces:
#
# 1. When a new reactor is started it allocates a concurrency pool using
# `Reactor.Executor.ConcurrencyTracker` **unless** it is explicitly passed
# a `concurrency_key` option.
# 2. Every time a reactor runs an async step it starts a `Task` and consumes a
# space in the concurrency pool (if possible).
# 3. Every task that is started has its concurrency key stored in its
# process dictionary (actually a stack of them because we may be multiple
# nested reactors deep).
# 4. If that async step then turns around and runs a new reactor with shared
# concurrency then that reactor is already consuming a concurrency slot and
# may not be able to allocate any more slots for its tasks.
#
# This situation can lead to a deadlock where we have multiple reactors all in
# a tight loop trying to start tasks but none of them able to proceed.
#
# We detect this situation by:
#
# 1. We are unable to start any async steps (`start_ready_async_steps/3`
# returns `:continue`).
# 2. We are unable to start any sync steps (`run_ready_sync_step/3` returns
# `:continue`).
# 3. We have any steps which can be run (ie async ones which we couldn't
# start).
# 4. Our concurrency key is in the process dictionary.
#
# If all four of these conditions are met we pick the first step and run it
# synchronously. This is fine because the reactor process itself is a task in
# another reactor so in effect is still running asynchronously.
defp maybe_run_any_step_sync(reactor, state, []), do: {:continue, reactor, state}

defp maybe_run_any_step_sync(reactor, state, [first_ready | _others]) do
  # The `:__reactor__` process-dictionary entry is a stack of metadata maps,
  # one per step currently executing in this process (innermost first).
  enclosing_steps = Process.get(:__reactor__, [])

  # `state.concurrency_key` is deliberately only read inside the predicate so
  # it is never touched when the stack is empty.
  shares_pool_with_enclosing_step? =
    Enum.any?(enclosing_steps, fn metadata ->
      metadata.concurrency_key == state.concurrency_key
    end)

  if shares_pool_with_enclosing_step? do
    # We are already occupying a slot of this pool, so run the first ready
    # step inline rather than spinning while waiting for another slot.
    Executor.Sync.run(reactor, state, first_ready)
  else
    {:continue, reactor, state}
  end
end

defp subtract_iteration(state) when state.max_iterations == :infinity, do: state

defp subtract_iteration(state) when state.max_iterations > 0,
Expand Down
27 changes: 9 additions & 18 deletions lib/reactor/executor/async.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,14 @@ defmodule Reactor.Executor.Async do
def start_steps(reactor, state, [], _supervisor), do: {:continue, reactor, state}

def start_steps(reactor, state, steps, supervisor) do
available_concurrency = state.max_concurrency - map_size(state.current_tasks)
available_steps = length(steps)

start_steps(reactor, state, steps, supervisor, available_concurrency)
end

defp start_steps(reactor, state, _steps, _supervisor, 0), do: {:continue, reactor, state}
locked_concurrency =
acquire_concurrency_resource_from_pool(state.concurrency_key, available_steps)

defp start_steps(reactor, state, steps, supervisor, available_concurrency) do
started =
steps
|> Enum.take(available_concurrency)
|> Enum.take_while(&acquire_concurrency_resource_from_pool(state.concurrency_key, &1))
|> Enum.take(locked_concurrency)
|> Enum.reduce_while(%{}, fn step, started ->
case start_task_for_step(reactor, state, step, supervisor, state.concurrency_key) do
{:ok, task} -> {:cont, Map.put(started, task, step)}
Expand Down Expand Up @@ -362,17 +358,12 @@ defmodule Reactor.Executor.Async do
%{reactor | steps: Enum.concat(steps, reactor.steps)}
end

defp release_concurrency_resources_to_pool(_pool_key, 0), do: :ok

defp release_concurrency_resources_to_pool(pool_key, n) when n > 0 do
ConcurrencyTracker.release(pool_key)
release_concurrency_resources_to_pool(pool_key, n - 1)
# Hands `how_many` slots back to the pool identified by `pool_key` and returns
# whatever `ConcurrencyTracker.release/2` returns.
defp release_concurrency_resources_to_pool(pool_key, how_many),
  do: ConcurrencyTracker.release(pool_key, how_many)

defp acquire_concurrency_resource_from_pool(pool_key, _) do
case ConcurrencyTracker.acquire(pool_key) do
:ok -> true
:error -> false
end
# Asks the pool identified by `pool_key` for up to `requested` slots and
# returns the number of slots that were actually granted.
defp acquire_concurrency_resource_from_pool(pool_key, requested) do
  # Assertive match: any reply other than `{:ok, n}` is a bug and should crash.
  {:ok, granted} = ConcurrencyTracker.acquire(pool_key, requested)
  granted
end
end
84 changes: 60 additions & 24 deletions lib/reactor/executor/concurrency_tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,20 @@ defmodule Reactor.Executor.ConcurrencyTracker do
This avoids nested Reactors spawning too many workers and thrashing the
system.
The process calling `allocate_pool/1` is monitored, and when it terminates
its allocation is removed. Any processes which are using that pool will
not be able to allocate any new resources.
"""

use GenServer

@type pool_key :: reference()

@type record ::
{pool_key, concurrency_limit :: pos_integer, available_slots :: non_neg_integer(),
allocator :: pid}

@doc false
@spec start_link(any) :: GenServer.on_start()
def start_link(_), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
Expand Down Expand Up @@ -65,35 +73,63 @@ defmodule Reactor.Executor.ConcurrencyTracker do
end

@doc """
Release a concurrency allocation back to the pool.
Release concurrency allocation back to the pool.
"""
@spec release(pool_key) :: :ok
def release(key) do
:ets.select_replace(__MODULE__, [
{{:"$1", :"$2", :"$3", :"$4"},
[{:andalso, {:"=<", {:+, :"$2", 1}, :"$3"}, {:==, :"$1", key}}],
[{{:"$1", {:+, :"$2", 1}, :"$3", :"$4"}}]}
])

:ok
@spec release(pool_key, how_many :: non_neg_integer) :: :ok | :error
def release(key, how_many \\ 1)

# Releasing nothing is a no-op. This clause is required because `1..0` is a
# *descending* range containing two elements (`1` and `0`), so without it a
# request to release zero slots would attempt to release two.
def release(_key, 0), do: :ok

def release(key, how_many) when is_integer(how_many) and how_many > 0 do
  # generated using:
  #
  # :ets.fun2ms(fn {key, concurrency_limit, concurrency_available, owner}
  # when key == :key and concurrency_available + 1 <= concurrency_limit ->
  # {key, concurrency_limit, concurrency_available + 1, owner}
  # end)
  #
  # and replacing `:key` with the provided key.
  increment_available = [
    {{:"$1", :"$2", :"$3", :"$4"},
     [{:andalso, {:==, :"$1", key}, {:"=<", {:+, :"$3", 1}, :"$2"}}],
     [{{:"$1", :"$2", {:+, :"$3", 1}, :"$4"}}]}
  ]

  # Slots are released one at a time. As soon as a replace touches no record
  # (unknown pool, or the pool is already at its limit) we stop and report
  # `:error`; slots released by earlier iterations stay released.
  Enum.reduce_while(1..how_many, :ok, fn _, :ok ->
    case :ets.select_replace(__MODULE__, increment_available) do
      0 -> {:halt, :error}
      1 -> {:cont, :ok}
    end
  end)
end

@doc """
Attempt to acquire a concurrency allocation from the pool.
Attempt to acquire a number of concurrency allocations from the pool.
Returns `:ok` if the allocation was successful, otherwise `:error`.
Returns `{:ok, n}` where `n` was the number of slots that were actually
allocated. It's important to note that whilst you may request `16` slots, if
there are only `3` available, then this function will return `{:ok, 3}` and you
must abide by it.
It is possible for this function to return `{:ok, 0}` if there are no slots
available.
"""
@spec acquire(pool_key) :: :ok | :error
def acquire(key) do
__MODULE__
|> :ets.select_replace([
{{:"$1", :"$2", :"$3", :"$4"}, [{:andalso, {:>=, {:-, :"$2", 1}, 0}, {:==, :"$1", key}}],
[{{:"$1", {:-, :"$2", 1}, :"$3", :"$4"}}]}
])
|> case do
0 -> :error
1 -> :ok
end
@spec acquire(pool_key, how_many :: non_neg_integer()) :: {:ok, non_neg_integer()}
def acquire(key, how_many \\ 1)

# Requesting nothing acquires nothing. This clause is required because `1..0`
# is a *descending* range containing two elements (`1` and `0`), so without it
# a request for zero slots could take up to two slots from the pool.
def acquire(_key, 0), do: {:ok, 0}

def acquire(key, how_many) when is_integer(how_many) and how_many > 0 do
  # generated using:
  #
  # :ets.fun2ms(fn {key, concurrency_limit, concurrency_available, owner}
  # when key == :key and concurrency_available - 1 >= 0 ->
  # {key, concurrency_limit, concurrency_available - 1, owner}
  # end)
  #
  # and replacing `:key` with the provided key.
  decrement_available = [
    {{:"$1", :"$2", :"$3", :"$4"},
     [{:andalso, {:==, :"$1", key}, {:>=, {:-, :"$3", 1}, 0}}],
     [{{:"$1", :"$2", {:-, :"$3", 1}, :"$4"}}]}
  ]

  # Slots are taken one at a time; the first replace that touches no record
  # (unknown pool, or no slots left) ends the loop and we report how many
  # slots were obtained up to that point.
  Enum.reduce_while(1..how_many, {:ok, 0}, fn _, {:ok, acquired} ->
    case :ets.select_replace(__MODULE__, decrement_available) do
      0 -> {:halt, {:ok, acquired}}
      1 -> {:cont, {:ok, acquired + 1}}
    end
  end)
end

@doc """
Expand All @@ -105,7 +141,7 @@ defmodule Reactor.Executor.ConcurrencyTracker do
__MODULE__
|> :ets.lookup(key)
|> case do
[{_, available, limit, _}] -> {:ok, available, limit}
[{_, limit, available, _}] -> {:ok, available, limit}
[] -> {:error, "Unknown concurrency pool"}
end
end
Expand Down
14 changes: 13 additions & 1 deletion lib/reactor/executor/step_runner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,20 @@ defmodule Reactor.Executor.StepRunner do
{module, options} <- module_and_opts(step),
{:ok, context} <- build_context(reactor, state, step, concurrency_key),
{:ok, arguments} <- maybe_replace_arguments(arguments, context) do
do_run(module, options, arguments, context)
metadata = %{
current_step: step,
pid: self(),
reactor: reactor,
concurrency_key: concurrency_key
}

metadata_stack = Process.get(:__reactor__, [])
Process.put(:__reactor__, [metadata | metadata_stack])
result = do_run(module, options, arguments, context)
Process.put(:__reactor__, metadata_stack)
result
end
after
end

@doc """
Expand Down
7 changes: 0 additions & 7 deletions test/reactor/executor/async_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ defmodule Reactor.Executor.AsyncTest do
assert {:continue, ^reactor, ^state} = start_steps(reactor, state, [])
end

test "when there is no available concurrency slots, it tells the reactor to continue",
%{reactor: reactor, state: state, doable: doable} do
state = %{state | max_concurrency: 1, current_tasks: %{nil => nil}}

assert {:continue, ^reactor, ^state} = start_steps(reactor, state, [doable])
end

test "when steps are started, it stores them in the state",
%{reactor: reactor, state: state, doable: doable, supervisor: supervisor} do
assert {_, _reactor, state} = start_steps(reactor, state, [doable], supervisor)
Expand Down
12 changes: 6 additions & 6 deletions test/reactor/executor/concurrency_tracker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,26 @@ defmodule Reactor.Executor.ConcurrencyTrackerTest do
describe "acquire/1" do
test "when there is available concurrency in the pool, it returns ok" do
pool = allocate_pool(16)
assert :ok = acquire(pool)
assert {:ok, 1} = acquire(pool)
assert {:ok, 15, 16} = status(pool)
end

test "when there is no available concurrency in the pool, it returns error" do
test "when there is no available concurrency in the pool, it returns zero" do
pool = allocate_pool(0)
assert :error = acquire(pool)
assert {:ok, 0} = acquire(pool)
end

test "when there is 1 slot left, it can be acquired" do
pool = allocate_pool(1)
assert :ok = acquire(pool)
assert {:ok, 1} = acquire(pool)
assert {:ok, 0, 1} = status(pool)
end
end

describe "release/1" do
test "it increments the available concurrency in the pool when possible" do
pool = allocate_pool(16)
:ok = acquire(pool)
{:ok, 1} = acquire(pool)
assert {:ok, 15, 16} = status(pool)
assert :ok = release(pool)
assert {:ok, 16, 16} = status(pool)
Expand All @@ -57,7 +57,7 @@ defmodule Reactor.Executor.ConcurrencyTrackerTest do
test "it doesn't allow the pool to grow" do
pool = allocate_pool(16)
assert {:ok, 16, 16} = status(pool)
assert :ok = release(pool)
assert :error = release(pool)
assert {:ok, 16, 16} = status(pool)
end
end
Expand Down
Loading