Skip to content

Commit

Permalink
fix: truncates no longer crash replication
Browse files Browse the repository at this point in the history
  • Loading branch information
icehaunter committed Dec 12, 2024
1 parent 9522baa commit 0d9a3ab
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 15 deletions.
5 changes: 5 additions & 0 deletions .changeset/gorgeous-bottles-mate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

fix: truncates no longer cause a stop to an incoming replication stream
23 changes: 12 additions & 11 deletions packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ defmodule Electric.ShapeCache do
@spec handle_truncate(shape_handle(), keyword()) :: :ok
def handle_truncate(shape_handle, opts \\ []) do
server = Access.get(opts, :server, name(opts))
GenStage.call(server, {:truncate, shape_handle})
GenStage.cast(server, {:truncate, shape_handle})
end

@impl Electric.ShapeCacheBehaviour
Expand Down Expand Up @@ -276,16 +276,6 @@ defmodule Electric.ShapeCache do
state}
end

def handle_call({:truncate, shape_handle}, _from, state) do
with :ok <- clean_up_shape(state, shape_handle) do
Logger.info(
"Truncating and rotating shape handle, previous shape handle #{shape_handle} cleaned up"
)
end

{:reply, :ok, state}
end

def handle_call({:clean, shape_handle}, _from, state) do
# ignore errors when cleaning up non-existant shape id
with :ok <- clean_up_shape(state, shape_handle) do
Expand All @@ -301,6 +291,17 @@ defmodule Electric.ShapeCache do
{:reply, :ok, state}
end

@impl GenServer
def handle_cast({:truncate, shape_handle}, state) do
with :ok <- clean_up_shape(state, shape_handle) do
Logger.info(
"Truncating and rotating shape handle, previous shape handle #{shape_handle} cleaned up"
)
end

{:noreply, state}
end

defp clean_up_shape(state, shape_handle) do
Electric.Shapes.DynamicConsumerSupervisor.stop_shape_consumer(
state.consumer_supervisor,
Expand Down
6 changes: 2 additions & 4 deletions packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ defmodule Electric.Shapes.Consumer do
defp handle_txns(txns, state) do
case Enum.reduce_while(txns, state, &handle_txn/2) do
{:truncate, state} ->
{:stop, {:shutdown, :truncate}, state}
{:stop, :normal, state}

state ->
{:noreply, [], state}
Expand Down Expand Up @@ -283,9 +283,7 @@ defmodule Electric.Shapes.Consumer do
"Truncate operation encountered while processing txn #{txn.xid} for #{shape_handle}"
)

:ok = shape_cache.handle_truncate(shape_handle, shape_cache_opts)

:ok = ShapeCache.Storage.cleanup!(storage)
cleanup(state)

{:halt, {:truncate, notify(txn, %{state | log_state: @initial_log_state})}}

Expand Down
76 changes: 76 additions & 0 deletions packages/sync-service/test/electric/plug/router_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,82 @@ defmodule Electric.Plug.RouterTest do
[^shape_handle] = Plug.Conn.get_resp_header(conn, "electric-handle")
end

@tag with_sql: [
"INSERT INTO items VALUES (gen_random_uuid(), 'test value 1')"
]
test "GET returns a 409 on a truncate and can follow a new shape afterwards", %{
opts: opts,
db_conn: db_conn
} do
conn = Router.call(conn("GET", "/v1/shape?table=items&offset=-1"), opts)

assert %{status: 200} = conn
handle = get_resp_shape_handle(conn)
offset = get_resp_last_offset(conn)
assert [%{"value" => %{"value" => "test value 1"}}] = Jason.decode!(conn.resp_body)

task =
Task.async(fn ->
Router.call(
conn("GET", "/v1/shape?table=items&offset=#{offset}&handle=#{handle}&live"),
opts
)
end)

Postgrex.query!(db_conn, "INSERT INTO items VALUES (gen_random_uuid(), 'test value 2')", [])

conn = Task.await(task)

assert %{status: 200} = conn
assert ^handle = get_resp_shape_handle(conn)
offset = get_resp_last_offset(conn)
assert [%{"value" => %{"value" => "test value 2"}}, _] = Jason.decode!(conn.resp_body)

task =
Task.async(fn ->
Router.call(
conn("GET", "/v1/shape?table=items&offset=#{offset}&handle=#{handle}&live"),
opts
)
end)

Postgrex.query!(db_conn, "TRUNCATE TABLE items", [])
assert %{status: 204} = Task.await(task)

conn =
Router.call(conn("GET", "/v1/shape?table=items&offset=#{offset}&handle=#{handle}"), opts)

assert %{status: 409} = conn
assert [%{"headers" => %{"control" => "must-refetch"}}] = Jason.decode!(conn.resp_body)

conn =
Router.call(conn("GET", "/v1/shape?table=items&offset=-1"), opts)

assert %{status: 200} = conn
new_handle = get_resp_shape_handle(conn)
refute new_handle == handle
offset = get_resp_last_offset(conn)
assert [] = Jason.decode!(conn.resp_body)

task =
Task.async(fn ->
Router.call(
conn("GET", "/v1/shape?table=items&offset=#{offset}&handle=#{new_handle}&live"),
opts
)
end)

Postgrex.query!(db_conn, "INSERT INTO items VALUES (gen_random_uuid(), 'test value 3')", [])

conn = Task.await(task)

assert %{status: 200} = conn
assert ^new_handle = get_resp_shape_handle(conn)
# offset = get_resp_last_offset(conn)
assert [%{"value" => %{"value" => "test value 3"}}, @up_to_date] =
Jason.decode!(conn.resp_body)
end

@tag with_sql: [
"INSERT INTO items VALUES (gen_random_uuid(), 'test value 1')"
]
Expand Down

0 comments on commit 0d9a3ab

Please sign in to comment.