Skip to content

Commit

Permalink
Modify libcluster postgres strategy to use existing Ecto connection &…
Browse files Browse the repository at this point in the history
… config
  • Loading branch information
acco committed Dec 10, 2024
1 parent 7bbf230 commit 0d92f4f
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 81 deletions.
8 changes: 1 addition & 7 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,8 @@ import Config
config :libcluster,
topologies: [
sequin: [
strategy: LibclusterPostgres.Strategy,
strategy: Sequin.Libcluster.PostgresStrategy,
config: [
hostname: "localhost",
username: "postgres",
password: "postgres",
database: "sequin_dev",
port: 5432,
parameters: [],
channel_name: "sequin_cluster_dev"
]
]
Expand Down
44 changes: 10 additions & 34 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,6 @@ if config_env() == :prod and self_hosted do
_ -> false
end

# Take a URL and return hostname, port, etc keyword list
repo_params = Ecto.Repo.Supervisor.parse_url(database_url)

config :libcluster,
topologies: [
sequin: [
strategy: LibclusterPostgres.Strategy,
config:
Keyword.merge(repo_params,
ssl: repo_ssl,
socket_options: ecto_socket_opts,
parameters: [],
channel_name: "sequin_cluster"
)
]
]

config :sequin, Sequin.Posthog,
req_opts: [base_url: "https://us.i.posthog.com"],
api_key: "phc_i9k28nZwjjJG9DzUK0gDGASxXtGNusdI1zdaz9cuA7h",
Expand Down Expand Up @@ -170,23 +153,6 @@ if config_env() == :prod and not self_hosted do
You can generate one by calling: mix phx.gen.secret
"""

# Take a URL and return hostname, port, etc keyword list
repo_params = Ecto.Repo.Supervisor.parse_url(database_url)

config :libcluster,
topologies: [
sequin: [
strategy: LibclusterPostgres.Strategy,
config:
Keyword.merge(repo_params,
ssl: repo_ssl,
socket_options: ecto_socket_opts,
parameters: [],
channel_name: "sequin_cluster"
)
]
]

config :sentry,
dsn: System.fetch_env!("SENTRY_DSN"),
release: System.fetch_env!("CURRENT_GIT_SHA")
Expand Down Expand Up @@ -231,6 +197,16 @@ if config_env() == :prod do

redix_socket_opts = if System.get_env("REDIS_IPV6") in ~w(true 1), do: [:inet6], else: []

config :libcluster,
topologies: [
sequin: [
strategy: Sequin.Libcluster.PostgresStrategy,
config: [
channel_name: "sequin_cluster"
]
]
]

config :redix, start_opts: {System.fetch_env!("REDIS_URL"), [name: :redix] ++ [socket_opts: redix_socket_opts]}

config :sequin, Sequin.Mailer, adapter: Sequin.Swoosh.Adapters.Loops, api_key: System.get_env("LOOPS_API_KEY")
Expand Down
9 changes: 2 additions & 7 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,9 @@ config :argon2_elixir, t_cost: 1, m_cost: 8
config :libcluster,
topologies: [
sequin: [
strategy: LibclusterPostgres.Strategy,
strategy: Sequin.Libcluster.PostgresStrategy,
config: [
hostname: "localhost",
username: "postgres",
password: "postgres",
database: "sequin_test",
port: 5432,
parameters: []
channel_name: "sequin_cluster_test"
]
]
]
Expand Down
59 changes: 37 additions & 22 deletions lib/sequin/libcluster/postgres_strategy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ defmodule Sequin.Libcluster.PostgresStrategy do

alias Cluster.Logger
alias Cluster.Strategy
alias Sequin.Repo

def start_link(args), do: GenServer.start_link(__MODULE__, args)

Expand All @@ -27,27 +28,12 @@ defmodule Sequin.Libcluster.PostgresStrategy do
def init([state]) do
channel_name = Keyword.get(state.config, :channel_name, clean_cookie(Node.get_cookie()))

opts = [
hostname: Keyword.fetch!(state.config, :hostname),
username: Keyword.fetch!(state.config, :username),
password: Keyword.fetch!(state.config, :password),
database: Keyword.fetch!(state.config, :database),
port: Keyword.fetch!(state.config, :port),
ssl: Keyword.get(state.config, :ssl),
ssl_opts: Keyword.get(state.config, :ssl_opts),
parameters: Keyword.fetch!(state.config, :parameters),
channel_name: channel_name
]

config =
state.config
|> Keyword.put_new(:channel_name, channel_name)
|> Keyword.put_new(:heartbeat_interval, 5_000)
|> Keyword.delete(:url)

meta = %{
opts: fn -> opts end,
conn: nil,
conn_notif: nil,
heartbeat_ref: make_ref()
}
Expand All @@ -56,29 +42,27 @@ defmodule Sequin.Libcluster.PostgresStrategy do
end

def handle_continue(:connect, state) do
with {:ok, conn} <- Postgrex.start_link(state.meta.opts.()),
{:ok, conn_notif} <- Postgrex.Notifications.start_link(state.meta.opts.()),
with {:ok, conn_notif} <- Postgrex.Notifications.start_link(config()),
{_, _} <- Postgrex.Notifications.listen(conn_notif, state.config[:channel_name]) do
Logger.info(state.topology, "Connected to Postgres database")
Logger.info(state.topology, "Listening for notifications on Postgres database")

meta = %{
state.meta
| conn: conn,
conn_notif: conn_notif,
| conn_notif: conn_notif,
heartbeat_ref: heartbeat(0)
}

{:noreply, put_in(state.meta, meta)}
else
reason ->
Logger.error(state.topology, "Failed to connect to Postgres: #{inspect(reason)}")
Logger.error(state.topology, "Failed to start listening for Postgres notifications: #{inspect(reason)}")
{:noreply, state}
end
end

def handle_info(:heartbeat, state) do
Process.cancel_timer(state.meta.heartbeat_ref)
Postgrex.query(state.meta.conn, "NOTIFY #{state.config[:channel_name]}, '#{node()}'", [])
Repo.query("notify #{state.config[:channel_name]}, '#{node()}'", [])
ref = heartbeat(state.config[:heartbeat_interval])
{:noreply, put_in(state.meta.heartbeat_ref, ref)}
end
Expand Down Expand Up @@ -116,4 +100,35 @@ defmodule Sequin.Libcluster.PostgresStrategy do
defp clean_cookie(str) when is_binary(str) do
String.replace(str, ~r/\W/, "_")
end

defp config do
Keyword.take(Repo.config(), [
:hostname,
:endpoints,
:socket_dir,
:socket,
:port,
:database,
:username,
:password,
:parameters,
:timeout,
:connect_timeout,
:handshake_timeout,
:ping_timeout,
:ssl,
:socket_options,
:prepare,
:transactions,
:types,
:search_path,
:disconnect_on_error_codes,
:pool_size,
:idle_interval,
:queue_target,
:queue_interval,
:ownership_timeout,
:show_sensitive_data_on_connection_error
])
end
end
15 changes: 15 additions & 0 deletions lib/sequin_web/controllers/info_controller.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule SequinWeb.InfoController do
use SequinWeb, :controller

def version(conn, _params) do
version = Application.get_env(:sequin, :release_version)
json(conn, %{version: version})
end

def info(conn, _params) do
json(conn, %{
version: Application.get_env(:sequin, :release_version),
nodes: Enum.map(Node.list(), &Atom.to_string/1)
})
end
end
8 changes: 0 additions & 8 deletions lib/sequin_web/controllers/version_controller.ex

This file was deleted.

6 changes: 5 additions & 1 deletion lib/sequin_web/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ defmodule SequinWeb.Router do
get "/auth/github/callback", UserSessionController, :callback
delete "/logout", UserSessionController, :delete

get "/version", VersionController, :show
get "/version", InfoController, :version

if @self_hosted do
get "/info", InfoController, :info
end

live_session :home do
live "/", HomeLive, :index
Expand Down
4 changes: 2 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ defmodule Sequin.MixProject do
{:ymlr, "~> 5.0"},
{:brod, "~> 4.3"},
{:jose, "~> 1.11"},
{:libcluster_postgres, github: "acco/libcluster_postgres"},
{:syn, "~> 3.3"}
{:syn, "~> 3.3"},
{:libcluster, "~> 3.3"}
]
end

Expand Down

0 comments on commit 0d92f4f

Please sign in to comment.