From 25e2da18a468b89e97b5835cf4f4c046550af844 Mon Sep 17 00:00:00 2001 From: Ilia Borovitinov Date: Thu, 12 Dec 2024 13:30:32 +0300 Subject: [PATCH] removed `handle_truncate/1` from ShapeCache --- .../sync-service/lib/electric/shape_cache.ex | 19 ------- .../test/electric/shape_cache_test.exs | 55 ------------------- 2 files changed, 74 deletions(-) diff --git a/packages/sync-service/lib/electric/shape_cache.ex b/packages/sync-service/lib/electric/shape_cache.ex index ac118f904b..88c4febde1 100644 --- a/packages/sync-service/lib/electric/shape_cache.ex +++ b/packages/sync-service/lib/electric/shape_cache.ex @@ -18,7 +18,6 @@ defmodule Electric.ShapeCacheBehaviour do {shape_handle(), current_snapshot_offset :: LogOffset.t()} @callback list_shapes(keyword() | map()) :: [{shape_handle(), Shape.t()}] @callback await_snapshot_start(shape_handle(), opts :: keyword()) :: :started | {:error, term()} - @callback handle_truncate(shape_handle(), keyword()) :: :ok @callback clean_shape(shape_handle(), keyword()) :: :ok @callback clean_all_shapes(keyword()) :: :ok @callback has_shape?(shape_handle(), keyword()) :: boolean() @@ -154,13 +153,6 @@ defmodule Electric.ShapeCache do GenServer.call(server, {:clean_all}) end - @impl Electric.ShapeCacheBehaviour - @spec handle_truncate(shape_handle(), keyword()) :: :ok - def handle_truncate(shape_handle, opts \\ []) do - server = Access.get(opts, :server, name(opts)) - GenStage.cast(server, {:truncate, shape_handle}) - end - @impl Electric.ShapeCacheBehaviour @spec await_snapshot_start(shape_handle(), keyword()) :: :started | {:error, term()} def await_snapshot_start(shape_handle, opts \\ []) when is_binary(shape_handle) do @@ -291,17 +283,6 @@ defmodule Electric.ShapeCache do {:reply, :ok, state} end - @impl GenServer - def handle_cast({:truncate, shape_handle}, state) do - with :ok <- clean_up_shape(state, shape_handle) do - Logger.info( - "Truncating and rotating shape handle, previous shape handle #{shape_handle} cleaned up" - ) - end - - {:noreply, state} - end - defp clean_up_shape(state, shape_handle) do Electric.Shapes.DynamicConsumerSupervisor.stop_shape_consumer( state.consumer_supervisor, diff --git a/packages/sync-service/test/electric/shape_cache_test.exs b/packages/sync-service/test/electric/shape_cache_test.exs index 2e6b9411cc..9a6ed196a9 100644 --- a/packages/sync-service/test/electric/shape_cache_test.exs +++ b/packages/sync-service/test/electric/shape_cache_test.exs @@ -679,61 +679,6 @@ defmodule Electric.ShapeCacheTest do end end - describe "handle_truncate/2" do - setup [ - :with_stack_id_from_test, - :with_in_memory_storage, - :with_log_chunking, - :with_registry, - :with_shape_log_collector - ] - - test "cleans up shape data and rotates the shape handle", ctx do - %{shape_cache_opts: opts} = - with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), - run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: @prepare_tables_noop, - create_snapshot_fn: fn parent, shape_handle, _shape, _, storage, _, _ -> - GenServer.cast(parent, {:snapshot_xmin_known, shape_handle, 10}) - Storage.make_new_snapshot!([["test"]], storage) - GenServer.cast(parent, {:snapshot_started, shape_handle}) - end - ) - - {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape, opts) - Process.sleep(50) - assert :started = ShapeCache.await_snapshot_start(shape_handle, opts) - - storage = Storage.for_shape(shape_handle, ctx.storage) - - Storage.append_to_log!( - changes_to_log_items([ - %Electric.Replication.Changes.NewRecord{ - relation: {"public", "items"}, - record: %{"id" => "1", "value" => "Alice"}, - log_offset: LogOffset.new(Electric.Postgres.Lsn.from_integer(1000), 0) - } - ]), - storage - ) - - assert Storage.snapshot_started?(storage) - assert Enum.count(Storage.get_log_stream(@zero_offset, storage)) == 1 - - ref = - Shapes.Consumer.whereis(ctx.stack_id, shape_handle) - |> Process.monitor() - - log = capture_log(fn -> ShapeCache.handle_truncate(shape_handle, opts) end) - assert log =~ "Truncating and rotating shape handle" - - assert_receive {:DOWN, ^ref, :process, _pid, _} - # Wait a bit for the async cleanup to complete - - refute Storage.snapshot_started?(storage) - end - end - describe "clean_shape/2" do setup [ :with_stack_id_from_test,