diff --git a/lib/phoenix/pubsub/pg2.ex b/lib/phoenix/pubsub/pg2.ex index 83907490..801fefd6 100644 --- a/lib/phoenix/pubsub/pg2.ex +++ b/lib/phoenix/pubsub/pg2.ex @@ -62,20 +62,16 @@ defmodule Phoenix.PubSub.PG2 do name = Keyword.fetch!(opts, :name) pool_size = Keyword.get(opts, :pool_size, 1) adapter_name = Keyword.fetch!(opts, :adapter_name) - Supervisor.start_link(__MODULE__, {name, adapter_name, pool_size}, name: :"#{adapter_name}_supervisor") + Supervisor.start_link(__MODULE__, {name, adapter_name, pool_size}, name: adapter_name) end @impl true def init({name, adapter_name, pool_size}) do - [_ | groups] = + groups = for number <- 1..pool_size do :"#{adapter_name}_#{number}" end - # Use `adapter_name` for the first in the pool for backwards compatability - # with v2.0 when the pool_size is 1. - groups = [adapter_name | groups] - :persistent_term.put(adapter_name, List.to_tuple(groups)) children = diff --git a/lib/phoenix/tracker.ex b/lib/phoenix/tracker.ex index 269da817..d276115b 100644 --- a/lib/phoenix/tracker.ex +++ b/lib/phoenix/tracker.ex @@ -210,6 +210,13 @@ defmodule Phoenix.Tracker do |> Phoenix.Tracker.Shard.list(topic) end + @spec dirty_list(atom, topic) :: [presence] + def dirty_list(tracker_name, topic) do + tracker_name + |> Shard.name_for_topic(topic, pool_size(tracker_name)) + |> Phoenix.Tracker.Shard.dirty_list(topic) + end + @doc """ Gets presences tracked under a given topic and key pair. @@ -231,6 +238,35 @@ defmodule Phoenix.Tracker do |> Phoenix.Tracker.Shard.get_by_key(topic, key) end + @spec dirty_get_by_key(atom, topic, term) :: [presence] + def dirty_get_by_key(tracker_name, topic, key) do + tracker_name + |> Shard.name_for_topic(topic, pool_size(tracker_name)) + |> Phoenix.Tracker.Shard.dirty_get_by_key(topic, key) + end + + @spec dirty_get_by_key(atom, term) :: [{topic, pid, meta :: map()}] + def dirty_get_by_key(tracker_name, key) do + 0..(pool_size(tracker_name) - 1) + |> Enum.flat_map(fn n -> + shard_name = Shard.name_for_number(tracker_name, n) + + Phoenix.Tracker.Shard.dirty_get_by_key(shard_name, key) + end) + end + + @spec dirty_get_by_key_with_limit(atom, term, integer()) :: [{topic, pid, meta :: map()}] + def dirty_get_by_key_with_limit(tracker_name, key, limit) do + upper_bound_limit_per_shard = ceil(limit / pool_size(tracker_name)) + 0..(pool_size(tracker_name) - 1) + |> Enum.flat_map(fn n -> + shard_name = Shard.name_for_number(tracker_name, n) + + Phoenix.Tracker.Shard.dirty_get_by_key_with_limit(shard_name, key, upper_bound_limit_per_shard) + end) + |> Enum.take(limit) + end + @doc """ Gracefully shuts down by broadcasting permdown to all replicas. @@ -245,6 +281,17 @@ defmodule Phoenix.Tracker do Supervisor.stop(tracker_name) end + @doc false + @spec size(atom) :: non_neg_integer + def size(tracker_name) do + 0..(pool_size(tracker_name) - 1) + |> Enum.reduce(0, fn n, acc -> + shard_name = Shard.name_for_number(tracker_name, n) + + Phoenix.Tracker.Shard.size(shard_name) + acc + end) + end + @doc """ Starts a tracker pool. diff --git a/lib/phoenix/tracker/shard.ex b/lib/phoenix/tracker/shard.ex index 6fd792b6..4c77b9d4 100644 --- a/lib/phoenix/tracker/shard.ex +++ b/lib/phoenix/tracker/shard.ex @@ -86,11 +86,31 @@ defmodule Phoenix.Tracker.Shard do |> State.get_by_key(topic, key) end + @spec dirty_get_by_key(atom, topic, term) :: [presence] + def dirty_get_by_key(shard_name, topic, key) do + State.tracked_key(shard_name, topic, key, []) + end + + @spec dirty_get_by_key(atom, term) :: [presence] + def dirty_get_by_key(shard_name, key) do + State.tracked_key(shard_name, key, []) + end + + @spec dirty_get_by_key_with_limit(atom, term, integer()) :: [presence] + def dirty_get_by_key_with_limit(shard_name, key, limit) do + State.tracked_key_with_limit(shard_name, key, [], limit) + end + @spec graceful_permdown(pid) :: :ok def graceful_permdown(server_pid) do GenServer.call(server_pid, :graceful_permdown) end + @spec size(atom) :: non_neg_integer + def size(shard_name) do + State.size(shard_name) + end + ## Server def start_link(tracker, tracker_opts, pool_opts) do diff --git a/lib/phoenix/tracker/state.ex b/lib/phoenix/tracker/state.ex index 1d5949cc..75526f86 100644 --- a/lib/phoenix/tracker/state.ex +++ b/lib/phoenix/tracker/state.ex @@ -57,11 +57,16 @@ defmodule Phoenix.Tracker.State do replica: replica, context: %{replica => 0}, mode: :normal, - values: :ets.new(shard_name, [:named_table, :protected, :ordered_set]), + values: :ets.new(shard_name, [:named_table, :protected, :ordered_set, write_concurrency: :true]), pids: :ets.new(:pids, [:duplicate_bag]), replicas: %{replica => :up}}) end + @spec size(atom) :: non_neg_integer + def size(table) do + :ets.info(table, :size) + end + @doc """ Returns the causal context for the set. """ @@ -161,6 +166,37 @@ defmodule Phoenix.Tracker.State do [{{:"$1", :"$2"}}]}]) end + def tracked_key(table, key, down_replicas) do + :ets.select( + table, + [ + { + {{:"$1", :"$2", key}, :"$3", {:"$4", :_}}, + not_in(:"$4", down_replicas), + [{{:"$1", :"$2", :"$3"}}] + } + ] + ) + end + + def tracked_key_with_limit(table, key, down_replicas, limit) do + :ets.select( + table, + [ + { + {{:"$1", :"$2", key}, :"$3", {:"$4", :_}}, + not_in(:"$4", down_replicas), + [{{:"$1", :"$2", :"$3"}}] + } + ], + limit + ) + |> case do + {lst, _} -> lst + _ -> [] + end + end + defp not_in(_pos, []), do: [] defp not_in(pos, replicas), do: [not: ors(pos, replicas)] defp ors(pos, [rep]), do: {:"=:=", pos, {rep}} diff --git a/test/phoenix/tracker/pool_test.exs b/test/phoenix/tracker/pool_test.exs index e7bd1071..74715ae2 100644 --- a/test/phoenix/tracker/pool_test.exs +++ b/test/phoenix/tracker/pool_test.exs @@ -35,9 +35,51 @@ defmodule Phoenix.Tracker.PoolTest do end @tag pool_size: pool_size - test "pool #{pool_size}: Untrack/4 results in all ids being untracked", - %{server: server} do + test "pool #{pool_size}: dirty_get_by_key/2 returns presences from all shards", %{server: server} do + topics = for i <- 1..100, do: "topic_#{i}" + + refs = + for topic <- topics do + {:ok, ref} = Tracker.track(server, self(), topic, "me", %{name: "me"}) + ref + end + + for topic <- topics do + {:ok, _} = Tracker.track(server, self(), topic, "other", %{name: "me"}) + end + + by_key = Tracker.dirty_get_by_key(server, "me") + assert length(by_key) == 100 + + topics_and_refs = + for {topic, pid, %{name: "me", phx_ref: ref}} <- by_key do + assert pid == self() + {topic, ref} + end + + assert Enum.sort(topics_and_refs) == Enum.sort(List.zip([topics, refs])) + end + + @tag pool_size: pool_size + test "pool #{pool_size}: dirty_get_by_key_with_limit/2 returns at most limit num results", %{server: server} do + limit = Enum.random([1, 2, 5, 1000]) + topics = for i <- 1..2000, do: "topic_#{i}" + + for topic <- topics do + {:ok, _} = Tracker.track(server, self(), topic, "me", %{name: "me"}) + end + + for topic <- topics do + {:ok, _} = Tracker.track(server, self(), topic, "other", %{name: "me"}) + end + res = Tracker.dirty_get_by_key_with_limit(server, "me", limit) + assert length(res) <= limit + end + + @tag pool_size: pool_size + test "pool #{pool_size}: Untrack/4 results in all ids being untracked", + %{server: server} do topics = for i <- 1..100, do: "topic_#{i}" for t <- topics do {:ok, _ref} = Tracker.track(server, self(), t, "me", %{a: "b"}) @@ -143,5 +185,16 @@ defmodule Phoenix.Tracker.PoolTest do for t <- topics, do: assert Tracker.list(server, t) == [] end + + @tag pool_size: pool_size + test "pool #{pool_size}: count/1 returns number of entries across all shards", + %{server: server} do + topics = for i <- 1..100, do: "topic_#{i}" + for t <- topics do + {:ok, _ref} = Tracker.track(server, self(), t, "me", %{a: "b"}) + end + + assert Tracker.size(server) == 100 + end end end diff --git a/test/phoenix/tracker/shard_replication_test.exs b/test/phoenix/tracker/shard_replication_test.exs index 0a6fed5e..ae505356 100644 --- a/test/phoenix/tracker/shard_replication_test.exs +++ b/test/phoenix/tracker/shard_replication_test.exs @@ -244,6 +244,8 @@ defmodule Phoenix.Tracker.ShardReplicationTest do assert %{@node1 => %Replica{status: :up}} = replicas(shard) assert [{"local1", _}, {"node1", _}] = list(shard, topic) assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic) + assert [_] = Shard.dirty_get_by_key(shard, topic, "local1") + assert [_] = Shard.dirty_get_by_key(shard, topic, "node1") # nodedown Process.unlink(node_pid) @@ -252,9 +254,13 @@ defmodule Phoenix.Tracker.ShardReplicationTest do assert %{@node1 => %Replica{status: :down}} = replicas(shard) assert [{"local1", _}] = list(shard, topic) assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic) + assert [_] = Shard.dirty_get_by_key(shard, topic, "local1") + assert [_] = Shard.dirty_get_by_key(shard, topic, "node1") :timer.sleep(@permdown + 2*@heartbeat) assert [{"local1", _}] = dirty_list(shard, topic) + assert [_] = Shard.dirty_get_by_key(shard, topic, "local1") + assert [] = Shard.dirty_get_by_key(shard, topic, "node1") end diff --git a/test/phoenix/tracker/shard_test.exs b/test/phoenix/tracker/shard_test.exs index f6338af2..169126a3 100644 --- a/test/phoenix/tracker/shard_test.exs +++ b/test/phoenix/tracker/shard_test.exs @@ -13,4 +13,45 @@ defmodule Phoenix.Tracker.ShardTest do assert Phoenix.Tracker.Shard.init([nil, nil, opts]) == {:error, "permdown_period must be at least larger than the down_period"} end + + defmodule TestTracker do + use Phoenix.Tracker + def init(state), do: {:ok, state} + def handle_diff(_diff, state), do: {:ok, state} + end + + describe "size/1" do + test "returns 0 when there are no entries in the shard" do + tracker = TestTracker + name = :"#{inspect(make_ref())}" + shard_name = Phoenix.Tracker.Shard.name_for_number(name, 1) + given_pubsub(name) + opts = [pubsub_server: name, name: name, shard_number: 1] + {:ok, _pid} = Phoenix.Tracker.Shard.start_link(tracker, %{}, opts) + + assert Phoenix.Tracker.Shard.size(shard_name) == 0 + end + + test "returns number of tracked entries in the shard" do + tracker = TestTracker + name = :"#{inspect(make_ref())}" + shard_name = Phoenix.Tracker.Shard.name_for_number(name, 1) + given_pubsub(name) + opts = [pubsub_server: name, name: name, shard_number: 1] + {:ok, pid} = Phoenix.Tracker.Shard.start_link(tracker, %{}, opts) + + for i <- 1..100 do + Phoenix.Tracker.Shard.track(pid, self(), "topic", "user#{i}", %{}) + end + + assert Phoenix.Tracker.Shard.size(shard_name) == 100 + end + + defp given_pubsub(name) do + size = 1 + {adapter, adapter_opts} = Application.get_env(:phoenix_pubsub, :test_adapter) + adapter_opts = [adapter: adapter, name: name, pool_size: size] ++ adapter_opts + start_supervised!({Phoenix.PubSub, adapter_opts}) + end + end end diff --git a/test/phoenix/tracker/state_test.exs b/test/phoenix/tracker/state_test.exs index 081bbff4..9f7f4eb2 100644 --- a/test/phoenix/tracker/state_test.exs +++ b/test/phoenix/tracker/state_test.exs @@ -409,4 +409,36 @@ defmodule Phoenix.Tracker.StateTest do end) end) end + + describe "size/1" do + test "returns 0 for empty state" do + shard_name = :shard + State.new(:s1, shard_name) + assert State.size(shard_name) == 0 + end + + test "returns size of the values tables" do + shard_name = :shard + state = State.new(:s1, shard_name) + + Enum.reduce(1..100, state, fn i, acc -> + State.join(acc, self(), "lobby", "user#{i}", %{}) + end) + + assert State.size(shard_name) == 100 + end + + test "leaves are accounted for in the returned size" do + shard_name = :shard + state = State.new(:s1, shard_name) + + state = Enum.reduce(1..100, state, fn i, acc -> + State.join(acc, self(), "topic#{i}", "user#{i}", %{}) + end) + + State.leave(state, self(), "topic1", "user1") + + assert State.size(shard_name) == 99 + end + end end