Skip to content

Commit

Permalink
fix: Adds plug like logic to Connect (#1174)
Browse files Browse the repository at this point in the history
Add Plug like logic to Connect so we can pipe operations of the Connect operation to ensure we guarantee full operation of all features

Co-authored-by: Wojtek Mach <[email protected]>
  • Loading branch information
filipecabaco and wojtekmach authored Oct 23, 2024
1 parent c1b39dd commit 75d2aea
Show file tree
Hide file tree
Showing 14 changed files with 323 additions and 69 deletions.
5 changes: 2 additions & 3 deletions lib/realtime/broadcast_changes/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ defmodule Realtime.BroadcastChanges.Handler do
@impl true
def handle_disconnect(state) do
Logger.error("Disconnected from the server: #{inspect(state, pretty: true)}")

{:noreply, %{state | step: :disconnected}}
end

Expand All @@ -240,8 +239,8 @@ defmodule Realtime.BroadcastChanges.Handler do
{:noreply, [], state}
end

def handle_data(_, state) do
Logger.warning("Unknown data received")
def handle_data(e, state) do
log_error("UnexpectedMessageReceived", e)
{:noreply, [], state}
end

Expand Down
3 changes: 1 addition & 2 deletions lib/realtime/database.ex
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,7 @@ defmodule Realtime.Database do
{:error, e} ->
Process.exit(conn, :kill)
Helpers.log_error("UnableToConnectToTenantDatabase", e)

{:error, :tenant_database_unavailable}
{:error, e}
end
end
end)
Expand Down
6 changes: 4 additions & 2 deletions lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ defmodule Realtime.Tenants do
{:ok, %{healthy: true, db_connected: true, connected_cluster: connected_cluster}}

connected_cluster when is_integer(connected_cluster) ->
{:ok, db_conn} = Connect.lookup_or_start_connection(external_id)
%{extensions: [%{settings: settings} | _]} = Cache.get_tenant_by_external_id(external_id)
tenant = Cache.get_tenant_by_external_id(external_id)
{:ok, db_conn} = Database.connect(tenant, "realtime_health_check", 1)

