diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2288f61..e4b2352 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,10 @@
 # Changelog
 
+## 0.10.0
+
+- Added an optional TTL to Horde.DynamicSupervisor's `:proxy_operation` messages. The TTL defaults to `:infinity` for full backwards compatibility. This helps prevent messages from looping forever between a set of nodes that disagree on which node should execute a task.
+- [BREAKING] Horde.DynamicSupervisor's new `:proxy_message_ttl` option configures the maximum TTL for proxy messages. It takes an integer denoting the maximum number of hops a message may travel, or the atom `:infinity` (default). This can be a breaking change: when upgrading, do not set this option to an integer; either set it explicitly to `:infinity` or leave it at the default. If it is set to an integer, upgraded nodes will not be able to proxy messages to non-upgraded nodes.
+
 ## 0.9.0
 
 - Bugfixes for scenarios causing Horde to crash. See [#266](https://github.com/derekkraan/horde/pull/266) and [#263](https://github.com/derekkraan/horde/pull/263).
diff --git a/README.md b/README.md
index 194de65..d46e29d 100644
--- a/README.md
+++ b/README.md
@@ -13,7 +13,7 @@ Daniel Azuma gave [a great talk](https://www.youtube.com/watch?v=nLApFANtkHs) at
 
 Since Horde is built on CRDTs, it is eventually (as opposed to immediately) consistent, although it does sync its state with its neighbours rather aggressively. Cluster membership in Horde is fully dynamic; nodes can be added and removed at any time and Horde will continue to operate as expected. `Horde.DynamicSupervisor` also uses a hash ring to limit any possible race conditions to times when cluster membership is changing.
 
-`Horde.Registry` is API-compatible with Elixir's own Registry, although it does not yet support the `keys: :duplicate` option. For many use cases, it will be a drop-in replacement. `Horde.DynamicSupervisor` follows the API and behaviour of `DynamicSupervisor` as closely as possible. There will always be some difference between Horde and its standard library equivalents, if not in their APIs, then in their functionality. This is a necessary consequence of Horde's distributed nature.
+`Horde.Registry` is API-compatible with Elixir's own Registry, although it does not yet support the `keys: :duplicate` option. For many use cases, it will be a drop-in replacement. `Horde.DynamicSupervisor` follows the API and behaviour of `DynamicSupervisor` as closely as possible. There will always be some difference between Horde and its standard library equivalents, if not in their APIs, then in their functionality. This is a necessary consequence of Horde's distributed nature. See the [documentation of Horde.DynamicSupervisor.start_link/1](https://hexdocs.pm/horde/Horde.DynamicSupervisor.html#start_link/1) for details.
 
 ## Running a single global process
 
@@ -100,6 +100,7 @@ Horde.Cluster.set_members(:distributed_supervisor_1, [:distributed_supervisor_1,
 # supervisor_1, supervisor_2 and supervisor_3 will be joined in a single cluster.
 ```
 
+
 # Other projects
 
 Useful libraries that use or extend Horde functionalities.
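The `:proxy_message_ttl` option described in the changelog above is passed as a normal `Horde.DynamicSupervisor` start option. A minimal configuration sketch — the supervisor name `MyApp.DistributedSupervisor` is a placeholder, everything else mirrors the options documented in this changeset:

```elixir
# Sketch only: MyApp.DistributedSupervisor is a placeholder name.
# `:proxy_message_ttl` bounds how many times a `:proxy_operation` message
# may be forwarded before it stops bouncing between nodes.
children = [
  {Horde.DynamicSupervisor,
   name: MyApp.DistributedSupervisor,
   strategy: :one_for_one,
   distribution_strategy: Horde.UniformRandomDistribution,
   # allow at most 10 hops; omit or use :infinity (the default) for the old behaviour
   proxy_message_ttl: 10}
]

Supervisor.start_link(children, strategy: :one_for_one)
```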
diff --git a/lib/horde/dynamic_supervisor.ex b/lib/horde/dynamic_supervisor.ex
index 534b871..4fb9964 100644
--- a/lib/horde/dynamic_supervisor.ex
+++ b/lib/horde/dynamic_supervisor.ex
@@ -60,6 +60,7 @@ defmodule Horde.DynamicSupervisor do
           | {:max_seconds, integer()}
           | {:extra_arguments, [term()]}
           | {:distribution_strategy, Horde.DistributionStrategy.t()}
+          | {:proxy_message_ttl, integer() | :infinity}
           | {:shutdown, integer()}
           | {:members, [Horde.Cluster.member()] | :auto}
           | {:delta_crdt_options, [DeltaCrdt.crdt_option()]}
@@ -109,6 +110,7 @@ defmodule Horde.DynamicSupervisor do
  @doc """
  Works like `DynamicSupervisor.start_link/1`. Extra options are documented here:
  - `:distribution_strategy`, defaults to `Horde.UniformDistribution`, but more are available - see `Horde.DistributionStrategy`
+  - `:proxy_message_ttl`, defaults to `:infinity`. Can be set to an integer indicating the maximum number of times a message may be forwarded in a Horde.DynamicSupervisor cluster. Leaving it at `:infinity` is generally fine when using a stable distribution strategy such as `Horde.UniformDistribution`. Setting a TTL is helpful when migrating to a different `:distribution_strategy`, or when using a random distribution strategy such as `Horde.UniformRandomDistribution`, as it prevents messages from looping (almost) indefinitely.
  """
  def start_link(options) when is_list(options) do
    keys = [
@@ -119,6 +121,7 @@ defmodule Horde.DynamicSupervisor do
      :strategy,
      :distribution_strategy,
      :process_redistribution,
+      :proxy_message_ttl,
      :members,
      :delta_crdt_options
    ]
@@ -156,6 +159,13 @@ defmodule Horde.DynamicSupervisor do
        Horde.UniformDistribution
      )
 
+    proxy_message_ttl =
+      Keyword.get(
+        options,
+        :proxy_message_ttl,
+        :infinity
+      )
+
    flags = %{
      strategy: strategy,
      max_restarts: max_restarts,
@@ -163,6 +173,7 @@ defmodule Horde.DynamicSupervisor do
      max_children: max_children,
      extra_arguments: extra_arguments,
      distribution_strategy: distribution_strategy,
+      proxy_message_ttl: proxy_message_ttl,
      members: members,
      delta_crdt_options: delta_crdt_options(delta_crdt_options),
      process_redistribution: process_redistribution
@@ -196,6 +207,7 @@ defmodule Horde.DynamicSupervisor do
         extra_arguments: flags.extra_arguments,
         distribution_strategy: flags.distribution_strategy,
         process_redistribution: flags.process_redistribution,
+         proxy_message_ttl: flags.proxy_message_ttl,
         members: members(flags.members, name)
       ]},
      {Horde.ProcessesSupervisor,
diff --git a/lib/horde/dynamic_supervisor_impl.ex b/lib/horde/dynamic_supervisor_impl.ex
index ec1ed6a..33de554 100644
--- a/lib/horde/dynamic_supervisor_impl.ex
+++ b/lib/horde/dynamic_supervisor_impl.ex
@@ -22,6 +22,8 @@ defmodule Horde.DynamicSupervisorImpl do
            name_to_supervisor_ref: %{},
            shutting_down: false,
            supervisor_options: [],
+            proxy_message_ttl: :infinity,
+            proxy_operation_ttl: nil,
            distribution_strategy: Horde.UniformDistribution
 
  def start_link(opts) do
@@ -50,7 +52,7 @@ defmodule Horde.DynamicSupervisorImpl do
        process_pid_to_id: new_table(:process_pid_to_id),
        name: name
      }
-      |> Map.merge(Map.new(Keyword.take(options, [:distribution_strategy])))
+      |> Map.merge(Map.new(Keyword.take(options, [:distribution_strategy, :proxy_message_ttl])))
 
    state = set_own_node_status(state)
 
@@ -145,15 +147,14 @@ defmodule Horde.DynamicSupervisorImpl do
  def handle_call({:start_child, _child_spec}, _from, %{shutting_down: true} = state),
    do: {:reply, {:error, {:shutting_down, "this node is shutting down."}}, state}
 
-  @big_number round(:math.pow(2, 128))
-
  def handle_call({:start_child, child_spec} = msg, from, state) do
    this_name = fully_qualified_name(state.name)
+    proxy_ttl_expired? = proxy_message_ttl(state, from) == 0
 
    child_spec = randomize_child_id(child_spec)
 
    case choose_node(child_spec, state) do
-      {:ok, %{name: ^this_name}} ->
+      {:ok, %{name: node_name}} when node_name == this_name or proxy_ttl_expired? ->
        {reply, new_state} = add_child(child_spec, state)
        {:reply, reply, new_state}
 
@@ -277,14 +278,27 @@ defmodule Horde.DynamicSupervisorImpl do
    end
  end
 
+  @big_number round(:math.pow(2, 128))
+
  defp randomize_child_id(child) do
    Map.put(child, :id, :rand.uniform(@big_number))
  end
 
+  defp proxy_to_node(_node_name, message, reply_to, %{proxy_operation_ttl: {reply_to, 0}} = state) do
+    message_type = elem(message, 0)
+
+    {:reply,
+     {:error, :proxy_operation_ttl_expired,
+      "a proxied #{message_type} message was discarded because its TTL expired"}, state}
+  end
+
  defp proxy_to_node(node_name, message, reply_to, state) do
    case Map.get(members(state), node_name) do
      %{status: :alive} ->
-        send(node_name, {:proxy_operation, message, reply_to})
+        case proxy_message_ttl(state, reply_to) do
+          :infinity -> send(node_name, {:proxy_operation, message, reply_to})
+          ttl -> send(node_name, {:proxy_operation, message, reply_to, ttl})
+        end
        {:noreply, state}
 
      _ ->
@@ -296,6 +310,12 @@ defmodule Horde.DynamicSupervisorImpl do
    end
  end
 
+  defp proxy_message_ttl(%{proxy_operation_ttl: {reply_to, ttl}} = _state, reply_to), do: ttl
+  defp proxy_message_ttl(%{proxy_message_ttl: ttl} = _state, _reply_to), do: ttl
+
+  defp decrement_ttl(:infinity), do: :infinity
+  defp decrement_ttl(n) when is_integer(n), do: n - 1
+
  defp set_own_node_status(state, force \\ false)
 
  defp set_own_node_status(state, false) do
@@ -336,6 +356,12 @@ defmodule Horde.DynamicSupervisorImpl do
  end
 
  def handle_info({:proxy_operation, msg, reply_to}, state) do
+    handle_info({:proxy_operation, msg, reply_to, :infinity}, state)
+  end
+
+  def handle_info({:proxy_operation, msg, reply_to, ttl}, state) do
+    state = %{state | proxy_operation_ttl: {reply_to, decrement_ttl(ttl)}}
+
    case handle_call(msg, reply_to, state) do
      {:reply, reply, new_state} ->
        GenServer.reply(reply_to, reply)
diff --git a/test/dynamic_supervisor_test.exs b/test/dynamic_supervisor_test.exs
index dcbeff9..2c252f8 100644
--- a/test/dynamic_supervisor_test.exs
+++ b/test/dynamic_supervisor_test.exs
@@ -2,7 +2,7 @@ defmodule DynamicSupervisorTest do
  require Logger
  use ExUnit.Case
 
-  defp do_setup() do
+  defp do_setup(_context) do
    n1 = :"horde_#{:rand.uniform(100_000_000)}"
    n2 = :"horde_#{:rand.uniform(100_000_000)}"
    n3 = :"horde_#{:rand.uniform(100_000_000)}"
@@ -143,7 +143,52 @@ defmodule DynamicSupervisorTest do
    ]
  end
 
-  setup %{describe: describe} do
+  defp proxy_ttl_spec(name, redist, %{distribution_strategy: dist_strat, proxy_message_ttl: ttl}) do
+    %{
+      id: name,
+      start: {
+        Horde.DynamicSupervisor,
+        :start_link,
+        [
+          [
+            name: name,
+            strategy: :one_for_one,
+            process_redistribution: redist,
+            delta_crdt_options: [sync_interval: 20],
+            distribution_strategy: dist_strat,
+            proxy_message_ttl: ttl
+          ]
+        ]
+      },
+      restart: :transient
+    }
+  end
+
+  defp proxy_ttl_setup(context) do
+    n1 = :horde_proxy_ttl_1
+    n2 = :horde_proxy_ttl_2
+    n3 = :horde_proxy_ttl_3
+
+    members = [{n1, Node.self()}, {n2, Node.self()}, {n3, Node.self()}]
+
+    # Start three supervisors and join them into a single cluster so that
+    # start_child messages can be proxied between the members.
+    {:ok, _pid_n1} = start_supervised(proxy_ttl_spec(n1, :active, context))
+    {:ok, _pid_n2} = start_supervised(proxy_ttl_spec(n2, :active, context))
+    {:ok, _pid_n3} = start_supervised(proxy_ttl_spec(n3, :active, context))
+
+    Horde.Cluster.set_members(n1, [n1, n2, n3])
+
+    [
+      n1: n1,
+      n2: n2,
+      n3: n3,
+      names: [n1, n2, n3],
+      members: members
+    ]
+  end
+
+  setup %{describe: describe} = context do
    case describe do
      "redistribute" ->
        redistribute_setup()
@@ -151,8 +196,11 @@ defmodule DynamicSupervisorTest do
      "graceful shutdown" ->
        Logger.info("Skip setup for \"#{describe}\" context")
 
+      "proxy ttl" ->
+        proxy_ttl_setup(context)
+
      _ ->
-        do_setup()
+        do_setup(context)
    end
  end
 
@@ -635,4 +683,99 @@ defmodule DynamicSupervisorTest do
      )
    end
  end
+
+  defp wait_until_cluster_synced(names) do
+    Stream.iterate(false, fn _ ->
+      Process.sleep(100)
+
+      1 ==
+        names
+        |> Enum.map(&Horde.Cluster.members(&1))
+        |> Enum.uniq()
+        |> length
+    end)
+    |> Enum.find(&(&1 == true))
+  end
+
+  defmodule HotPotatoDistribution do
+    @behaviour Horde.DistributionStrategy
+
+    @moduledoc """
+    Distributes processes to any alive node except itself
+    """
+
+    def choose_node(_child_spec, members) do
+      members
+      |> Enum.filter(&match?(%{status: :alive}, &1))
+      |> Enum.find(fn %{name: {name, _node}} -> Process.whereis(name) != self() end)
+      |> case do
+        nil ->
+          {:error, :no_other_nodes_alive}
+
+        member ->
+          {:ok, member}
+      end
+    end
+
+    def has_quorum?(_members), do: true
+  end
+
+  defp async_simple_task(name) do
+    exunit = self()
+    task_spec = Task.child_spec(fn -> send(exunit, "child alive") end)
+
+    Task.async(fn ->
+      Horde.DynamicSupervisor.start_child(name, task_spec)
+    end)
+  end
+
+  describe "proxy ttl" do
+    @tag proxy_message_ttl: :infinity
+    @tag distribution_strategy: HotPotatoDistribution
+    test "message will proxy forever if nodes disagree on member", context do
+      wait_until_cluster_synced(context.names)
+
+      assert %Task{} = task = async_simple_task(context.n1)
+
+      refute_receive "child alive", 200
+      assert nil == Task.shutdown(task)
+    end
+
+    @tag proxy_message_ttl: 10
+    @tag distribution_strategy: HotPotatoDistribution
+    test "start_child will add child on any node when proxy TTL expires", context do
+      wait_until_cluster_synced(context.names)
+
+      assert {:ok, {:ok, _pid}} = Task.yield(async_simple_task(context.n1))
+      assert_receive "child alive", 200
+    end
+
+    @tag proxy_message_ttl: 1
+    @tag distribution_strategy: Horde.UniformDistribution
+    test "message will be picked up if nodes agree on member", context do
+      wait_until_cluster_synced(context.names)
+
+      assert {:ok, {:ok, _pid}} = Task.yield(async_simple_task(context.n1))
+      assert_receive "child alive", 200
+    end
+
+    @tag proxy_message_ttl: 1
+    @tag distribution_strategy: Horde.UniformRandomDistribution
+    test "random distribution start_child with TTL of 1 will not bounce around", context do
+      wait_until_cluster_synced(context.names)
+
+      process_count = 200
+
+      result_count =
+        Enum.map(1..process_count, fn _ -> async_simple_task(context.n1) end)
+        |> Task.yield_many(1_000)
+        |> Enum.frequencies_by(fn {_task, {:ok, outcome}} -> elem(outcome, 0) end)
+
+      Enum.each(1..process_count, fn _n ->
+        assert_receive "child alive", 200
+      end)
+
+      assert %{ok: process_count} == result_count
+    end
+  end
 end
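The tests above exercise the supervisor side. On the caller side, a proxied operation whose TTL runs out at a point where another hop would be required is answered with the new `{:error, :proxy_operation_ttl_expired, reason}` tuple rather than a normal reply. A hedged sketch of how calling code might account for that — the module names are placeholders, and whether a particular call can hit this branch depends on the distribution strategy in use:

```elixir
# Hypothetical caller-side handling; MyApp.DistributedSupervisor and
# MyApp.Worker are placeholder names, not part of this changeset.
case Horde.DynamicSupervisor.start_child(MyApp.DistributedSupervisor, MyApp.Worker) do
  {:ok, pid} ->
    {:ok, pid}

  {:error, :proxy_operation_ttl_expired, _reason} ->
    # The message was dropped after exhausting its hop budget. Retrying is
    # reasonable: by then the members often agree on the hash ring again.
    :retry

  other ->
    other
end
```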