Adding TTL to proxied messages #276

Open
wants to merge 4 commits into
base: master
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
# Changelog

## 0.10.0

- Added an optional TTL to Horde.DynamicSupervisor's `:proxy_operation` messages. The time-to-live defaults to `:infinity` for full backwards compatibility. The TTL helps prevent messages from looping forever between nodes that disagree on which node should execute the task.
- [BREAKING] Horde.DynamicSupervisor's new `:proxy_message_ttl` option configures the maximum TTL for proxy messages. It takes an integer denoting the maximum number of hops a message can travel, or the atom `:infinity` (default). This can be a breaking change: while upgrading, do not set this option to an integer; either set it explicitly to `:infinity` or leave it at its default. If it is set to an integer, upgraded nodes won't be able to proxy to nodes that have not been upgraded yet.
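
The hop-count rule described above can be sketched in isolation. This is a minimal, self-contained model and not Horde's implementation: the module `TtlSketch`, its function names, and the supervisor name `MyApp.DistributedSupervisor` are hypothetical and exist only for illustration. It assumes the worst case, in which every node keeps forwarding the message until the TTL reaches zero.

```elixir
# Hypothetical model of the TTL rule; none of these names exist in Horde.
defmodule TtlSketch do
  # :infinity never expires; an integer TTL loses one hop per forward.
  def decrement(:infinity), do: :infinity
  def decrement(n) when is_integer(n) and n > 0, do: n - 1

  # Counts how many times a message is forwarded before the TTL reaches 0,
  # assuming every node disagrees and forwards it again.
  def hops_until_accepted(ttl, hops \\ 0)
  def hops_until_accepted(0, hops), do: hops

  def hops_until_accepted(ttl, hops) when is_integer(ttl) and ttl > 0 do
    hops_until_accepted(decrement(ttl), hops + 1)
  end
end

# Options as they might be passed to Horde.DynamicSupervisor.start_link/1.
# The supervisor name is a placeholder.
opts = [
  name: MyApp.DistributedSupervisor,
  strategy: :one_for_one,
  proxy_message_ttl: 10
]

IO.inspect(TtlSketch.hops_until_accepted(Keyword.fetch!(opts, :proxy_message_ttl)))
```

With an integer TTL the number of forwards is bounded by the configured value, whereas `:infinity` gives no such bound, which is why the sketch only defines `hops_until_accepted/2` for integers.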

## 0.9.0

- Bugfixes for scenarios causing Horde to crash. See [#266](https://github.com/derekkraan/horde/pull/266) and [#263](https://github.com/derekkraan/horde/pull/263).
3 changes: 2 additions & 1 deletion README.md
@@ -13,7 +13,7 @@ Daniel Azuma gave [a great talk](https://www.youtube.com/watch?v=nLApFANtkHs) at

Since Horde is built on CRDTs, it is eventually (as opposed to immediately) consistent, although it does sync its state with its neighbours rather aggressively. Cluster membership in Horde is fully dynamic; nodes can be added and removed at any time and Horde will continue to operate as expected. `Horde.DynamicSupervisor` also uses a hash ring to limit any possible race conditions to times when cluster membership is changing.

`Horde.Registry` is API-compatible with Elixir's own Registry, although it does not yet support the `keys: :duplicate` option. For many use cases, it will be a drop-in replacement. `Horde.DynamicSupervisor` follows the API and behaviour of `DynamicSupervisor` as closely as possible. There will always be some difference between Horde and its standard library equivalents, if not in their APIs, then in their functionality. This is a necessary consequence of Horde's distributed nature.
`Horde.Registry` is API-compatible with Elixir's own Registry, although it does not yet support the `keys: :duplicate` option. For many use cases, it will be a drop-in replacement. `Horde.DynamicSupervisor` follows the API and behaviour of `DynamicSupervisor` as closely as possible. There will always be some difference between Horde and its standard library equivalents, if not in their APIs, then in their functionality. This is a necessary consequence of Horde's distributed nature. See [documentation of Horde.DynamicSupervisor.start_link/1](https://hexdocs.pm/horde/Horde.DynamicSupervisor.html#start_link/1) for details.

## Running a single global process

@@ -100,6 +100,7 @@ Horde.Cluster.set_members(:distributed_supervisor_1, [:distributed_supervisor_1,
# supervisor_1, supervisor_2 and supervisor_3 will be joined in a single cluster.
```


# Other projects

Useful libraries that use or extend Horde functionalities.
12 changes: 12 additions & 0 deletions lib/horde/dynamic_supervisor.ex
@@ -60,6 +60,7 @@ defmodule Horde.DynamicSupervisor do
| {:max_seconds, integer()}
| {:extra_arguments, [term()]}
| {:distribution_strategy, Horde.DistributionStrategy.t()}
| {:proxy_message_ttl, integer() | :infinity}
| {:shutdown, integer()}
| {:members, [Horde.Cluster.member()] | :auto}
| {:delta_crdt_options, [DeltaCrdt.crdt_option()]}
@@ -109,6 +110,7 @@ defmodule Horde.DynamicSupervisor do
@doc """
Works like `DynamicSupervisor.start_link/1`. Extra options are documented here:
- `:distribution_strategy`, defaults to `Horde.UniformDistribution`, but more are available - see `Horde.DistributionStrategy`
- `:proxy_message_ttl`, defaults to `:infinity`. Can be set to an integer indicating the maximum number of times a message may be forwarded in a Horde.DynamicSupervisor cluster. Leaving it at `:infinity` is generally fine when using a stable distribution strategy such as `Horde.UniformDistribution`. Setting a TTL is helpful when migrating to a different `:distribution_strategy`, or when using an algorithm with random distribution such as `Horde.UniformRandomDistribution`, as it will prevent messages from looping (near) infinitely.
"""
def start_link(options) when is_list(options) do
keys = [
@@ -119,6 +121,7 @@ defmodule Horde.DynamicSupervisor do
:strategy,
:distribution_strategy,
:process_redistribution,
:proxy_message_ttl,
:members,
:delta_crdt_options
]
@@ -156,13 +159,21 @@ defmodule Horde.DynamicSupervisor do
Horde.UniformDistribution
)

proxy_message_ttl =
Keyword.get(
options,
:proxy_message_ttl,
:infinity
)

flags = %{
strategy: strategy,
max_restarts: max_restarts,
max_seconds: max_seconds,
max_children: max_children,
extra_arguments: extra_arguments,
distribution_strategy: distribution_strategy,
proxy_message_ttl: proxy_message_ttl,
members: members,
delta_crdt_options: delta_crdt_options(delta_crdt_options),
process_redistribution: process_redistribution
@@ -196,6 +207,7 @@ defmodule Horde.DynamicSupervisor do
extra_arguments: flags.extra_arguments,
distribution_strategy: flags.distribution_strategy,
process_redistribution: flags.process_redistribution,
proxy_message_ttl: flags.proxy_message_ttl,
members: members(flags.members, name)
]},
{Horde.ProcessesSupervisor,
36 changes: 31 additions & 5 deletions lib/horde/dynamic_supervisor_impl.ex
@@ -22,6 +22,8 @@ defmodule Horde.DynamicSupervisorImpl do
name_to_supervisor_ref: %{},
shutting_down: false,
supervisor_options: [],
proxy_message_ttl: :infinity,
proxy_operation_ttl: nil,
distribution_strategy: Horde.UniformDistribution

def start_link(opts) do
@@ -50,7 +52,7 @@ defmodule Horde.DynamicSupervisorImpl do
process_pid_to_id: new_table(:process_pid_to_id),
name: name
}
|> Map.merge(Map.new(Keyword.take(options, [:distribution_strategy])))
|> Map.merge(Map.new(Keyword.take(options, [:distribution_strategy, :proxy_message_ttl])))

state = set_own_node_status(state)

@@ -145,15 +147,14 @@ defmodule Horde.DynamicSupervisorImpl do
def handle_call({:start_child, _child_spec}, _from, %{shutting_down: true} = state),
do: {:reply, {:error, {:shutting_down, "this node is shutting down."}}, state}

@big_number round(:math.pow(2, 128))

def handle_call({:start_child, child_spec} = msg, from, state) do
this_name = fully_qualified_name(state.name)
proxy_ttl_expired? = proxy_message_ttl(state, from) == 0

child_spec = randomize_child_id(child_spec)

case choose_node(child_spec, state) do
{:ok, %{name: ^this_name}} ->
{:ok, %{name: node_name}} when node_name == this_name or proxy_ttl_expired? ->
{reply, new_state} = add_child(child_spec, state)
{:reply, reply, new_state}

@@ -277,14 +278,27 @@ defmodule Horde.DynamicSupervisorImpl do
end
end

@big_number round(:math.pow(2, 128))

defp randomize_child_id(child) do
Map.put(child, :id, :rand.uniform(@big_number))
end

defp proxy_to_node(_node_name, message, reply_to, %{proxy_operation_ttl: {reply_to, 0}} = state) do
message_type = elem(message, 0)

{:reply,
{:error, :proxy_operation_ttl_expired,
"a proxied #{message_type} message was discarded because its TTL expired"}, state}
end

defp proxy_to_node(node_name, message, reply_to, state) do
case Map.get(members(state), node_name) do
%{status: :alive} ->
send(node_name, {:proxy_operation, message, reply_to})
case proxy_message_ttl(state, reply_to) do
:infinity -> send(node_name, {:proxy_operation, message, reply_to})
ttl -> send(node_name, {:proxy_operation, message, reply_to, ttl})
end

{:noreply, state}

_ ->
@@ -296,6 +310,12 @@ defmodule Horde.DynamicSupervisorImpl do
end
end

defp proxy_message_ttl(%{proxy_operation_ttl: {reply_to, ttl}} = _state, reply_to), do: ttl
defp proxy_message_ttl(%{proxy_message_ttl: ttl} = _state, _reply_to), do: ttl

defp decrement_ttl(:infinity), do: :infinity
defp decrement_ttl(n) when is_integer(n), do: n - 1

defp set_own_node_status(state, force \\ false)

defp set_own_node_status(state, false) do
@@ -336,6 +356,12 @@ defmodule Horde.DynamicSupervisorImpl do
end

def handle_info({:proxy_operation, msg, reply_to}, state) do
handle_info({:proxy_operation, msg, reply_to, :infinity}, state)
end

def handle_info({:proxy_operation, msg, reply_to, ttl}, state) do
state = %{state | proxy_operation_ttl: {reply_to, decrement_ttl(ttl)}}

case handle_call(msg, reply_to, state) do
{:reply, reply, new_state} ->
GenServer.reply(reply_to, reply)
149 changes: 146 additions & 3 deletions test/dynamic_supervisor_test.exs
@@ -2,7 +2,7 @@ defmodule DynamicSupervisorTest do
require Logger
use ExUnit.Case

defp do_setup() do
defp do_setup(_context) do
n1 = :"horde_#{:rand.uniform(100_000_000)}"
n2 = :"horde_#{:rand.uniform(100_000_000)}"
n3 = :"horde_#{:rand.uniform(100_000_000)}"
@@ -143,16 +143,64 @@ defmodule DynamicSupervisorTest do
]
end

setup %{describe: describe} do
defp proxy_ttl_spec(name, redist, %{distribution_strategy: dist_strat, proxy_message_ttl: ttl}) do
%{
id: name,
start: {
Horde.DynamicSupervisor,
:start_link,
[
[
name: name,
strategy: :one_for_one,
process_redistribution: redist,
delta_crdt_options: [sync_interval: 20],
distribution_strategy: dist_strat,
proxy_message_ttl: ttl
]
]
},
restart: :transient
}
end

defp proxy_ttl_setup(context) do
n1 = :horde_proxy_ttl_1
n2 = :horde_proxy_ttl_2
n3 = :horde_proxy_ttl_3

members = [{n1, Node.self()}, {n2, Node.self()}, {n3, Node.self()}]

# Start three supervisors on the local node, then join them into a single
# cluster by setting the member list on the first supervisor.
{:ok, _pid_n1} = start_supervised(proxy_ttl_spec(n1, :active, context))
{:ok, _pid_n2} = start_supervised(proxy_ttl_spec(n2, :active, context))
{:ok, _pid_n3} = start_supervised(proxy_ttl_spec(n3, :active, context))

Horde.Cluster.set_members(n1, [n1, n2, n3])

[
n1: n1,
n2: n2,
n3: n3,
names: [n1, n2, n3],
members: members
]
end

setup %{describe: describe} = context do
case describe do
"redistribute" ->
redistribute_setup()

"graceful shutdown" ->
Logger.info("Skip setup for \"#{describe}\" context")

"proxy ttl" ->
proxy_ttl_setup(context)

_ ->
do_setup()
do_setup(context)
end
end

@@ -635,4 +683,99 @@ defmodule DynamicSupervisorTest do
)
end
end

# Polls every 100 ms until every supervisor reports the same member list.
defp wait_until_cluster_synced(names) do
Stream.iterate(false, fn _ ->
Process.sleep(100)

1 ==
names
|> Enum.map(&Horde.Cluster.members(&1))
|> Enum.uniq()
|> length()
end)
|> Enum.find(&(&1 == true))
end

defmodule HotPotatoDistribution do
@behaviour Horde.DistributionStrategy

@moduledoc """
Distributes processes to any alive node except itself
"""

def choose_node(_child_spec, members) do
members
|> Enum.filter(&match?(%{status: :alive}, &1))
|> Enum.find(fn %{name: {name, _node}} -> Process.whereis(name) != self() end)
|> case do
nil ->
{:error, :no_other_nodes_alive}

member ->
{:ok, member}
end
end

def has_quorum?(_members), do: true
end

defp async_simple_task(name) do
exunit = self()
task_spec = Task.child_spec(fn -> send(exunit, "child alive") end)

Task.async(fn ->
Horde.DynamicSupervisor.start_child(name, task_spec)
end)
end

describe "proxy ttl" do
@tag proxy_message_ttl: :infinity
@tag distribution_strategy: HotPotatoDistribution
test "message will proxy forever if nodes disagree on member", context do
wait_until_cluster_synced(context.names)

assert %Task{} = task = async_simple_task(context.n1)

refute_receive "child alive", 200
assert nil == Task.shutdown(task)
end

@tag proxy_message_ttl: 10
@tag distribution_strategy: HotPotatoDistribution
test "start_child will add child on any node when proxy TTL expires", context do
wait_until_cluster_synced(context.names)

assert {:ok, {:ok, _pid}} = Task.yield(async_simple_task(context.n1))
assert_receive "child alive", 200
end

@tag proxy_message_ttl: 1
@tag distribution_strategy: Horde.UniformDistribution
test "message will be picked up if nodes agree on member", context do
wait_until_cluster_synced(context.names)

assert {:ok, {:ok, _pid}} = Task.yield(async_simple_task(context.n1))
assert_receive "child alive", 200
end

@tag proxy_message_ttl: 1
@tag distribution_strategy: Horde.UniformRandomDistribution
test "random distribution start_child with TTL of 1 will not bounce around", context do
wait_until_cluster_synced(context.names)

process_count = 200

result_count =
Enum.map(1..process_count, fn _ -> async_simple_task(context.n1) end)
|> Task.yield_many(1_000)
|> Enum.frequencies_by(fn {_task, {:ok, outcome}} -> elem(outcome, 0) end)

Enum.each(1..process_count, fn _n ->
assert_receive "child alive", 200
end)

assert %{ok: process_count} == result_count
end
end
end