diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index b1b6b79f2b..0da658f066 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -105,11 +105,15 @@ database_ipv6_config = env!("ELECTRIC_DATABASE_USE_IPV6", :boolean, false) {:ok, database_url_config} = Electric.ConfigParser.parse_postgresql_uri(database_url) - connection_opts = database_url_config ++ [ipv6: database_ipv6_config] - config :electric, connection_opts: Electric.Utils.obfuscate_password(connection_opts) +if database_pool_url = env!("DATABASE_POOL_URL", :string, nil) do + {:ok, database_pool_url_config} = Electric.ConfigParser.parse_postgresql_uri(database_pool_url) + connection_opts = database_pool_url_config ++ [ipv6: database_ipv6_config] + config :electric, pool_connection_opts: Electric.Utils.obfuscate_password(connection_opts) +end + enable_integration_testing = env!("ELECTRIC_ENABLE_INTEGRATION_TESTING", :boolean, false) cache_max_age = env!("ELECTRIC_CACHE_MAX_AGE", :integer, 60) cache_stale_age = env!("ELECTRIC_CACHE_STALE_AGE", :integer, 60 * 5) diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 44bf27b09a..9a83ec340f 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -36,6 +36,18 @@ defmodule Electric.Application do publication_name = "electric_publication_#{replication_stream_id}" slot_name = "electric_slot_#{replication_stream_id}" + replication_connection_opts = Application.fetch_env!(:electric, :connection_opts) + pool_connection_opts = Application.get_env(:electric, :pool_connection_opts) + + connection_opts = pool_connection_opts || replication_connection_opts + + replication_opts = [ + connection_opts: replication_connection_opts, + publication_name: publication_name, + slot_name: slot_name, + slot_temporary?: Application.fetch_env!(:electric, :replication_slot_temporary?) + ] + # The root application supervisor starts the core global processes, including the HTTP # server and the database connection manager. The latter is responsible for establishing # all needed connections to the database (acquiring the exclusive access lock, opening a @@ -53,13 +65,9 @@ defmodule Electric.Application do {Electric.StackSupervisor, stack_id: stack_id, stack_events_registry: Registry.StackEvents, - connection_opts: Application.fetch_env!(:electric, :connection_opts), persistent_kv: persistent_kv, - replication_opts: [ - publication_name: publication_name, - slot_name: slot_name, - slot_temporary?: Application.fetch_env!(:electric, :replication_slot_temporary?) - ], + connection_opts: connection_opts, + replication_opts: replication_opts, pool_opts: [pool_size: Application.fetch_env!(:electric, :db_pool_size)], storage: Application.fetch_env!(:electric, :storage), chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold)}, diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index 343121e957..8f8c88601e 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -267,16 +267,20 @@ defmodule Electric.Connection.Manager do end def handle_continue(:start_replication_client, %State{replication_client_pid: nil} = state) do - opts = - state - |> Map.take([:stack_id, :replication_opts, :connection_opts]) - |> Map.to_list() - Logger.debug("Starting replication client for stack #{state.stack_id}") + {connection_opts, replication_opts} = Keyword.pop(state.replication_opts, :connection_opts) + + opts = [ + connection_opts: connection_opts, + replication_opts: replication_opts, + stack_id: state.stack_id + ] + case start_replication_client(opts) do {:ok, pid, connection_opts} -> - state = %{state | replication_client_pid: pid, connection_opts: connection_opts} + replication_opts = Keyword.put(replication_opts, :connection_opts, connection_opts) + state = %{state | replication_client_pid: pid, replication_opts: replication_opts} if is_nil(state.pool_pid) do # This is the case where Connection.Manager starts connections from the initial state. diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index b8c669e29e..4ad82933c3 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -29,28 +29,31 @@ defmodule Electric.StackSupervisor do """ use Supervisor, restart: :transient + @connection_opts_schema [ + type: :keyword_list, + required: true, + keys: [ + hostname: [type: :string, required: true], + port: [type: :integer, required: true], + database: [type: :string, required: true], + username: [type: :string, required: true], + password: [type: {:fun, 0}, required: true], + sslmode: [type: :atom, required: false], + ipv6: [type: :boolean, required: false] + ] + ] + @opts_schema NimbleOptions.new!( name: [type: :any, required: false], stack_id: [type: :string, required: true], persistent_kv: [type: :any, required: true], stack_events_registry: [type: :atom, required: true], - connection_opts: [ - type: :keyword_list, - required: true, - keys: [ - hostname: [type: :string, required: true], - port: [type: :integer, required: true], - database: [type: :string, required: true], - username: [type: :string, required: true], - password: [type: {:fun, 0}, required: true], - sslmode: [type: :atom, required: false], - ipv6: [type: :boolean, required: false] - ] - ], + connection_opts: @connection_opts_schema, replication_opts: [ type: :keyword_list, required: true, keys: [ + connection_opts: @connection_opts_schema, publication_name: [type: :string, required: true], slot_name: [type: :string, required: true], slot_temporary?: [type: :boolean, default: false], diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index 7dc47a28c4..b3ada65e35 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -163,6 +163,7 @@ defmodule Support.ComponentSetup do storage: storage, connection_opts: ctx.db_config, replication_opts: [ + connection_opts: ctx.db_config, slot_name: "electric_test_slot_#{:erlang.phash2(stack_id)}", publication_name: "electric_test_pub_#{:erlang.phash2(stack_id)}", try_creating_publication?: true,