Database.transaction(db_conn, fn transaction_conn ->
query =
Expand All @@ -95,6 +95,8 @@ defmodule Realtime.Tenants do
res = Postgrex.query!(transaction_conn, query, [])

if res.rows == [] do
%{extensions: [%{settings: settings} | _]} = tenant

Migrations.run_migrations(%Migrations{
tenant_external_id: external_id,
settings: settings
Expand Down
69 changes: 21 additions & 48 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,30 @@ defmodule Realtime.Tenants.Connect do
import Realtime.Helpers, only: [log_error: 2]

alias Realtime.Api.Tenant
alias Realtime.Database
alias Realtime.Rpc
alias Realtime.Tenants
alias Realtime.Tenants.Migrations
alias Realtime.UsersCounter
alias Realtime.BroadcastChanges.Handler

alias Realtime.Tenants.Connect.Piper
alias Realtime.Tenants.Connect.CheckConnection
alias Realtime.Tenants.Connect.StartReplication
alias Realtime.Tenants.Connect.Migrations
alias Realtime.Tenants.Connect.GetTenant
alias Realtime.Tenants.Connect.RegisterProcess
alias Realtime.Tenants.Connect.StartCounters

@pipes [
GetTenant,
CheckConnection,
Migrations,
StartCounters,
StartReplication,
RegisterProcess
]
@rpc_timeout_default 30_000
@check_connected_user_interval_default 50_000
@connected_users_bucket_shutdown [0, 0, 0, 0, 0, 0]
@application_name "realtime_connect"

defstruct tenant_id: nil,
db_conn_reference: nil,
db_conn_pid: nil,
Expand Down Expand Up @@ -130,23 +143,10 @@ defmodule Realtime.Tenants.Connect do
def init(%{tenant_id: tenant_id} = state) do
Logger.metadata(external_id: tenant_id, project: tenant_id)

with %Tenant{} = tenant <- Tenants.get_tenant_by_external_id(tenant_id),
{:ok, conn} <- Database.check_tenant_connection(tenant, @application_name),
ref = Process.monitor(conn),
[%{settings: settings} | _] <- tenant.extensions,
migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings},
:ok <- Migrations.run_migrations(migrations) do
:syn.update_registry(__MODULE__, tenant_id, fn _pid, meta -> %{meta | conn: conn} end)

state = %{state | db_conn_reference: ref, db_conn_pid: conn}

if tenant.notify_private_alpha do
{:ok, state, {:continue, :setup_broadcast_changes}}
else
{:ok, state, {:continue, :setup_connected_user_events}}
end
with {:ok, acc} <- Piper.run(@pipes, state) do
{:ok, acc, {:continue, :setup_connected_user_events}}
else
nil ->
{:error, :tenant_not_found} ->
log_error("TenantNotFound", "Tenant not found")
{:stop, :shutdown}

Expand All @@ -156,34 +156,7 @@ defmodule Realtime.Tenants.Connect do
end
end

@impl GenServer
def handle_continue(:setup_broadcast_changes, %{tenant_id: tenant_id} = state) do
tenant = Tenants.Cache.get_tenant_by_external_id(tenant_id)
opts = %Handler{tenant_id: tenant.external_id}
supervisor_spec = Handler.supervisor_spec(tenant)

child_spec = %{
id: Handler,
start: {Handler, :start_link, [opts]},
restart: :transient,
type: :worker
}

case DynamicSupervisor.start_child(supervisor_spec, child_spec) do
{:ok, pid} ->
{:noreply, %{state | broadcast_changes_pid: pid},
{:continue, :setup_connected_user_events}}

{:error, {:already_started, pid}} ->
{:noreply, %{state | broadcast_changes_pid: pid},
{:continue, :setup_connected_user_events}}

error ->
log_error("UnableToStartHandler", error)
{:stop, :shutdown, state}
end
end

@impl true
def handle_continue(:setup_connected_user_events, state) do
%{
check_connected_user_interval: check_connected_user_interval,
Expand Down
24 changes: 24 additions & 0 deletions lib/realtime/tenants/connect/check_connection.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule Realtime.Tenants.Connect.CheckConnection do
@moduledoc """
Check tenant database connection.
"""
alias Realtime.Database
alias Realtime.Tenants.Cache

@application_name "realtime_connect"
@behaviour Realtime.Tenants.Connect.Piper

@impl true
def run(acc) do
%{tenant_id: tenant_id} = acc
tenant = Cache.get_tenant_by_external_id(tenant_id)

case Database.check_tenant_connection(tenant, @application_name) do
{:ok, conn} ->
{:ok, %{acc | db_conn_pid: conn, db_conn_reference: Process.monitor(conn)}}

{:error, error} ->
{:error, error}
end
end
end
19 changes: 19 additions & 0 deletions lib/realtime/tenants/connect/get_tenant.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Realtime.Tenants.Connect.GetTenant do
@moduledoc """
Get tenant database connection.
"""

alias Realtime.Api.Tenant
alias Realtime.Tenants
@behaviour Realtime.Tenants.Connect.Piper

@impl Realtime.Tenants.Connect.Piper
def run(acc) do
%{tenant_id: tenant_id} = acc

case Tenants.Cache.get_tenant_by_external_id(tenant_id) do
%Tenant{} -> {:ok, acc}
_ -> {:error, :tenant_not_found}
end
end
end
20 changes: 20 additions & 0 deletions lib/realtime/tenants/connect/migrations.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule Realtime.Tenants.Connect.Migrations do
@moduledoc """
Migrations for the Tenants.Connect process.
"""
@behaviour Realtime.Tenants.Connect.Piper
alias Realtime.Tenants.Migrations
alias Realtime.Tenants.Cache
@impl true
def run(acc) do
%{tenant_id: tenant_id} = acc
tenant = Cache.get_tenant_by_external_id(tenant_id)
[%{settings: settings} | _] = tenant.extensions
migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings}

case Migrations.run_migrations(migrations) do
:ok -> {:ok, acc}
{:error, error} -> {:error, error}
end
end
end
21 changes: 21 additions & 0 deletions lib/realtime/tenants/connect/piper.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule Realtime.Tenants.Connect.Piper do
@moduledoc """
Pipes different commands to execute specific actions during the connection process.
"""
@callback run(any()) :: {:ok, any()} | {:error, any()}

def run(pipers, init) do
Enum.reduce_while(pipers, {:ok, init}, fn piper, {:ok, acc} ->
case piper.run(acc) do
{:ok, result} ->
{:cont, {:ok, result}}

{:error, error} ->
{:halt, {:error, error}}

_e ->
raise ArgumentError, "must return {:ok, _} or {:error, _}"
end
end)
end
end
19 changes: 19 additions & 0 deletions lib/realtime/tenants/connect/register_process.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Realtime.Tenants.Connect.RegisterProcess do
@moduledoc """
Registers the database process in :syn
"""

@behaviour Realtime.Tenants.Connect.Piper

@impl true
def run(acc) do
%{tenant_id: tenant_id, db_conn_pid: conn} = acc

case :syn.update_registry(Realtime.Tenants.Connect, tenant_id, fn _pid, meta ->
%{meta | conn: conn}
end) do
{:ok, _} -> {:ok, acc}
{:error, error} -> {:error, error}
end
end
end
71 changes: 71 additions & 0 deletions lib/realtime/tenants/connect/start_counters.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
defmodule Realtime.Tenants.Connect.StartCounters do
@moduledoc """
Start tenant counters.
"""

alias Realtime.GenCounter
alias Realtime.RateCounter
alias Realtime.Tenants
alias Realtime.Tenants.Cache

@behaviour Realtime.Tenants.Connect.Piper

@impl true
def run(acc) do
%{tenant_id: tenant_id} = acc
tenant = Cache.get_tenant_by_external_id(tenant_id)

with {:ok, _} <- start_joins_per_second_counter(tenant),
{:ok, _} <- start_max_events_counter(tenant),
{:ok, _} <- start_db_events_counter(tenant) do
end

{:ok, acc}
end

def start_joins_per_second_counter(tenant) do
%{max_joins_per_second: max_joins_per_second} = tenant
id = Tenants.joins_per_second_key(tenant)
GenCounter.new(id)

RateCounter.new(id,
idle_shutdown: :infinity,
telemetry: %{
event_name: [:channel, :joins],
measurements: %{limit: max_joins_per_second},
metadata: %{tenant: tenant}
}
)
end

def start_max_events_counter(tenant) do
%{max_events_per_second: max_events_per_second} = tenant

key = Tenants.events_per_second_key(tenant)

GenCounter.new(key)

RateCounter.new(key,
idle_shutdown: :infinity,
telemetry: %{
event_name: [:channel, :events],
measurements: %{limit: max_events_per_second},
metadata: %{tenant: tenant}
}
)
end

def start_db_events_counter(tenant) do
key = Tenants.db_events_per_second_key(tenant)
GenCounter.new(key)

RateCounter.new(key,
idle_shutdown: :infinity,
telemetry: %{
event_name: [:channel, :db_events],
measurements: %{},
metadata: %{tenant: tenant}
}
)
end
end
34 changes: 34 additions & 0 deletions lib/realtime/tenants/connect/start_replication.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule Realtime.Tenants.Connect.StartReplication do
@moduledoc """
Starts BroadcastChanges replication slot.
"""

@behaviour Realtime.Tenants.Connect.Piper
alias Realtime.BroadcastChanges.Handler
alias Realtime.Tenants.Cache
@impl true
def run(acc) do
%{tenant_id: tenant_id} = acc
tenant = Cache.get_tenant_by_external_id(tenant_id)

if tenant.notify_private_alpha do
opts = %Handler{tenant_id: tenant_id}
supervisor_spec = Handler.supervisor_spec(tenant)

child_spec = %{
id: Handler,
start: {Handler, :start_link, [opts]},
restart: :transient,
type: :worker
}

case DynamicSupervisor.start_child(supervisor_spec, child_spec) do
{:ok, pid} -> {:ok, Map.put(acc, :broadcast_changes_pid, pid)}
{:error, {:already_started, pid}} -> {:ok, Map.put(acc, :broadcast_changes_pid, pid)}
error -> {:error, error}
end
else
{:ok, acc}
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.33.8",
version: "2.33.9",
elixir: "~> 1.16.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
Loading

0 comments on commit 75d2aea

Please sign in to comment.