Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
chrismccord committed Aug 27, 2024
1 parent e6095aa commit 9c4a4fc
Showing 1 changed file with 15 additions and 22 deletions.
37 changes: 15 additions & 22 deletions lib/flame/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ defmodule FLAME.Pool do

runner_count < state.max ->
if state.async_boot_timer ||
map_size(state.pending_runners) * state.max_concurrency > Queue.size(state.waiting) do
map_size(state.pending_runners) * state.max_concurrency > waiting_count(state) do
waiting_in(state, deadline, from)
else
state
Expand Down Expand Up @@ -797,7 +797,7 @@ defmodule FLAME.Pool do
end

defp has_unmet_servicable_demand?(%Pool{} = state) do
waiting_count(state) > 0 and runner_count(state) < state.max
waiting_count(state) > map_size(state.pending_runners) * state.max_concurrency and runner_count(state) < state.max
end

defp handle_runner_async_up(%Pool{} = state, pid, ref) when is_pid(pid) and is_reference(ref) do
Expand All @@ -813,27 +813,20 @@ defmodule FLAME.Pool do
# 2. the case where we process a DOWN for the new runner as we pop DOWNs
# looking for fresh waiting
# 3. if we still have waiting callers at the end, boot more runners if we have capacity
new_state =
Enum.reduce_while(1..state.max_concurrency, new_state, fn i, acc ->
with {:ok, %RunnerState{} = runner} <- Map.fetch(acc.runners, runner.monitor_ref),
true <- i <= acc.max_concurrency do
case pop_next_waiting_caller(acc) do
{%WaitingState{} = next, acc} ->
{:cont, reply_runner_checkout(acc, runner, next.from, next.monitor_ref)}

{nil, acc} ->
{:halt, acc}
end
else
_ -> {:halt, acc}
Enum.reduce_while(1..state.max_concurrency, new_state, fn i, acc ->
with {:ok, %RunnerState{} = runner} <- Map.fetch(acc.runners, runner.monitor_ref),
true <- i <= acc.max_concurrency do
case pop_next_waiting_caller(acc) do
{%WaitingState{} = next, acc} ->
{:cont, reply_runner_checkout(acc, runner, next.from, next.monitor_ref)}

{nil, acc} ->
{:halt, acc}
end
end)

if has_unmet_servicable_demand?(new_state) do
async_boot_runner(new_state)
else
new_state
end
else
_ -> {:halt, acc}
end
end)
end

defp deadline(timeout) when is_integer(timeout) do
Expand Down

0 comments on commit 9c4a4fc

Please sign in to comment.