Skip to content

Commit

Permalink
Allow strategy implementations to implement has_unment_servicable_dem…
Browse files Browse the repository at this point in the history
…and?/2
  • Loading branch information
nickdichev-firework committed Aug 19, 2024
1 parent 6bb47ca commit 07e2185
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 11 deletions.
22 changes: 12 additions & 10 deletions lib/flame/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -736,8 +736,15 @@ defmodule FLAME.Pool do
defp handle_down(%Pool{} = state, {:DOWN, ref, :process, pid, reason}) do
state = maybe_drop_waiting(state, pid)

%{
callers: callers,
runners: runners,
pending_runners: pending_runners,
strategy: {strategy_module, strategy_opts}
} = state

state =
case state.callers do
case callers do
%{^pid => %Caller{monitor_ref: ^ref} = caller} ->
drop_caller(state, pid, caller)

Expand All @@ -746,16 +753,16 @@ defmodule FLAME.Pool do
end

state =
case state.runners do
case runners do
%{^ref => _} -> drop_child_runner(state, ref)
%{} -> state
end

case state.pending_runners do
case pending_runners do
%{^ref => _} ->
state = %Pool{state | pending_runners: Map.delete(state.pending_runners, ref)}
# we rate limit this to avoid many failed async boot attempts
if has_unmet_servicable_demand?(state) do
if strategy_module.has_unmet_servicable_demand?(state, strategy_opts) do
state
|> maybe_on_grow_end(pid, {:exit, reason})
|> schedule_async_boot_runner()
Expand Down Expand Up @@ -787,11 +794,6 @@ defmodule FLAME.Pool do
state
end

defp has_unmet_servicable_demand?(%Pool{} = state) do
runner_count = runner_count(state) + pending_count(state)
waiting_count(state) > 0 and runner_count < state.max
end

defp handle_runner_async_up(%Pool{} = state, pid, ref) when is_pid(pid) and is_reference(ref) do
%{^ref => task_pid} = state.pending_runners
Process.demonitor(ref, [:flush])
Expand All @@ -811,7 +813,7 @@ defmodule FLAME.Pool do
strategy_module.assign_waiting_callers(new_state, runner, pop, checkout, strategy_opts)

# If we still have waiting callers, boot more runners if we have capacity
if has_unmet_servicable_demand?(new_state) do
if strategy_module.has_unmet_servicable_demand?(new_state, strategy_opts) do
async_boot_runner(new_state)
else
new_state
Expand Down
5 changes: 5 additions & 0 deletions lib/flame/pool/per_runner_max_concurrency_strategy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ defmodule FLAME.Pool.PerRunnerMaxConcurrencyStrategy do
Pool.runner_count(pool) + Pool.pending_count(pool) + 1
end

def has_unmet_servicable_demand?(%Pool{} = pool, _opts) do
runner_count = Pool.runner_count(pool) + Pool.pending_count(pool)
Pool.waiting_count(pool) > 0 and runner_count < pool.max
end

defp min_runner(pool) do
if map_size(pool.runners) == 0 do
nil
Expand Down
2 changes: 1 addition & 1 deletion lib/flame/pool/strategy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ defmodule FLAME.Pool.Strategy do
) ::
Pool.t()

@callback desired_count(state :: Pool.t(), opts :: Keyword.t()) :: non_neg_integer()
@callback has_unmet_servicable_demand?(state :: Pool.t(), opts :: Keyword.t()) :: boolean()
end

0 comments on commit 07e2185

Please sign in to comment.