Skip to content

Commit

Permalink
removed handle_truncate/1 from ShapeCache
Browse files Browse the repository at this point in the history
  • Loading branch information
icehaunter committed Dec 12, 2024
1 parent d34f896 commit 25e2da1
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 74 deletions.
19 changes: 0 additions & 19 deletions packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ defmodule Electric.ShapeCacheBehaviour do
{shape_handle(), current_snapshot_offset :: LogOffset.t()}
@callback list_shapes(keyword() | map()) :: [{shape_handle(), Shape.t()}]
@callback await_snapshot_start(shape_handle(), opts :: keyword()) :: :started | {:error, term()}
@callback handle_truncate(shape_handle(), keyword()) :: :ok
@callback clean_shape(shape_handle(), keyword()) :: :ok
@callback clean_all_shapes(keyword()) :: :ok
@callback has_shape?(shape_handle(), keyword()) :: boolean()
Expand Down Expand Up @@ -154,13 +153,6 @@ defmodule Electric.ShapeCache do
GenServer.call(server, {:clean_all})
end

@impl Electric.ShapeCacheBehaviour
@spec handle_truncate(shape_handle(), keyword()) :: :ok
def handle_truncate(shape_handle, opts \\ []) do
server = Access.get(opts, :server, name(opts))
GenStage.cast(server, {:truncate, shape_handle})
end

@impl Electric.ShapeCacheBehaviour
@spec await_snapshot_start(shape_handle(), keyword()) :: :started | {:error, term()}
def await_snapshot_start(shape_handle, opts \\ []) when is_binary(shape_handle) do
Expand Down Expand Up @@ -291,17 +283,6 @@ defmodule Electric.ShapeCache do
{:reply, :ok, state}
end

@impl GenServer
def handle_cast({:truncate, shape_handle}, state) do
with :ok <- clean_up_shape(state, shape_handle) do
Logger.info(
"Truncating and rotating shape handle, previous shape handle #{shape_handle} cleaned up"
)
end

{:noreply, state}
end

defp clean_up_shape(state, shape_handle) do
Electric.Shapes.DynamicConsumerSupervisor.stop_shape_consumer(
state.consumer_supervisor,
Expand Down
55 changes: 0 additions & 55 deletions packages/sync-service/test/electric/shape_cache_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -679,61 +679,6 @@ defmodule Electric.ShapeCacheTest do
end
end

describe "handle_truncate/2" do
setup [
:with_stack_id_from_test,
:with_in_memory_storage,
:with_log_chunking,
:with_registry,
:with_shape_log_collector
]

test "cleans up shape data and rotates the shape handle", ctx do
%{shape_cache_opts: opts} =
with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}),
run_with_conn_fn: &run_with_conn_noop/2,
prepare_tables_fn: @prepare_tables_noop,
create_snapshot_fn: fn parent, shape_handle, _shape, _, storage, _, _ ->
GenServer.cast(parent, {:snapshot_xmin_known, shape_handle, 10})
Storage.make_new_snapshot!([["test"]], storage)
GenServer.cast(parent, {:snapshot_started, shape_handle})
end
)

{shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape, opts)
Process.sleep(50)
assert :started = ShapeCache.await_snapshot_start(shape_handle, opts)

storage = Storage.for_shape(shape_handle, ctx.storage)

Storage.append_to_log!(
changes_to_log_items([
%Electric.Replication.Changes.NewRecord{
relation: {"public", "items"},
record: %{"id" => "1", "value" => "Alice"},
log_offset: LogOffset.new(Electric.Postgres.Lsn.from_integer(1000), 0)
}
]),
storage
)

assert Storage.snapshot_started?(storage)
assert Enum.count(Storage.get_log_stream(@zero_offset, storage)) == 1

ref =
Shapes.Consumer.whereis(ctx.stack_id, shape_handle)
|> Process.monitor()

log = capture_log(fn -> ShapeCache.handle_truncate(shape_handle, opts) end)
assert log =~ "Truncating and rotating shape handle"

assert_receive {:DOWN, ^ref, :process, _pid, _}
# Wait a bit for the async cleanup to complete

refute Storage.snapshot_started?(storage)
end
end

describe "clean_shape/2" do
setup [
:with_stack_id_from_test,
Expand Down

0 comments on commit 25e2da1

Please sign in to comment.