diff --git a/lib/realtime/broadcast_changes/handler.ex b/lib/realtime/broadcast_changes/handler.ex index 501008347..3d5b0f383 100644 --- a/lib/realtime/broadcast_changes/handler.ex +++ b/lib/realtime/broadcast_changes/handler.ex @@ -216,7 +216,6 @@ defmodule Realtime.BroadcastChanges.Handler do @impl true def handle_disconnect(state) do Logger.error("Disconnected from the server: #{inspect(state, pretty: true)}") - {:noreply, %{state | step: :disconnected}} end @@ -240,8 +239,8 @@ defmodule Realtime.BroadcastChanges.Handler do {:noreply, [], state} end - def handle_data(_, state) do - Logger.warning("Unknown data received") + def handle_data(e, state) do + log_error("UnexpectedMessageReceived", e) {:noreply, [], state} end diff --git a/lib/realtime/database.ex b/lib/realtime/database.ex index 9f12f5926..32b04a365 100644 --- a/lib/realtime/database.ex +++ b/lib/realtime/database.ex @@ -139,8 +139,7 @@ defmodule Realtime.Database do {:error, e} -> Process.exit(conn, :kill) Helpers.log_error("UnableToConnectToTenantDatabase", e) - - {:error, :tenant_database_unavailable} + {:error, e} end end end) diff --git a/lib/realtime/tenants.ex b/lib/realtime/tenants.ex index a9b48f200..ad2b1fc25 100644 --- a/lib/realtime/tenants.ex +++ b/lib/realtime/tenants.ex @@ -85,8 +85,8 @@ defmodule Realtime.Tenants do {:ok, %{healthy: true, db_connected: true, connected_cluster: connected_cluster}} connected_cluster when is_integer(connected_cluster) -> - {:ok, db_conn} = Connect.lookup_or_start_connection(external_id) - %{extensions: [%{settings: settings} | _]} = Cache.get_tenant_by_external_id(external_id) + tenant = Cache.get_tenant_by_external_id(external_id) + {:ok, db_conn} = Database.connect(tenant, "realtime_health_check", 1) Database.transaction(db_conn, fn transaction_conn -> query = @@ -95,6 +95,8 @@ defmodule Realtime.Tenants do res = Postgrex.query!(transaction_conn, query, []) if res.rows == [] do + %{extensions: [%{settings: settings} | _]} = tenant + 
Migrations.run_migrations(%Migrations{ tenant_external_id: external_id, settings: settings diff --git a/lib/realtime/tenants/connect.ex b/lib/realtime/tenants/connect.ex index 27c6535cd..3338d3d0c 100644 --- a/lib/realtime/tenants/connect.ex +++ b/lib/realtime/tenants/connect.ex @@ -13,17 +13,30 @@ defmodule Realtime.Tenants.Connect do import Realtime.Helpers, only: [log_error: 2] alias Realtime.Api.Tenant - alias Realtime.Database alias Realtime.Rpc alias Realtime.Tenants alias Realtime.Tenants.Migrations alias Realtime.UsersCounter - alias Realtime.BroadcastChanges.Handler - + alias Realtime.Tenants.Connect.Piper + alias Realtime.Tenants.Connect.CheckConnection + alias Realtime.Tenants.Connect.StartReplication + alias Realtime.Tenants.Connect.Migrations + alias Realtime.Tenants.Connect.GetTenant + alias Realtime.Tenants.Connect.RegisterProcess + alias Realtime.Tenants.Connect.StartCounters + + @pipes [ + GetTenant, + CheckConnection, + Migrations, + StartCounters, + StartReplication, + RegisterProcess + ] @rpc_timeout_default 30_000 @check_connected_user_interval_default 50_000 @connected_users_bucket_shutdown [0, 0, 0, 0, 0, 0] - @application_name "realtime_connect" + defstruct tenant_id: nil, db_conn_reference: nil, db_conn_pid: nil, @@ -130,23 +143,10 @@ defmodule Realtime.Tenants.Connect do def init(%{tenant_id: tenant_id} = state) do Logger.metadata(external_id: tenant_id, project: tenant_id) - with %Tenant{} = tenant <- Tenants.get_tenant_by_external_id(tenant_id), - {:ok, conn} <- Database.check_tenant_connection(tenant, @application_name), - ref = Process.monitor(conn), - [%{settings: settings} | _] <- tenant.extensions, - migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings}, - :ok <- Migrations.run_migrations(migrations) do - :syn.update_registry(__MODULE__, tenant_id, fn _pid, meta -> %{meta | conn: conn} end) - - state = %{state | db_conn_reference: ref, db_conn_pid: conn} - - if tenant.notify_private_alpha do - {:ok, 
state, {:continue, :setup_broadcast_changes}} - else - {:ok, state, {:continue, :setup_connected_user_events}} - end + with {:ok, acc} <- Piper.run(@pipes, state) do + {:ok, acc, {:continue, :setup_connected_user_events}} else - nil -> + {:error, :tenant_not_found} -> log_error("TenantNotFound", "Tenant not found") {:stop, :shutdown} @@ -156,34 +156,7 @@ defmodule Realtime.Tenants.Connect do end end - @impl GenServer - def handle_continue(:setup_broadcast_changes, %{tenant_id: tenant_id} = state) do - tenant = Tenants.Cache.get_tenant_by_external_id(tenant_id) - opts = %Handler{tenant_id: tenant.external_id} - supervisor_spec = Handler.supervisor_spec(tenant) - - child_spec = %{ - id: Handler, - start: {Handler, :start_link, [opts]}, - restart: :transient, - type: :worker - } - - case DynamicSupervisor.start_child(supervisor_spec, child_spec) do - {:ok, pid} -> - {:noreply, %{state | broadcast_changes_pid: pid}, - {:continue, :setup_connected_user_events}} - - {:error, {:already_started, pid}} -> - {:noreply, %{state | broadcast_changes_pid: pid}, - {:continue, :setup_connected_user_events}} - - error -> - log_error("UnableToStartHandler", error) - {:stop, :shutdown, state} - end - end - + @impl true def handle_continue(:setup_connected_user_events, state) do %{ check_connected_user_interval: check_connected_user_interval, diff --git a/lib/realtime/tenants/connect/check_connection.ex b/lib/realtime/tenants/connect/check_connection.ex new file mode 100644 index 000000000..1c07472fb --- /dev/null +++ b/lib/realtime/tenants/connect/check_connection.ex @@ -0,0 +1,24 @@ +defmodule Realtime.Tenants.Connect.CheckConnection do + @moduledoc """ + Checks the tenant database connection and records its pid and a monitor reference. 
+ """ + alias Realtime.Database + alias Realtime.Tenants.Cache + + @application_name "realtime_connect" + @behaviour Realtime.Tenants.Connect.Piper + + @impl true + def run(acc) do + %{tenant_id: tenant_id} = acc + tenant = Cache.get_tenant_by_external_id(tenant_id) + + case Database.check_tenant_connection(tenant, @application_name) do + {:ok, conn} -> + {:ok, %{acc | db_conn_pid: conn, db_conn_reference: Process.monitor(conn)}} + + {:error, error} -> + {:error, error} + end + end +end diff --git a/lib/realtime/tenants/connect/get_tenant.ex b/lib/realtime/tenants/connect/get_tenant.ex new file mode 100644 index 000000000..01cd2ab35 --- /dev/null +++ b/lib/realtime/tenants/connect/get_tenant.ex @@ -0,0 +1,19 @@ +defmodule Realtime.Tenants.Connect.GetTenant do + @moduledoc """ + Checks that the tenant can be looked up; fails with `:tenant_not_found` otherwise. + """ + + alias Realtime.Api.Tenant + alias Realtime.Tenants + @behaviour Realtime.Tenants.Connect.Piper + + @impl Realtime.Tenants.Connect.Piper + def run(acc) do + %{tenant_id: tenant_id} = acc + + case Tenants.Cache.get_tenant_by_external_id(tenant_id) do + %Tenant{} -> {:ok, acc} + _ -> {:error, :tenant_not_found} + end + end +end diff --git a/lib/realtime/tenants/connect/migrations.ex b/lib/realtime/tenants/connect/migrations.ex new file mode 100644 index 000000000..dd7e1ff6d --- /dev/null +++ b/lib/realtime/tenants/connect/migrations.ex @@ -0,0 +1,20 @@ +defmodule Realtime.Tenants.Connect.Migrations do + @moduledoc """ + Runs the tenant migrations as a step of the Tenants.Connect process. 
+ """ + @behaviour Realtime.Tenants.Connect.Piper + alias Realtime.Tenants.Migrations + alias Realtime.Tenants.Cache + @impl true + def run(acc) do + %{tenant_id: tenant_id} = acc + tenant = Cache.get_tenant_by_external_id(tenant_id) + [%{settings: settings} | _] = tenant.extensions + migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings} + + case Migrations.run_migrations(migrations) do + :ok -> {:ok, acc} + {:error, error} -> {:error, error} + end + end +end diff --git a/lib/realtime/tenants/connect/piper.ex b/lib/realtime/tenants/connect/piper.ex new file mode 100644 index 000000000..b38c62fa2 --- /dev/null +++ b/lib/realtime/tenants/connect/piper.ex @@ -0,0 +1,21 @@ +defmodule Realtime.Tenants.Connect.Piper do + @moduledoc """ + Runs step modules in order, threading an accumulator and halting on the first error. + """ + @callback run(any()) :: {:ok, any()} | {:error, any()} + + def run(pipers, init) do + Enum.reduce_while(pipers, {:ok, init}, fn piper, {:ok, acc} -> + case piper.run(acc) do + {:ok, result} -> + {:cont, {:ok, result}} + + {:error, error} -> + {:halt, {:error, error}} + + _e -> + raise ArgumentError, "must return {:ok, _} or {:error, _}" + end + end) + end +end diff --git a/lib/realtime/tenants/connect/register_process.ex b/lib/realtime/tenants/connect/register_process.ex new file mode 100644 index 000000000..246a656dc --- /dev/null +++ b/lib/realtime/tenants/connect/register_process.ex @@ -0,0 +1,19 @@ +defmodule Realtime.Tenants.Connect.RegisterProcess do + @moduledoc """ + Stores the database connection pid as `conn` in the :syn metadata of the tenant. + """ + + @behaviour Realtime.Tenants.Connect.Piper + + @impl true + def run(acc) do + %{tenant_id: tenant_id, db_conn_pid: conn} = acc + + case :syn.update_registry(Realtime.Tenants.Connect, tenant_id, fn _pid, meta -> + %{meta | conn: conn} + end) do + {:ok, _} -> {:ok, acc} + {:error, error} -> {:error, error} + end + end +end diff --git a/lib/realtime/tenants/connect/start_counters.ex 
b/lib/realtime/tenants/connect/start_counters.ex
new file mode 100644
index 000000000..e057bbca4
--- /dev/null
+++ b/lib/realtime/tenants/connect/start_counters.ex
@@ -0,0 +1,106 @@
+defmodule Realtime.Tenants.Connect.StartCounters do
+  @moduledoc """
+  Start tenant counters.
+
+  Pipeline step that starts the joins, events and database events rate counters of the
+  tenant identified by the accumulator's `tenant_id`.
+  """
+
+  alias Realtime.GenCounter
+  alias Realtime.RateCounter
+  alias Realtime.Tenants
+  alias Realtime.Tenants.Cache
+
+  @behaviour Realtime.Tenants.Connect.Piper
+
+  @doc """
+  Starts the three tenant counters one after the other.
+
+  Returns `{:ok, acc}` with the accumulator untouched once every counter is running, or
+  `{:error, reason}` for the first counter that could not be started.
+  """
+  @impl true
+  def run(acc) do
+    %{tenant_id: tenant_id} = acc
+    tenant = Cache.get_tenant_by_external_id(tenant_id)
+
+    with :ok <- ensure_started(start_joins_per_second_counter(tenant)),
+         :ok <- ensure_started(start_max_events_counter(tenant)),
+         :ok <- ensure_started(start_db_events_counter(tenant)) do
+      {:ok, acc}
+    end
+  end
+
+  @doc """
+  Starts the counter keyed by `Tenants.joins_per_second_key/1`.
+
+  The tenant's `max_joins_per_second` is attached as the telemetry `limit` measurement.
+  Returns whatever `RateCounter.new/2` returns.
+  """
+  def start_joins_per_second_counter(tenant) do
+    %{max_joins_per_second: max_joins_per_second} = tenant
+    id = Tenants.joins_per_second_key(tenant)
+    GenCounter.new(id)
+
+    RateCounter.new(id,
+      idle_shutdown: :infinity,
+      telemetry: %{
+        event_name: [:channel, :joins],
+        measurements: %{limit: max_joins_per_second},
+        metadata: %{tenant: tenant}
+      }
+    )
+  end
+
+  @doc """
+  Starts the counter keyed by `Tenants.events_per_second_key/1`.
+
+  The tenant's `max_events_per_second` is attached as the telemetry `limit` measurement.
+  Returns whatever `RateCounter.new/2` returns.
+  """
+  def start_max_events_counter(tenant) do
+    %{max_events_per_second: max_events_per_second} = tenant
+
+    key = Tenants.events_per_second_key(tenant)
+
+    GenCounter.new(key)
+
+    RateCounter.new(key,
+      idle_shutdown: :infinity,
+      telemetry: %{
+        event_name: [:channel, :events],
+        measurements: %{limit: max_events_per_second},
+        metadata: %{tenant: tenant}
+      }
+    )
+  end
+
+  @doc """
+  Starts the counter keyed by `Tenants.db_events_per_second_key/1`.
+
+  No `limit` measurement is attached. Returns whatever `RateCounter.new/2` returns.
+  """
+  def start_db_events_counter(tenant) do
+    key = Tenants.db_events_per_second_key(tenant)
+    GenCounter.new(key)
+
+    RateCounter.new(key,
+      idle_shutdown: :infinity,
+      telemetry: %{
+        event_name: [:channel, :db_events],
+        measurements: %{},
+        metadata: %{tenant: tenant}
+      }
+    )
+  end
+
+  # Normalises the result of starting a counter so one bad reply cannot silently skip the rest.
+  #
+  # NOTE(review): assumes `RateCounter.new/2` reports a counter that is already running as
+  # `{:error, {:already_started, pid}}` (the usual supervisor convention) - confirm. Such a
+  # counter is reused rather than treated as a failure.
+  defp ensure_started({:ok, _pid}), do: :ok
+  defp ensure_started({:error, {:already_started, _pid}}), do: :ok
+  defp ensure_started({:error, reason}), do: {:error, reason}
+  defp ensure_started(other), do: {:error, other}
+end
diff --git a/lib/realtime/tenants/connect/start_replication.ex b/lib/realtime/tenants/connect/start_replication.ex
new file mode 100644
index 000000000..a5e0172fb
--- /dev/null
+++ 
b/lib/realtime/tenants/connect/start_replication.ex
@@ -0,0 +1,59 @@
+defmodule Realtime.Tenants.Connect.StartReplication do
+  @moduledoc """
+  Starts BroadcastChanges replication slot.
+
+  Pipeline step that starts a `Realtime.BroadcastChanges.Handler` child for tenants with
+  `notify_private_alpha` enabled; any other tenant passes through untouched.
+  """
+
+  @behaviour Realtime.Tenants.Connect.Piper
+  alias Realtime.BroadcastChanges.Handler
+  alias Realtime.Tenants.Cache
+
+  @doc """
+  Starts the handler when the tenant requires it.
+
+  On success the handler pid is stored under `:broadcast_changes_pid` in the accumulator.
+  An already running handler is reused. Returns `{:error, reason}` when the child cannot be
+  started.
+  """
+  @impl true
+  def run(acc) do
+    %{tenant_id: tenant_id} = acc
+    tenant = Cache.get_tenant_by_external_id(tenant_id)
+
+    if tenant.notify_private_alpha do
+      opts = %Handler{tenant_id: tenant_id}
+      supervisor_spec = Handler.supervisor_spec(tenant)
+
+      child_spec = %{
+        id: Handler,
+        start: {Handler, :start_link, [opts]},
+        restart: :transient,
+        type: :worker
+      }
+
+      case DynamicSupervisor.start_child(supervisor_spec, child_spec) do
+        {:ok, pid} ->
+          {:ok, Map.put(acc, :broadcast_changes_pid, pid)}
+
+        {:ok, pid, _info} ->
+          {:ok, Map.put(acc, :broadcast_changes_pid, pid)}
+
+        {:error, {:already_started, pid}} ->
+          {:ok, Map.put(acc, :broadcast_changes_pid, pid)}
+
+        # Pass the reason through as-is; wrapping the whole tuple again would hand the
+        # caller `{:error, {:error, reason}}`.
+        {:error, reason} ->
+          {:error, reason}
+
+        # `:ignore` (or any other unexpected reply) still has to satisfy the Piper contract.
+        other ->
+          {:error, other}
+      end
+    else
+      {:ok, acc}
+    end
+  end
+end
diff --git a/mix.exs b/mix.exs index 1276aba4b..adca5f821 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.33.8", + version: "2.33.9", elixir: "~> 1.16.0", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/realtime/broadcast_changes/handler_test.exs b/test/realtime/broadcast_changes/handler_test.exs index adf94c80f..d201e8622 100644 --- a/test/realtime/broadcast_changes/handler_test.exs +++ b/test/realtime/broadcast_changes/handler_test.exs @@ -15,6 +15,7 @@ defmodule Realtime.BroadcastChanges.HandlerTest do slot = Application.get_env(:realtime, :slot_name_suffix) Application.put_env(:realtime, :slot_name_suffix, "test") start_supervised(Realtime.Tenants.CacheSupervisor) + tenant = tenant_fixture() [%{settings: settings} | _] = tenant.extensions migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings} @@ -22,8 +23,15 @@ defmodule 
Realtime.BroadcastChanges.HandlerTest do {:ok, conn} = Database.connect(tenant, "realtime_test", 1) clean_table(conn, "realtime", "messages") - Postgrex.query(conn, "DROP PUBLICATION IF EXISTS realtime_messages_publication", []) - Realtime.Database.replication_slot_teardown(tenant) + + publication = + Handler.publication_name(%Handler{ + tenant_id: tenant.external_id, + schema: "realtime", + table: "messages" + }) + + Postgrex.query(conn, "DROP PUBLICATION #{publication}", []) on_exit(fn -> Application.put_env(:realtime, :slot_name_suffix, slot) end) @@ -68,15 +76,12 @@ defmodule Realtime.BroadcastChanges.HandlerTest do broadcast: fn _, _, _, _ -> :ok end do tenant = tenant_fixture() - assert {:ok, _pid} = - start_supervised(%{ - id: Handler, - start: {Handler, :start_link, [%Handler{tenant_id: tenant.external_id}]}, - restart: :transient, - type: :worker - }) - - :timer.sleep(1000) + start_supervised!(%{ + id: Handler, + start: {Handler, :start_link, [%Handler{tenant_id: tenant.external_id}]}, + restart: :transient, + type: :worker + }) total_messages = 5 # Works with one insert per transaction @@ -89,7 +94,7 @@ defmodule Realtime.BroadcastChanges.HandlerTest do }) end - :timer.sleep(1000) + :timer.sleep(500) assert_called_exactly(BatchBroadcast.broadcast(nil, tenant, :_, :_), total_messages) # Works with batch inserts @@ -105,7 +110,7 @@ defmodule Realtime.BroadcastChanges.HandlerTest do Database.connect(tenant, "realtime_test", 1) Realtime.Repo.insert_all_entries(Message, messages, Message) - :timer.sleep(1000) + :timer.sleep(500) assert_called_exactly(BatchBroadcast.broadcast(nil, tenant, :_, :_), total_messages) end
diff --git a/test/realtime/tenants/connect/piper_test.exs b/test/realtime/tenants/connect/piper_test.exs
new file mode 100644
index 000000000..3362dafc0
--- /dev/null
+++ b/test/realtime/tenants/connect/piper_test.exs
@@ -0,0 +1,75 @@
+defmodule Realtime.Tenants.Connect.PiperTest do
+  use ExUnit.Case, async: true
+  alias Realtime.Tenants.Connect.Piper
+
+  # Step fixtures. Piper2 and Piper3 only succeed when the previous step left its key in the
+  # accumulator, so a fully populated result proves the steps ran in order.
+  defmodule Piper1 do
+    @behaviour Piper
+    @impl true
+    def run(acc), do: {:ok, Map.put(acc, :piper1, "Piper1")}
+  end
+
+  defmodule Piper2 do
+    @behaviour Piper
+    @impl true
+    def run(acc), do: Map.get(acc, :piper1) && {:ok, Map.put(acc, :piper2, "Piper2")}
+  end
+
+  defmodule Piper3 do
+    @behaviour Piper
+    @impl true
+    def run(acc), do: Map.get(acc, :piper2) && {:ok, Map.put(acc, :piper3, "Piper3")}
+  end
+
+  defmodule PiperErr do
+    @behaviour Piper
+    @impl true
+    def run(_acc), do: {:error, "PiperErr"}
+  end
+
+  defmodule PiperBadReturn do
+    @behaviour Piper
+    @impl true
+    def run(_acc), do: nil
+  end
+
+  defmodule PiperException do
+    @behaviour Piper
+    @impl true
+    def run(_acc), do: raise("PiperException")
+  end
+
+  @pipeline [Piper1, Piper2, Piper3]
+
+  test "runs pipeline as expected and accumlates outputs" do
+    assert {:ok,
+            %{
+              piper1: "Piper1",
+              piper2: "Piper2",
+              piper3: "Piper3",
+              initial: "state"
+            }} = Piper.run(@pipeline, %{initial: "state"})
+  end
+
+  test "runs pipeline and handles error" do
+    assert {:error, "PiperErr"} = Piper.run(@pipeline ++ [PiperErr], %{initial: "state"})
+  end
+
+  test "halts at the first error and skips the remaining steps" do
+    # PiperException would raise if it were reached.
+    assert {:error, "PiperErr"} = Piper.run([PiperErr, PiperException], %{})
+  end
+
+  test "runs pipeline and handles bad return with raise" do
+    assert_raise ArgumentError, fn ->
+      Piper.run(@pipeline ++ [PiperBadReturn], %{})
+    end
+  end
+
+  test "on pipeline job function, raises exception" do
+    assert_raise RuntimeError, fn ->
+      Piper.run(@pipeline ++ [PiperException], %{})
+    end
+  end
+end