Skip to content

Commit

Permalink
Shut down the connection supervisor in case of an unrecoverable repli…
Browse files Browse the repository at this point in the history
…cation error

The StackSupervisor will shut itself down as a consequence.

In a multi-tenant setup, this effectively shuts down a single tenant
while allowing the other tenants to keep chugging along.
  • Loading branch information
alco committed Dec 19, 2024
1 parent 88c77cd commit 95afd17
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 34 deletions.
31 changes: 18 additions & 13 deletions integration-tests/tests/invalidated-replication-slot.lux
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@

[my invalidated_slot_error=
"""
[error] :gen_statem {:"Elixir.Electric.ProcessRegistry:single_stack", {Electric.Postgres.ReplicationClient, nil}} terminating
** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot_integration"

This slot has been invalidated because it exceeded the maximum reserved size.
"""]

[my stack_id="single_stack"]

###

## Start a new Postgres cluster configured for easy replication slot invalidation.
Expand All @@ -25,9 +26,6 @@
[shell electric]
??[info] Starting replication from postgres

# Reset the failure pattern because we'll be matching on an error.
-

## Seed the database with enough data to exceed max_wal_size and force a checkpoint that
## will invalidate the replication slot.
[invoke seed_pg]
Expand All @@ -36,21 +34,28 @@
[shell pg]
?invalidating slot "electric_slot_integration" because its restart_lsn [\d\w]+/[\d\w]+ exceeds max_slot_wal_keep_size

[macro verify_connection_and_stack_supervisors_shutdown stack_id invalidated_slot_error]
??$invalidated_slot_error
??[error] Stopping connection supervisor with stack_id=$stack_id due to an unrecoverable error

!IO.puts("Stack supervisor pid: #{inspect Process.whereis(Electric.StackSupervisor)}")
??Stack supervisor pid: nil
[endmacro]

## Observe the fatal connection error.
[shell electric]
??$invalidated_slot_error
# Reset the failure pattern because we'll be matching on an error.
-

# Confirm Electric process exit.
??$PS1
[invoke verify_connection_and_stack_supervisors_shutdown $stack_id $invalidated_slot_error]

## Start the sync service once again to verify that it crashes due to the invalidated slot error.
[invoke setup_electric]
# Restart the OTP application to verify that the supervisors shut down again due to the invalidated slot.
!:ok = Application.stop(:electric)
!:ok = Application.start(:electric)

[shell electric]
??[info] Starting replication from postgres
-
??$invalidated_slot_error
??$PS1

[invoke verify_connection_and_stack_supervisors_shutdown $stack_id $invalidated_slot_error]

[cleanup]
[invoke teardown]
3 changes: 2 additions & 1 deletion packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ defmodule Electric.Application do
],
pool_opts: [pool_size: Electric.Config.get_env(:db_pool_size)],
storage: storage,
chunk_bytes_threshold: Electric.Config.get_env(:chunk_bytes_threshold)
chunk_bytes_threshold: Electric.Config.get_env(:chunk_bytes_threshold),
name: Electric.StackSupervisor
},
{Electric.Telemetry, stack_id: stack_id, storage: storage},
{Bandit,
Expand Down
44 changes: 25 additions & 19 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -365,13 +365,13 @@ defmodule Electric.Connection.Manager do
# connection and the DB pool. If any of the latter two shut down, Connection.Manager will
# itself terminate to be restarted by its supervisor in a clean state.
def handle_info({:EXIT, pid, reason}, %State{replication_client_pid: pid} = state) do
halt_if_fatal_error!(reason)
with false <- stop_if_fatal_error(reason, state) do
Logger.debug(
"Handling the exit of the replication client #{inspect(pid)} with reason #{inspect(reason)}"
)

Logger.debug(
"Handling the exit of the replication client #{inspect(pid)} with reason #{inspect(reason)}"
)

{:noreply, %{state | replication_client_pid: nil}, {:continue, :start_replication_client}}
{:noreply, %{state | replication_client_pid: nil}, {:continue, :start_replication_client}}
end
end

# The most likely reason for the lock connection or the DB pool to exit is the database
Expand Down Expand Up @@ -524,8 +524,13 @@ defmodule Electric.Connection.Manager do
end

defp handle_connection_error(error, state, mode) do
halt_if_fatal_error!(error)
with false <- stop_if_fatal_error(error, state) do
state = schedule_reconnection_after_error(error, state, mode)
{:noreply, state}
end
end

defp schedule_reconnection_after_error(error, state, mode) do
message =
case error do
%DBConnection.ConnectionError{message: message} ->
Expand Down Expand Up @@ -553,8 +558,7 @@ defmodule Electric.Connection.Manager do
is_nil(state.pool_pid) -> :start_connection_pool
end

state = schedule_reconnection(step, state)
{:noreply, state}
schedule_reconnection(step, state)
end

defp pg_error_extra_info(pg_error) do
Expand All @@ -573,23 +577,25 @@ defmodule Electric.Connection.Manager do
end
end

@invalid_slot_detail "This slot has been invalidated because it exceeded the maximum reserved size."

defp halt_if_fatal_error!(
defp stop_if_fatal_error(
%Postgrex.Error{
postgres: %{
code: :object_not_in_prerequisite_state,
detail: @invalid_slot_detail,
pg_code: "55000",
routine: "StartLogicalReplication"
detail: "This slot has been invalidated" <> _,
pg_code: "55000"
}
} = error
} = error,
state
) do
System.stop(1)
exit(error)
# Perform supervisor shutdown in a task to avoid a circular dependency where the manager
# process is waiting for the supervisor to shut down its children, one of which is the
# manager process itself.
Task.start(Electric.Connection.Supervisor, :shutdown, [state.stack_id, error])

{:noreply, state}
end

defp halt_if_fatal_error!(_), do: nil
defp stop_if_fatal_error(_, _), do: false

defp schedule_reconnection(step, %State{backoff: {backoff, _}} = state) do
{time, backoff} = :backoff.fail(backoff)
Expand Down
13 changes: 12 additions & 1 deletion packages/sync-service/lib/electric/connection/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ defmodule Electric.Connection.Supervisor do
has successfully initialized a database connection pool.
"""

use Supervisor
use Supervisor, restart: :transient, significant: true

require Logger

def name(opts) do
Electric.ProcessRegistry.name(opts[:stack_id], __MODULE__)
Expand All @@ -28,6 +30,15 @@ defmodule Electric.Connection.Supervisor do
Supervisor.start_link(__MODULE__, opts, name: name(opts))
end

def shutdown(stack_id, reason) do
Logger.error(
"Stopping connection supervisor with stack_id=#{inspect(stack_id)} " <>
"due to an unrecoverable error: #{inspect(reason)}"
)

Supervisor.stop(name(stack_id: stack_id), {:shutdown, reason}, 1_000)
end

def init(opts) do
Process.set_label({:connection_supervisor, opts[:stack_id]})
Logger.metadata(stack_id: opts[:stack_id])
Expand Down
5 changes: 5 additions & 0 deletions packages/sync-service/lib/electric/stack_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ defmodule Electric.StackSupervisor do
Registry.register(registry, {:stack_status, stack_id}, value)
end

# noop if there's no registry running
def dispatch_stack_event(nil, _stack_id, _event) do
:ok
end

def dispatch_stack_event(registry, stack_id, event) do
Registry.dispatch(registry, {:stack_status, stack_id}, fn entries ->
for {pid, ref} <- entries do
Expand Down

0 comments on commit 95afd17

Please sign in to comment.