diff --git a/lib/realtime/application.ex b/lib/realtime/application.ex index 737e0b433..2b893850a 100644 --- a/lib/realtime/application.ex +++ b/lib/realtime/application.ex @@ -48,7 +48,6 @@ defmodule Realtime.Application do region = Application.get_env(:realtime, :region) :syn.join(RegionNodes, region, self(), node: node()) - :ets.new(:active_tenants, [:named_table, :set, :public]) children = [ diff --git a/lib/realtime/tenants.ex b/lib/realtime/tenants.ex index 8d549daeb..20814a9c3 100644 --- a/lib/realtime/tenants.ex +++ b/lib/realtime/tenants.ex @@ -298,32 +298,6 @@ defmodule Realtime.Tenants do |> tap(fn _ -> Cache.invalidate_tenant_cache(tenant_id) end) end - @doc """ - Tracks the active tenant by external_id and stores the time when it was tracked in the ETS table named `:active_tenants`. - """ - @spec track_active_tenant(String.t()) :: :ok - def track_active_tenant(external_id) do - :ets.insert(:active_tenants, {external_id, NaiveDateTime.utc_now()}) - :ok - end - - @doc """ - Lists all active tenants from the ETS table named `:active_tenants`. - """ - @spec track_active_tenant(String.t()) :: list({String.t(), NaiveDateTime.t()}) - def list_active_tenants() do - :ets.tab2list(:active_tenants) - end - - @doc """ - Untracks the active tenant by external_id from the ETS table named `:active_tenants`. - """ - @spec untrack_active_tenant(String.t()) :: :ok - def untrack_active_tenant(external_id) do - :ets.delete(:active_tenants, external_id) - :ok - end - defp broadcast_operation_event(action, external_id) do Phoenix.PubSub.broadcast!( Realtime.PubSub, diff --git a/lib/realtime/tenants/connect.ex b/lib/realtime/tenants/connect.ex index a99ddb246..c4aaa23c4 100644 --- a/lib/realtime/tenants/connect.ex +++ b/lib/realtime/tenants/connect.ex @@ -181,7 +181,6 @@ defmodule Realtime.Tenants.Connect do connected_users_bucket: connected_users_bucket } = state - Tenants.track_active_tenant(state.tenant_id) :ok = Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:invalidate_cache") send_connected_user_check_message(connected_users_bucket, check_connected_user_interval) diff --git a/lib/realtime/tenants/janitor.ex b/lib/realtime/tenants/janitor.ex index 06c5f26c2..c5a649fa5 100644 --- a/lib/realtime/tenants/janitor.ex +++ b/lib/realtime/tenants/janitor.ex @@ -11,8 +11,8 @@ defmodule Realtime.Tenants.Janitor do alias Realtime.Api.Tenant alias Realtime.Database alias Realtime.Messages - alias Realtime.Repo alias Realtime.Tenants + alias Realtime.Tenants.Migrations @type t :: %__MODULE__{ timer: pos_integer() | nil, @@ -52,32 +52,35 @@ defmodule Realtime.Tenants.Janitor do def init(%__MODULE__{start_after: start_after} = state) do timer = timer(state) + start_after Process.send_after(self(), :delete_old_messages, timer) + Logger.info("Janitor started") {:ok, state} end + @table_name :"syn_registry_by_name_Elixir.Realtime.Tenants.Connect" @impl true def handle_info(:delete_old_messages, state) do Logger.info("Janitor started") %{chunks: chunks, tasks: tasks} = state - {:ok, new_tasks} = - Repo.transaction(fn -> - Tenants.list_active_tenants() - |> Stream.map(&elem(&1, 0)) - |> Stream.chunk_every(chunks) - |> Stream.map(fn chunks -> - task = - Task.Supervisor.async_nolink( - __MODULE__.TaskSupervisor, - fn -> run_cleanup_on_tenants(chunks) end, - ordered: false - ) - - {task.ref, chunks} - end) - |> Map.new() + matchspec = [ + {{:"$1", :"$2", :"$3", :"$4", :"$5", Node.self()}, [], [:"$1"]} + ] + + new_tasks = + :ets.select(@table_name, matchspec) + |> Stream.chunk_every(chunks) + |> Stream.map(fn chunks -> + task = + Task.Supervisor.async_nolink( + __MODULE__.TaskSupervisor, + fn -> perform_mantaince_tasks(chunks) end, + ordered: false + ) + + {task.ref, chunks} end) + |> Map.new() Process.send_after(self(), :delete_old_messages, timer(state)) @@ -93,7 +96,7 @@ defmodule Realtime.Tenants.Janitor do def handle_info({:DOWN, ref, _, _, :killed}, state) do %{tasks: tasks} = state - {tenants, tasks} = Map.pop(tasks, ref) + tenants = Map.get(tasks, ref) log_error( "JanitorFailedToDeleteOldMessages", @@ -110,18 +113,19 @@ defmodule Realtime.Tenants.Janitor do defp timer(%{timer: timer, randomize: true}), do: timer + :timer.minutes(Enum.random(1..59)) defp timer(%{timer: timer}), do: timer - defp run_cleanup_on_tenants(tenants), do: Enum.map(tenants, &run_cleanup_on_tenant/1) + defp perform_mantaince_tasks(tenants), do: Enum.map(tenants, &perform_mantaince_task/1) - defp run_cleanup_on_tenant(tenant_external_id) do + defp perform_mantaince_task(tenant_external_id) do Logger.metadata(project: tenant_external_id, external_id: tenant_external_id) Logger.info("Janitor starting realtime.messages cleanup") with %Tenant{} = tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_external_id), {:ok, conn} <- Database.connect(tenant, "realtime_janitor", 1), - :ok <- Messages.delete_old_messages(conn) do + :ok <- Messages.delete_old_messages(conn), + :ok <- Migrations.create_partitions(conn) do Logger.info("Janitor finished") + GenServer.stop(conn) - Tenants.untrack_active_tenant(tenant_external_id) :ok end end diff --git a/test/realtime/tenants/connect_test.exs b/test/realtime/tenants/connect_test.exs index 16ad2fd55..ff910abf2 100644 --- a/test/realtime/tenants/connect_test.exs +++ b/test/realtime/tenants/connect_test.exs @@ -1,6 +1,5 @@ defmodule Realtime.Tenants.ConnectTest do # async: false due to the fact that multiple operations against the database will use the same connection - alias Realtime.Tenants use Realtime.DataCase, async: false import Mock @@ -22,13 +21,6 @@ defmodule Realtime.Tenants.ConnectTest do assert is_pid(db_conn) end - test "on connect, tracks tenant as active", %{tenant: tenant} do - assert {:ok, _} = Connect.lookup_or_start_connection(tenant.external_id) - :timer.sleep(500) - - assert Enum.find(Tenants.list_active_tenants(), &(elem(&1, 0) == tenant.external_id)) - end - test "on database disconnect, returns new connection", %{tenant: tenant} do assert {:ok, old_conn} = Connect.lookup_or_start_connection(tenant.external_id) :timer.sleep(500) diff --git a/test/realtime/tenants/janitor_test.exs b/test/realtime/tenants/janitor_test.exs index 629271b74..e3aea0ecd 100644 --- a/test/realtime/tenants/janitor_test.exs +++ b/test/realtime/tenants/janitor_test.exs @@ -1,16 +1,17 @@ defmodule Realtime.Tenants.JanitorTest do # async: false due to using database process - alias Realtime.Tenants use Realtime.DataCase, async: false + import Mock import ExUnit.CaptureLog alias Realtime.Api.Message alias Realtime.Api.Tenant alias Realtime.Database alias Realtime.Repo - alias Realtime.Tenants.Migrations alias Realtime.Tenants.Janitor + alias Realtime.Tenants.Migrations + alias Realtime.Tenants.Connect setup do dev_tenant = Tenant |> Repo.all() |> hd() @@ -27,13 +28,11 @@ defmodule Realtime.Tenants.JanitorTest do dev_tenant ], fn tenant -> - tenant = Repo.preload(tenant, [:extensions]) - [%{settings: settings} | _] = tenant.extensions - migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings} - Migrations.run_migrations(migrations) + tenant = Repo.preload(tenant, :extensions) + Connect.lookup_or_start_connection(tenant.external_id) + :timer.sleep(250) {:ok, conn} = Database.connect(tenant, "realtime_test", 1) clean_table(conn, "realtime", "messages") - Tenants.track_active_tenant(tenant.external_id) tenant end ) @@ -47,13 +46,15 @@ defmodule Realtime.Tenants.JanitorTest do ) on_exit(fn -> + Enum.each(tenants, &Connect.shutdown(&1.external_id)) + :timer.sleep(10) Application.put_env(:realtime, :janitor_schedule_timer, timer) end) %{tenants: tenants} end - test "cleans messages older than 72 hours from tenants that were active and untracks the user", + test "cleans messages older than 72 hours and creates partitions from tenants that were active and untracks the user", %{ tenants: tenants } do @@ -73,20 +74,22 @@ defmodule Realtime.Tenants.JanitorTest do |> Enum.reject(&(NaiveDateTime.compare(limit, &1.inserted_at) == :gt)) |> MapSet.new() - start_supervised!(Janitor) - Process.sleep(500) - - current = - Enum.map(tenants, fn tenant -> - {:ok, conn} = Database.connect(tenant, "realtime_test", 1) - {:ok, res} = Repo.all(conn, from(m in Message), Message) - res - end) - |> List.flatten() - |> MapSet.new() + with_mock Migrations, create_partitions: fn _ -> :ok end do + start_supervised!(Janitor) + Process.sleep(500) - assert MapSet.difference(current, to_keep) |> MapSet.size() == 0 - assert Tenants.list_active_tenants() == [] + current = + Enum.map(tenants, fn tenant -> + {:ok, conn} = Database.connect(tenant, "realtime_test", 1) + {:ok, res} = Repo.all(conn, from(m in Message), Message) + res + end) + |> List.flatten() + |> MapSet.new() + + assert MapSet.difference(current, to_keep) |> MapSet.size() == 0 + assert_called(Migrations.create_partitions(:_)) + end end test "logs error if fails to connect to tenant" do @@ -109,7 +112,13 @@ defmodule Realtime.Tenants.JanitorTest do ] tenant = tenant_fixture(%{extensions: extensions}) - Tenants.track_active_tenant(tenant.external_id) + # Force add a bad tenant + :ets.insert( + :"syn_registry_by_name_Elixir.Realtime.Tenants.Connect", + {tenant.external_id, :undefined, :undefined, :undefined, :undefined, Node.self()} + ) + + :timer.sleep(250) assert capture_log(fn -> start_supervised!(Janitor) diff --git a/test/support/data_case.ex b/test/support/data_case.ex index 2d0b8a692..ee156aee6 100644 --- a/test/support/data_case.ex +++ b/test/support/data_case.ex @@ -35,8 +35,6 @@ defmodule Realtime.DataCase do Sandbox.mode(Realtime.Repo, {:shared, self()}) end - :ets.match_delete(:active_tenants, :_) - {:ok, conn: Phoenix.ConnTest.build_conn()} end