From 0d92f4fa2940b3c71fda6bcb2c2596e298efc682 Mon Sep 17 00:00:00 2001 From: Anthony Accomazzo Date: Tue, 10 Dec 2024 13:46:11 -0800 Subject: [PATCH] Modify libcluster postgres strategy to use existing Ecto connection & config --- config/dev.exs | 8 +-- config/runtime.exs | 44 ++++---------- config/test.exs | 9 +-- lib/sequin/libcluster/postgres_strategy.ex | 59 ++++++++++++------- lib/sequin_web/controllers/info_controller.ex | 15 +++++ .../controllers/version_controller.ex | 8 --- lib/sequin_web/router.ex | 6 +- mix.exs | 4 +- 8 files changed, 72 insertions(+), 81 deletions(-) create mode 100644 lib/sequin_web/controllers/info_controller.ex delete mode 100644 lib/sequin_web/controllers/version_controller.ex diff --git a/config/dev.exs b/config/dev.exs index 706f38757..67519504d 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -3,14 +3,8 @@ import Config config :libcluster, topologies: [ sequin: [ - strategy: LibclusterPostgres.Strategy, + strategy: Sequin.Libcluster.PostgresStrategy, config: [ - hostname: "localhost", - username: "postgres", - password: "postgres", - database: "sequin_dev", - port: 5432, - parameters: [], channel_name: "sequin_cluster_dev" ] ] diff --git a/config/runtime.exs b/config/runtime.exs index afeb19c3b..1939aa8d2 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -107,23 +107,6 @@ if config_env() == :prod and self_hosted do _ -> false end - # Take a URL and return hostname, port, etc keyword list - repo_params = Ecto.Repo.Supervisor.parse_url(database_url) - - config :libcluster, - topologies: [ - sequin: [ - strategy: LibclusterPostgres.Strategy, - config: - Keyword.merge(repo_params, - ssl: repo_ssl, - socket_options: ecto_socket_opts, - parameters: [], - channel_name: "sequin_cluster" - ) - ] - ] - config :sequin, Sequin.Posthog, req_opts: [base_url: "https://us.i.posthog.com"], api_key: "phc_i9k28nZwjjJG9DzUK0gDGASxXtGNusdI1zdaz9cuA7h", @@ -170,23 +153,6 @@ if config_env() == :prod and not self_hosted do You can generate one by calling: mix phx.gen.secret """ - # Take a URL and return hostname, port, etc keyword list - repo_params = Ecto.Repo.Supervisor.parse_url(database_url) - - config :libcluster, - topologies: [ - sequin: [ - strategy: LibclusterPostgres.Strategy, - config: - Keyword.merge(repo_params, - ssl: repo_ssl, - socket_options: ecto_socket_opts, - parameters: [], - channel_name: "sequin_cluster" - ) - ] - ] - config :sentry, dsn: System.fetch_env!("SENTRY_DSN"), release: System.fetch_env!("CURRENT_GIT_SHA") @@ -231,6 +197,16 @@ if config_env() == :prod do redix_socket_opts = if System.get_env("REDIS_IPV6") in ~w(true 1), do: [:inet6], else: [] + config :libcluster, + topologies: [ + sequin: [ + strategy: Sequin.Libcluster.PostgresStrategy, + config: [ + channel_name: "sequin_cluster" + ] + ] + ] + config :redix, start_opts: {System.fetch_env!("REDIS_URL"), [name: :redix] ++ [socket_opts: redix_socket_opts]} config :sequin, Sequin.Mailer, adapter: Sequin.Swoosh.Adapters.Loops, api_key: System.get_env("LOOPS_API_KEY") diff --git a/config/test.exs b/config/test.exs index 46e58dc3d..568c7a7b5 100644 --- a/config/test.exs +++ b/config/test.exs @@ -8,14 +8,9 @@ config :argon2_elixir, t_cost: 1, m_cost: 8 config :libcluster, topologies: [ sequin: [ - strategy: LibclusterPostgres.Strategy, + strategy: Sequin.Libcluster.PostgresStrategy, config: [ - hostname: "localhost", - username: "postgres", - password: "postgres", - database: "sequin_test", - port: 5432, - parameters: [] + channel_name: "sequin_cluster_test" ] ] ] diff --git a/lib/sequin/libcluster/postgres_strategy.ex b/lib/sequin/libcluster/postgres_strategy.ex index 8ea717abf..6fc04f12e 100644 --- a/lib/sequin/libcluster/postgres_strategy.ex +++ b/lib/sequin/libcluster/postgres_strategy.ex @@ -19,6 +19,7 @@ defmodule Sequin.Libcluster.PostgresStrategy do alias Cluster.Logger alias Cluster.Strategy + alias Sequin.Repo def start_link(args), do: GenServer.start_link(__MODULE__, args) @@ -27,27 +28,12 @@ defmodule Sequin.Libcluster.PostgresStrategy do def init([state]) do channel_name = Keyword.get(state.config, :channel_name, clean_cookie(Node.get_cookie())) - opts = [ - hostname: Keyword.fetch!(state.config, :hostname), - username: Keyword.fetch!(state.config, :username), - password: Keyword.fetch!(state.config, :password), - database: Keyword.fetch!(state.config, :database), - port: Keyword.fetch!(state.config, :port), - ssl: Keyword.get(state.config, :ssl), - ssl_opts: Keyword.get(state.config, :ssl_opts), - parameters: Keyword.fetch!(state.config, :parameters), - channel_name: channel_name - ] - config = state.config |> Keyword.put_new(:channel_name, channel_name) |> Keyword.put_new(:heartbeat_interval, 5_000) - |> Keyword.delete(:url) meta = %{ - opts: fn -> opts end, - conn: nil, conn_notif: nil, heartbeat_ref: make_ref() } @@ -56,29 +42,27 @@ defmodule Sequin.Libcluster.PostgresStrategy do end def handle_continue(:connect, state) do - with {:ok, conn} <- Postgrex.start_link(state.meta.opts.()), - {:ok, conn_notif} <- Postgrex.Notifications.start_link(state.meta.opts.()), + with {:ok, conn_notif} <- Postgrex.Notifications.start_link(config()), {_, _} <- Postgrex.Notifications.listen(conn_notif, state.config[:channel_name]) do - Logger.info(state.topology, "Connected to Postgres database") + Logger.info(state.topology, "Listening for notifications on Postgres database") meta = %{ state.meta - | conn: conn, - conn_notif: conn_notif, + | conn_notif: conn_notif, heartbeat_ref: heartbeat(0) } {:noreply, put_in(state.meta, meta)} else reason -> - Logger.error(state.topology, "Failed to connect to Postgres: #{inspect(reason)}") + Logger.error(state.topology, "Failed to start listening for Postgres notifications: #{inspect(reason)}") {:noreply, state} end end def handle_info(:heartbeat, state) do Process.cancel_timer(state.meta.heartbeat_ref) - Postgrex.query(state.meta.conn, "NOTIFY #{state.config[:channel_name]}, '#{node()}'", []) + Repo.query("notify #{state.config[:channel_name]}, '#{node()}'", []) ref = heartbeat(state.config[:heartbeat_interval]) {:noreply, put_in(state.meta.heartbeat_ref, ref)} end @@ -116,4 +100,35 @@ defmodule Sequin.Libcluster.PostgresStrategy do defp clean_cookie(str) when is_binary(str) do String.replace(str, ~r/\W/, "_") end + + defp config do + Keyword.take(Repo.config(), [ + :hostname, + :endpoints, + :socket_dir, + :socket, + :port, + :database, + :username, + :password, + :parameters, + :timeout, + :connect_timeout, + :handshake_timeout, + :ping_timeout, + :ssl, + :socket_options, + :prepare, + :transactions, + :types, + :search_path, + :disconnect_on_error_codes, + :pool_size, + :idle_interval, + :queue_target, + :queue_interval, + :ownership_timeout, + :show_sensitive_data_on_connection_error + ]) + end end diff --git a/lib/sequin_web/controllers/info_controller.ex b/lib/sequin_web/controllers/info_controller.ex new file mode 100644 index 000000000..45a630ab9 --- /dev/null +++ b/lib/sequin_web/controllers/info_controller.ex @@ -0,0 +1,15 @@ +defmodule SequinWeb.InfoController do + use SequinWeb, :controller + + def version(conn, _params) do + version = Application.get_env(:sequin, :release_version) + json(conn, %{version: version}) + end + + def info(conn, _params) do + json(conn, %{ + version: Application.get_env(:sequin, :release_version), + nodes: Enum.map(Node.list(), &Atom.to_string/1) + }) + end +end diff --git a/lib/sequin_web/controllers/version_controller.ex b/lib/sequin_web/controllers/version_controller.ex deleted file mode 100644 index 26490f746..000000000 --- a/lib/sequin_web/controllers/version_controller.ex +++ /dev/null @@ -1,8 +0,0 @@ -defmodule SequinWeb.VersionController do - use SequinWeb, :controller - - def show(conn, _params) do - version = Application.get_env(:sequin, :release_version) - json(conn, %{version: version}) - end -end diff --git a/lib/sequin_web/router.ex b/lib/sequin_web/router.ex index 8c52a70b6..238a461a8 100644 --- a/lib/sequin_web/router.ex +++ b/lib/sequin_web/router.ex @@ -31,7 +31,11 @@ defmodule SequinWeb.Router do get "/auth/github/callback", UserSessionController, :callback delete "/logout", UserSessionController, :delete - get "/version", VersionController, :show + get "/version", InfoController, :version + + if @self_hosted do + get "/info", InfoController, :info + end live_session :home do live "/", HomeLive, :index diff --git a/mix.exs b/mix.exs index 2888356e7..bc254b09d 100644 --- a/mix.exs +++ b/mix.exs @@ -84,8 +84,8 @@ defmodule Sequin.MixProject do {:ymlr, "~> 5.0"}, {:brod, "~> 4.3"}, {:jose, "~> 1.11"}, - {:libcluster_postgres, github: "acco/libcluster_postgres"}, - {:syn, "~> 3.3"} + {:syn, "~> 3.3"}, + {:libcluster, "~> 3.3"} ] end