diff --git a/.changeset/fresh-beers-allow.md b/.changeset/fresh-beers-allow.md new file mode 100644 index 0000000000..3add3140ce --- /dev/null +++ b/.changeset/fresh-beers-allow.md @@ -0,0 +1,5 @@ +--- +"@core/elixir-client": patch +--- + +Add pool behaviour for the Elixir client to allow for per-client persistent connections. Add request timestamp and shape handle to replication stream messages. diff --git a/packages/elixir-client/lib/electric/client.ex b/packages/elixir-client/lib/electric/client.ex index f306e00ac3..e99863ab72 100644 --- a/packages/elixir-client/lib/electric/client.ex +++ b/packages/elixir-client/lib/electric/client.ex @@ -153,7 +153,8 @@ defmodule Electric.Client do :endpoint, :database_id, :fetch, - :authenticator + :authenticator, + :pool ] @api_endpoint_path "/v1/shape" @@ -178,6 +179,11 @@ defmodule Electric.Client do type: :mod_arg, default: {Client.Authenticator.Unauthenticated, []}, doc: false + ], + pool: [ + type: :mod_arg, + default: {Electric.Client.Fetch.Pool, []}, + doc: false ] ) @@ -396,7 +402,7 @@ defmodule Electric.Client do """ def delete_shape(%Client{} = client, %ShapeDefinition{} = shape) do request = request(client, method: :delete, shape: shape) - Electric.Client.Fetch.Request.request(client, request) + Electric.Client.Fetch.request(client, request, []) end defp validate_queryable!(queryable) when is_atom(queryable) do diff --git a/packages/elixir-client/lib/electric/client/fetch.ex b/packages/elixir-client/lib/electric/client/fetch.ex index 79b2dec384..bd9acf79e7 100644 --- a/packages/elixir-client/lib/electric/client/fetch.ex +++ b/packages/elixir-client/lib/electric/client/fetch.ex @@ -1,7 +1,18 @@ defmodule Electric.Client.Fetch do alias Electric.Client.Fetch.{Request, Response} + alias Electric.Client @callback fetch(Request.t(), Keyword.t()) :: {:ok, Response.t()} | {:error, Response.t() | term()} + + @behaviour Electric.Client.Fetch.Pool + + def request(client, request, opts \\ []) + + @impl Electric.Client.Fetch.Pool + def request(%Client{} = client, %Request{} = request, _opts) do + %{pool: {module, opts}} = client + apply(module, :request, [client, request, opts]) + end end diff --git a/packages/elixir-client/lib/electric/client/fetch/http.ex b/packages/elixir-client/lib/electric/client/fetch/http.ex index 7ad11de1a9..786a95fbda 100644 --- a/packages/elixir-client/lib/electric/client/fetch/http.ex +++ b/packages/elixir-client/lib/electric/client/fetch/http.ex @@ -31,15 +31,16 @@ defmodule Electric.Client.Fetch.HTTP do end defp request(request) do - request |> Req.request() |> wrap_resp() + now = DateTime.utc_now() + request |> Req.request() |> wrap_resp(now) end - defp wrap_resp({:ok, %Req.Response{} = resp}) do + defp wrap_resp({:ok, %Req.Response{} = resp}, timestamp) do %{status: status, headers: headers, body: body} = resp - {:ok, Fetch.Response.decode!(status, headers, body)} + {:ok, Fetch.Response.decode!(status, headers, body, timestamp)} end - defp wrap_resp({:error, _} = error) do + defp wrap_resp({:error, _} = error, _timestamp) do error end diff --git a/packages/elixir-client/lib/electric/client/fetch/pool.ex b/packages/elixir-client/lib/electric/client/fetch/pool.ex new file mode 100644 index 0000000000..4375796211 --- /dev/null +++ b/packages/elixir-client/lib/electric/client/fetch/pool.ex @@ -0,0 +1,70 @@ +defmodule Electric.Client.Fetch.Pool do + @moduledoc """ + Coaleses requests so that multiple client instances making the same + (potentially long-polling) request will all use the same request process. + """ + + alias Electric.Client + alias Electric.Client.Fetch + + require Logger + + @callback request(Client.t(), Fetch.Request.t(), opts :: Keyword.t()) :: + Fetch.Response.t() | {:error, Fetch.Response.t() | term()} + + @behaviour __MODULE__ + + @impl Electric.Client.Fetch.Pool + def request(%Client{} = client, %Fetch.Request{} = request, opts) do + request_id = request_id(client, request) + + # register this pid before making the request to avoid race conditions for + # very fast responses + {:ok, monitor_pid} = start_monitor(request_id) + + try do + ref = Fetch.Monitor.register(monitor_pid, self()) + + {:ok, _request_pid} = start_request(request_id, request, client, monitor_pid) + + Fetch.Monitor.wait(ref) + catch + :exit, {reason, _} -> + Logger.debug(fn -> + "Request process ended with reason #{inspect(reason)} before we could register. Re-attempting." + end) + + request(client, request, opts) + end + end + + defp start_request(request_id, request, client, monitor_pid) do + DynamicSupervisor.start_child( + Electric.Client.RequestSupervisor, + {Fetch.Request, {request_id, request, client, monitor_pid}} + ) + |> return_existing() + end + + defp start_monitor(request_id) do + DynamicSupervisor.start_child( + Electric.Client.RequestSupervisor, + {Electric.Client.Fetch.Monitor, request_id} + ) + |> return_existing() + end + + defp return_existing({:ok, pid}), do: {:ok, pid} + defp return_existing({:error, {:already_started, pid}}), do: {:ok, pid} + defp return_existing(error), do: error + + defp request_id(%Client{fetch: {fetch_impl, _}}, %Fetch.Request{shape_handle: nil} = request) do + %{endpoint: endpoint, shape: shape_definition} = request + {fetch_impl, URI.to_string(endpoint), shape_definition} + end + + defp request_id(%Client{fetch: {fetch_impl, _}}, %Fetch.Request{} = request) do + %{endpoint: endpoint, offset: offset, live: live, shape_handle: shape_handle} = request + {fetch_impl, URI.to_string(endpoint), shape_handle, Client.Offset.to_tuple(offset), live} + end +end diff --git a/packages/elixir-client/lib/electric/client/fetch/request.ex b/packages/elixir-client/lib/electric/client/fetch/request.ex index 273849c8b5..829607c964 100644 --- a/packages/elixir-client/lib/electric/client/fetch/request.ex +++ b/packages/elixir-client/lib/electric/client/fetch/request.ex @@ -10,6 +10,7 @@ defmodule Electric.Client.Fetch.Request do require Logger defstruct [ + :stream_id, :endpoint, :database_id, :shape_handle, @@ -28,6 +29,7 @@ defmodule Electric.Client.Fetch.Request do @type headers :: %{String.t() => [String.t()] | String.t()} fields = [ + stream_id: quote(do: term()), method: quote(do: :get | :head | :delete), endpoint: quote(do: URI.t()), offset: quote(do: Electric.Client.Offset.t()), @@ -62,21 +64,21 @@ defmodule Electric.Client.Fetch.Request do {:via, Registry, {Electric.Client.Registry, {__MODULE__, request_id}}} end - defp request_id(%Client{fetch: {fetch_impl, _}}, %__MODULE__{shape_handle: nil} = request) do - %{endpoint: endpoint, shape: shape_definition} = request - {fetch_impl, URI.to_string(endpoint), shape_definition} - end - - defp request_id(%Client{fetch: {fetch_impl, _}}, %__MODULE__{} = request) do - %{endpoint: endpoint, offset: offset, live: live, shape_handle: shape_handle} = request - {fetch_impl, URI.to_string(endpoint), shape_handle, Offset.to_tuple(offset), live} - end - @doc """ Returns the URL for the Request. """ @spec url(t()) :: binary() def url(%__MODULE__{} = request, opts \\ []) do + request + |> uri(opts) + |> URI.to_string() + end + + @doc """ + Returns the %URI{} for the Request. + """ + @spec uri(t()) :: URI.t() + def uri(%__MODULE__{} = request, opts \\ []) do %{endpoint: endpoint} = request if Keyword.get(opts, :query, true) do @@ -89,9 +91,9 @@ defmodule Electric.Client.Fetch.Request do |> List.keysort(0) |> URI.encode_query(:rfc3986) - URI.to_string(%{endpoint | query: query}) + %{endpoint | query: query} else - URI.to_string(endpoint) + endpoint end end @@ -119,50 +121,6 @@ defmodule Electric.Client.Fetch.Request do |> Util.map_put_if("database_id", database_id, !is_nil(database_id)) end - @doc false - def request(%Client{} = client, %__MODULE__{} = request) do - request_id = request_id(client, request) - - # register this pid before making the request to avoid race conditions for - # very fast responses - {:ok, monitor_pid} = start_monitor(request_id) - - try do - ref = Fetch.Monitor.register(monitor_pid, self()) - - {:ok, _request_pid} = start_request(request_id, request, client, monitor_pid) - - Fetch.Monitor.wait(ref) - catch - :exit, {reason, _} -> - Logger.debug(fn -> - "Request process ended with reason #{inspect(reason)} before we could register. Re-attempting." - end) - - request(client, request) - end - end - - defp start_request(request_id, request, client, monitor_pid) do - DynamicSupervisor.start_child( - Electric.Client.RequestSupervisor, - {__MODULE__, {request_id, request, client, monitor_pid}} - ) - |> return_existing() - end - - defp start_monitor(request_id) do - DynamicSupervisor.start_child( - Electric.Client.RequestSupervisor, - {Electric.Client.Fetch.Monitor, request_id} - ) - |> return_existing() - end - - defp return_existing({:ok, pid}), do: {:ok, pid} - defp return_existing({:error, {:already_started, pid}}), do: {:ok, pid} - defp return_existing(error), do: error - @doc false def child_spec({request_id, _request, _client, _monitor_pid} = args) do %{ diff --git a/packages/elixir-client/lib/electric/client/fetch/response.ex b/packages/elixir-client/lib/electric/client/fetch/response.ex index c36710164a..7b6d4963cd 100644 --- a/packages/elixir-client/lib/electric/client/fetch/response.ex +++ b/packages/elixir-client/lib/electric/client/fetch/response.ex @@ -7,6 +7,7 @@ defmodule Electric.Client.Fetch.Response do :shape_handle, :schema, :next_cursor, + :request_timestamp, body: [], headers: %{} ] @@ -18,11 +19,24 @@ defmodule Electric.Client.Fetch.Response do last_offset: nil | Client.Offset.t(), shape_handle: nil | Client.shape_handle(), schema: nil | Client.schema(), - next_cursor: nil | Client.cursor() + next_cursor: nil | Client.cursor(), + request_timestamp: DateTime.t() } @doc false - def decode!(status, headers, body) when is_integer(status) and is_map(headers) do + @spec decode!(t()) :: t() + def decode!(%__MODULE__{headers: headers} = resp) do + resp + |> Map.put(:shape_handle, decode_shape_handle(headers)) + |> Map.put(:last_offset, decode_offset(headers)) + |> Map.put(:schema, decode_schema(headers)) + |> Map.put(:next_cursor, decode_next_cursor(headers)) + end + + @doc false + @spec decode!(pos_integer(), %{optional(binary()) => binary()}, [term()], DateTime.t()) :: t() + def decode!(status, headers, body, timestamp \\ DateTime.utc_now()) + when is_integer(status) and is_map(headers) do %__MODULE__{ status: status, headers: decode_headers(headers), @@ -30,7 +44,8 @@ defmodule Electric.Client.Fetch.Response do shape_handle: decode_shape_handle(headers), last_offset: decode_offset(headers), schema: decode_schema(headers), - next_cursor: decode_next_cursor(headers) + next_cursor: decode_next_cursor(headers), + request_timestamp: timestamp } end diff --git a/packages/elixir-client/lib/electric/client/message.ex b/packages/elixir-client/lib/electric/client/message.ex index 5e5d14b337..55f482ba06 100644 --- a/packages/elixir-client/lib/electric/client/message.ex +++ b/packages/elixir-client/lib/electric/client/message.ex @@ -5,34 +5,48 @@ defmodule Electric.Client.Message do alias Electric.Client.Offset defmodule Headers do - defstruct [:operation, :relation] + defstruct [:operation, :relation, :handle] @type operation :: :insert | :update | :delete @type relation :: [String.t(), ...] - @type t :: %__MODULE__{operation: operation(), relation: relation()} + @type t :: %__MODULE__{ + operation: operation(), + relation: relation(), + handle: Client.shape_handle() + } @doc false - def from_message(msg) do + def from_message(msg, handle) do %{"operation" => operation} = msg - %__MODULE__{relation: msg["relation"], operation: parse_operation(operation)} + + %__MODULE__{ + relation: msg["relation"], + operation: parse_operation(operation), + handle: handle + } end defp parse_operation("insert"), do: :insert defp parse_operation("update"), do: :update defp parse_operation("delete"), do: :delete - def insert(relation \\ nil), do: %__MODULE__{operation: :insert, relation: relation} - def update(relation \\ nil), do: %__MODULE__{operation: :update, relation: relation} - def delete(relation \\ nil), do: %__MODULE__{operation: :delete, relation: relation} + def insert(opts \\ []), do: struct(%__MODULE__{operation: :insert}, opts) + def update(opts \\ []), do: struct(%__MODULE__{operation: :update}, opts) + def delete(opts \\ []), do: struct(%__MODULE__{operation: :delete}, opts) end defmodule ControlMessage do - defstruct [:control, :offset] + defstruct [:control, :offset, :handle, :request_timestamp] @type control :: :must_refetch | :up_to_date - @type t :: %__MODULE__{control: control(), offset: Offset.t()} + @type t :: %__MODULE__{ + control: control(), + offset: Offset.t(), + handle: Client.shape_handle(), + request_timestamp: DateTime.t() + } - def from_message(%{"headers" => %{"control" => control}}, offset) do - %__MODULE__{control: control_atom(control), offset: offset} + def from_message(%{"headers" => %{"control" => control}}, handle, offset) do + %__MODULE__{control: control_atom(control), offset: offset, handle: handle} end defp control_atom("must-refetch"), do: :must_refetch @@ -43,7 +57,7 @@ defmodule Electric.Client.Message do end defmodule ChangeMessage do - defstruct [:key, :value, :headers, :offset] + defstruct [:key, :value, :headers, :offset, :request_timestamp] @type key :: String.t() @type value :: %{String.t() => binary()} @@ -51,12 +65,13 @@ defmodule Electric.Client.Message do key: key(), value: value(), headers: Headers.t(), - offset: Offset.t() + offset: Offset.t(), + request_timestamp: DateTime.t() } require Logger - def from_message(msg, value_mapping_fun) do + def from_message(msg, handle, value_mapping_fun) do %{ "headers" => headers, "offset" => offset, @@ -78,7 +93,7 @@ defmodule Electric.Client.Message do %__MODULE__{ key: msg["key"], offset: Client.Offset.from_string!(offset), - headers: Headers.from_message(headers), + headers: Headers.from_message(headers, handle), value: value } end @@ -117,15 +132,15 @@ defmodule Electric.Client.Message do defguard is_insert(msg) when is_struct(msg, ChangeMessage) and msg.headers.operation == :insert - def parse(%{"value" => _} = msg, _offset, value_mapper_fun) do - [ChangeMessage.from_message(msg, value_mapper_fun)] + def parse(%{"value" => _} = msg, shape_handle, _offset, value_mapper_fun) do + [ChangeMessage.from_message(msg, shape_handle, value_mapper_fun)] end - def parse(%{"headers" => %{"control" => _}} = msg, offset, _value_mapper_fun) do - [ControlMessage.from_message(msg, offset)] + def parse(%{"headers" => %{"control" => _}} = msg, shape_handle, offset, _value_mapper_fun) do + [ControlMessage.from_message(msg, shape_handle, offset)] end - def parse("", _offset, _value_mapper_fun) do + def parse("", _handle, _offset, _value_mapper_fun) do [] end end diff --git a/packages/elixir-client/lib/electric/client/stream.ex b/packages/elixir-client/lib/electric/client/stream.ex index 7b5a51d176..d5829b64b7 100644 --- a/packages/elixir-client/lib/electric/client/stream.ex +++ b/packages/elixir-client/lib/electric/client/stream.ex @@ -9,6 +9,7 @@ defmodule Electric.Client.Stream do require Electric.Client.Offset defstruct [ + :id, :client, :shape, :schema, @@ -114,7 +115,13 @@ defmodule Electric.Client.Stream do opts = NimbleOptions.validate!(Map.new(opts), @opts_schema) - struct(__MODULE__, Keyword.put(core, :opts, opts)) + id = generate_id() + + struct(__MODULE__, Keyword.put(core, :opts, opts) |> Keyword.put(:id, id)) + end + + defp generate_id do + System.unique_integer([:positive, :monotonic]) end def next(%S{buffer: buffer} = stream) do @@ -160,7 +167,8 @@ defmodule Electric.Client.Stream do resp.body |> List.wrap() - |> Enum.flat_map(&Message.parse(&1, final_offset, value_mapper_fun)) + |> Enum.flat_map(&Message.parse(&1, shape_handle, final_offset, value_mapper_fun)) + |> Enum.map(&Map.put(&1, :request_timestamp, resp.request_timestamp)) |> Enum.reduce_while({start_offset, stream}, &handle_msg/2) # don't set the offset until we're done processing the messages. ehis keeps # the previous offset reached alive in the stream state @@ -174,10 +182,11 @@ defmodule Electric.Client.Stream do when status in [409] do %{value_mapper_fun: value_mapper_fun} = stream offset = last_offset(resp, stream.offset) + handle = shape_handle(resp) stream - |> reset(shape_handle(resp)) - |> buffer(Enum.flat_map(resp.body, &Message.parse(&1, offset, value_mapper_fun))) + |> reset(handle) + |> buffer(Enum.flat_map(resp.body, &Message.parse(&1, handle, offset, value_mapper_fun))) |> dispatch() end @@ -235,6 +244,7 @@ defmodule Electric.Client.Stream do defp build_request(stream) do %{ + id: id, client: client, shape: shape, up_to_date?: up_to_date?, @@ -245,6 +255,7 @@ defmodule Electric.Client.Stream do } = stream Client.request(client, + stream_id: id, offset: offset, shape_handle: shape_handle, replica: replica, @@ -261,7 +272,7 @@ defmodule Electric.Client.Stream do end defp make_request(request, stream) do - Fetch.Request.request(stream.client, request) + Fetch.request(stream.client, request) end defp reset(stream, shape_handle) do @@ -323,7 +334,11 @@ defmodule Electric.Client.Stream do defp resume(%{opts: %{resume: %Message.ResumeMessage{} = resume}} = stream) do %{shape_handle: shape_handle, offset: offset, schema: schema} = resume - generate_value_mapper(schema, %{stream | shape_handle: shape_handle, offset: offset}) + if schema do + generate_value_mapper(schema, %{stream | shape_handle: shape_handle, offset: offset}) + else + %{stream | shape_handle: shape_handle, offset: offset} + end end defp resume(stream) do diff --git a/packages/elixir-client/test/electric/client_test.exs b/packages/elixir-client/test/electric/client_test.exs index 6b0205fd29..b0b697ca5a 100644 --- a/packages/elixir-client/test/electric/client_test.exs +++ b/packages/elixir-client/test/electric/client_test.exs @@ -9,7 +9,7 @@ defmodule Electric.ClientTest do alias Electric.Client.Fetch alias Electric.Client.Message.{ChangeMessage, ControlMessage, ResumeMessage, Headers} - @insert Headers.insert() + @insert Headers.insert(handle: "my-shape") defp client_stream(ctx, opts) do Client.stream(ctx.client, ctx.shape, opts) @@ -148,6 +148,18 @@ defmodule Electric.ClientTest do assert [%ControlMessage{control: :up_to_date, offset: offset0()}] = stream(ctx, 1) end + test "generates a unique id for the stream", ctx do + n = 100 + + ids = + for _ <- 1..n do + %{id: id} = client_stream(ctx, []) + id + end + + assert length(Enum.uniq(ids)) == n + end + test "streams a non empty shape", ctx do %{tablename: table} = ctx @@ -156,6 +168,8 @@ defmodule Electric.ClientTest do {:ok, id3} = insert_item(ctx) # snapshot values + msgs = stream(ctx, 4) + assert [ %ChangeMessage{ headers: %{operation: :insert, relation: ["public", ^table]}, @@ -173,7 +187,10 @@ defmodule Electric.ClientTest do offset: %Electric.Client.Offset{tx: 0, op: 0} }, up_to_date0() - ] = stream(ctx, 4) + ] = msgs + + # 1 timestamp for the snapshot, 1 for the up-to-date response + assert length(Enum.uniq_by(msgs, & &1.request_timestamp)) == 2 end test "accepts a table name as a shape", ctx do @@ -660,6 +677,8 @@ defmodule Electric.ClientTest do fun.(conn) end) + headers = Headers.insert(handle: "my-shape-2") + assert [ %ChangeMessage{ headers: @insert, @@ -669,7 +688,7 @@ defmodule Electric.ClientTest do up_to_date(1, 0), %ControlMessage{control: :must_refetch, offset: offset(1, 0)}, %ChangeMessage{ - headers: @insert, + headers: ^headers, offset: offset(1, 0), value: %{"id" => "1111"} },