From 0d9a3abc8580d6b4808a5c984569e89e6c697404 Mon Sep 17 00:00:00 2001 From: Ilia Borovitinov Date: Wed, 11 Dec 2024 12:40:02 +0300 Subject: [PATCH] fix: truncates no longer crash replication --- .changeset/gorgeous-bottles-mate.md | 5 ++ .../sync-service/lib/electric/shape_cache.ex | 23 +++--- .../lib/electric/shapes/consumer.ex | 6 +- .../test/electric/plug/router_test.exs | 76 +++++++++++++++++++ 4 files changed, 95 insertions(+), 15 deletions(-) create mode 100644 .changeset/gorgeous-bottles-mate.md diff --git a/.changeset/gorgeous-bottles-mate.md b/.changeset/gorgeous-bottles-mate.md new file mode 100644 index 0000000000..48e567ce3d --- /dev/null +++ b/.changeset/gorgeous-bottles-mate.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +fix: truncates no longer cause a stop to an incoming replication stream diff --git a/packages/sync-service/lib/electric/shape_cache.ex b/packages/sync-service/lib/electric/shape_cache.ex index 44d08bf816..ac118f904b 100644 --- a/packages/sync-service/lib/electric/shape_cache.ex +++ b/packages/sync-service/lib/electric/shape_cache.ex @@ -158,7 +158,7 @@ defmodule Electric.ShapeCache do @spec handle_truncate(shape_handle(), keyword()) :: :ok def handle_truncate(shape_handle, opts \\ []) do server = Access.get(opts, :server, name(opts)) - GenStage.call(server, {:truncate, shape_handle}) + GenStage.cast(server, {:truncate, shape_handle}) end @impl Electric.ShapeCacheBehaviour @@ -276,16 +276,6 @@ defmodule Electric.ShapeCache do state} end - def handle_call({:truncate, shape_handle}, _from, state) do - with :ok <- clean_up_shape(state, shape_handle) do - Logger.info( - "Truncating and rotating shape handle, previous shape handle #{shape_handle} cleaned up" - ) - end - - {:reply, :ok, state} - end - def handle_call({:clean, shape_handle}, _from, state) do # ignore errors when cleaning up non-existant shape id with :ok <- clean_up_shape(state, shape_handle) do @@ -301,6 +291,17 @@ defmodule Electric.ShapeCache do {:reply, :ok, state} end + @impl GenServer + def handle_cast({:truncate, shape_handle}, state) do + with :ok <- clean_up_shape(state, shape_handle) do + Logger.info( + "Truncating and rotating shape handle, previous shape handle #{shape_handle} cleaned up" + ) + end + + {:noreply, state} + end + defp clean_up_shape(state, shape_handle) do Electric.Shapes.DynamicConsumerSupervisor.stop_shape_consumer( state.consumer_supervisor, diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 8a4efc5263..9117ae680c 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -226,7 +226,7 @@ defmodule Electric.Shapes.Consumer do defp handle_txns(txns, state) do case Enum.reduce_while(txns, state, &handle_txn/2) do {:truncate, state} -> - {:stop, {:shutdown, :truncate}, state} + {:stop, :normal, state} state -> {:noreply, [], state} @@ -283,9 +283,7 @@ defmodule Electric.Shapes.Consumer do "Truncate operation encountered while processing txn #{txn.xid} for #{shape_handle}" ) - :ok = shape_cache.handle_truncate(shape_handle, shape_cache_opts) - - :ok = ShapeCache.Storage.cleanup!(storage) + cleanup(state) {:halt, {:truncate, notify(txn, %{state | log_state: @initial_log_state})}} diff --git a/packages/sync-service/test/electric/plug/router_test.exs b/packages/sync-service/test/electric/plug/router_test.exs index f21ccaaeb6..4b0589aafc 100644 --- a/packages/sync-service/test/electric/plug/router_test.exs +++ b/packages/sync-service/test/electric/plug/router_test.exs @@ -832,6 +832,82 @@ defmodule Electric.Plug.RouterTest do [^shape_handle] = Plug.Conn.get_resp_header(conn, "electric-handle") end + @tag with_sql: [ + "INSERT INTO items VALUES (gen_random_uuid(), 'test value 1')" + ] + test "GET returns a 409 on a truncate and can follow a new shape afterwards", %{ + opts: opts, + db_conn: db_conn + } do + conn = Router.call(conn("GET", "/v1/shape?table=items&offset=-1"), opts) + + assert %{status: 200} = conn + handle = get_resp_shape_handle(conn) + offset = get_resp_last_offset(conn) + assert [%{"value" => %{"value" => "test value 1"}}] = Jason.decode!(conn.resp_body) + + task = + Task.async(fn -> + Router.call( + conn("GET", "/v1/shape?table=items&offset=#{offset}&handle=#{handle}&live"), + opts + ) + end) + + Postgrex.query!(db_conn, "INSERT INTO items VALUES (gen_random_uuid(), 'test value 2')", []) + + conn = Task.await(task) + + assert %{status: 200} = conn + assert ^handle = get_resp_shape_handle(conn) + offset = get_resp_last_offset(conn) + assert [%{"value" => %{"value" => "test value 2"}}, _] = Jason.decode!(conn.resp_body) + + task = + Task.async(fn -> + Router.call( + conn("GET", "/v1/shape?table=items&offset=#{offset}&handle=#{handle}&live"), + opts + ) + end) + + Postgrex.query!(db_conn, "TRUNCATE TABLE items", []) + assert %{status: 204} = Task.await(task) + + conn = + Router.call(conn("GET", "/v1/shape?table=items&offset=#{offset}&handle=#{handle}"), opts) + + assert %{status: 409} = conn + assert [%{"headers" => %{"control" => "must-refetch"}}] = Jason.decode!(conn.resp_body) + + conn = + Router.call(conn("GET", "/v1/shape?table=items&offset=-1"), opts) + + assert %{status: 200} = conn + new_handle = get_resp_shape_handle(conn) + refute new_handle == handle + offset = get_resp_last_offset(conn) + assert [] = Jason.decode!(conn.resp_body) + + task = + Task.async(fn -> + Router.call( + conn("GET", "/v1/shape?table=items&offset=#{offset}&handle=#{new_handle}&live"), + opts + ) + end) + + Postgrex.query!(db_conn, "INSERT INTO items VALUES (gen_random_uuid(), 'test value 3')", []) + + conn = Task.await(task) + + assert %{status: 200} = conn + assert ^new_handle = get_resp_shape_handle(conn) + # offset = get_resp_last_offset(conn) + assert [%{"value" => %{"value" => "test value 3"}}, @up_to_date] = + Jason.decode!(conn.resp_body) + end + @tag with_sql: [ "INSERT INTO items VALUES (gen_random_uuid(), 'test value 1')" ]