diff --git a/.github/workflows/staging_linter.yml b/.github/workflows/staging_linter.yml index a44ac79ea..939e7c174 100644 --- a/.github/workflows/staging_linter.yml +++ b/.github/workflows/staging_linter.yml @@ -49,5 +49,7 @@ jobs: mix dialyzer.build - name: Run dialyzer run: mix dialyzer + - name: Start epmd + run: epmd -daemon - name: Run tests - run: mix test --trace + run: mix test --trace --max-cases 1 diff --git a/README.md b/README.md index 2658ae6c6..6dbc29283 100644 --- a/README.md +++ b/README.md @@ -148,6 +148,10 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000 | TENANT_MAX_EVENTS_PER_SECOND | string | The default value of maximum events per second that each tenant can support, used when creating a tenant for the first time. Defaults to '100'. | | TENANT_MAX_JOINS_PER_SECOND | string | The default value of maximum channel joins per second that each tenant can support, used when creating a tenant for the first time. Defaults to '100'. | | SEED_SELF_HOST | boolean | Seeds the system with default tenant | +| RUN_SCHEDULED_AFTER_IN_MS | number | Time in milliseconds to wait after boot before starting the janitor tasks. Defaults to '600000' (10 minutes). | +| RUN_SCHEDULED | boolean | Whether the scheduled janitor tasks should run. Defaults to 'true'. | +| MAX_CHILDREN_SCHEDULED_CLEANUP | number | Maximum number of concurrent tasks working on the janitor cleanup. Defaults to '5'. | +| SCHEDULED_CLEANUP_TASK_TIMEOUT | number | Timeout in milliseconds for each async janitor cleanup task. Defaults to '5000'. | ## WebSocket URL @@ -218,6 +222,7 @@ This is the list of operational codes that can help you understand your deployme | ErrorOnRpcCall | Error when calling another realtime node | | ErrorExecutingTransaction | Error executing a database transaction in tenant database | | SynInitializationError | Our framework to syncronize processes has failed to properly startup a connection to the database | +| JanitorFailedToDeleteOldMessages | Scheduled task for realtime.messages cleanup was unable to run | | UnknownError | An unknown error occurred | ## License diff --git a/config/runtime.exs b/config/runtime.exs index 61bc2fb9c..8c25ad4ae 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -24,6 +24,23 @@ config :realtime, System.get_env("TENANT_MAX_JOINS_PER_SECOND", "100") |> String.to_integer(), rpc_timeout: System.get_env("RPC_TIMEOUT", "30000") |> String.to_integer() +if config_env() == :test do + config :realtime, run_scheduled: false +else + config :realtime, + run_scheduled: System.get_env("RUN_SCHEDULED", "true") == "true", + scheduled_randomize: System.get_env("RUN_SCHEDULED_RANDOMIZE", "true") == "true", + # by default, only start the runner 10 minutes after boot + scheduled_start_after: + System.get_env("RUN_SCHEDULED_AFTER_IN_MS", "600000") |> String.to_integer() +end + +config :realtime, + max_children_scheduled_cleanup: + System.get_env("MAX_CHILDREN_SCHEDULED_CLEANUP", "5") |> String.to_integer(), + scheduled_cleanup_task_timeout: + System.get_env("SCHEDULED_CLEANUP_TASK_TIMEOUT", "5000") |> String.to_integer() + if config_env() == :prod do secret_key_base = System.get_env("SECRET_KEY_BASE") || diff --git a/lib/realtime/api/message.ex b/lib/realtime/api/message.ex index 916614a1f..6cc437d7b 100644 --- a/lib/realtime/api/message.ex +++ b/lib/realtime/api/message.ex @@ -8,19 +8,28 @@ defmodule Realtime.Api.Message do @schema_prefix "realtime" schema "messages" do - field :uuid, :string - field :topic, :string - field :extension, Ecto.Enum, values: [:broadcast, :presence] - field :payload, :map - field :event, :string - field :private, :boolean + field(:uuid, :string) + field(:topic, :string) +
field(:extension, Ecto.Enum, values: [:broadcast, :presence]) + field(:payload, :map) + field(:event, :string) + field(:private, :boolean) timestamps() end def changeset(message, attrs) do message - |> cast(attrs, [:topic, :extension, :payload, :event, :private, :uuid]) + |> cast(attrs, [ + :topic, + :extension, + :payload, + :event, + :private, + :inserted_at, + :updated_at, + :uuid + ]) |> validate_required([:topic, :extension]) |> put_timestamp(:updated_at) |> maybe_put_timestamp(:inserted_at) @@ -31,7 +40,7 @@ defmodule Realtime.Api.Message do end defp maybe_put_timestamp(changeset, field) do - case Map.get(changeset.data, field, nil) do + case Map.get(changeset.changes, field) do nil -> put_timestamp(changeset, field) _ -> changeset end diff --git a/lib/realtime/application.ex b/lib/realtime/application.ex index 0c1d92862..87ecc15cd 100644 --- a/lib/realtime/application.ex +++ b/lib/realtime/application.ex @@ -12,6 +12,12 @@ defmodule Realtime.Application do def start(_type, _args) do primary_config = :logger.get_primary_config() + max_children_scheduled_cleanup = + Application.get_env(:realtime, :max_children_scheduled_cleanup) + + scheduled_cleanup_task_timeout = + Application.get_env(:realtime, :scheduled_cleanup_task_timeout) + # add the region to logs :ok = :logger.set_primary_config( @@ -66,6 +72,11 @@ defmodule Realtime.Application do {Registry, keys: :duplicate, name: Realtime.Registry}, {Registry, keys: :unique, name: Realtime.Registry.Unique}, {Task.Supervisor, name: Realtime.TaskSupervisor}, + {Task.Supervisor, + name: Realtime.Tenants.ScheduledMessageCleanup.TaskSupervisor, + max_children: max_children_scheduled_cleanup, + max_seconds: scheduled_cleanup_task_timeout, + max_restarts: 1}, {PartitionSupervisor, child_spec: DynamicSupervisor, strategy: :one_for_one, @@ -83,7 +94,7 @@ defmodule Realtime.Application do name: Realtime.BroadcastChanges.Handler.DynamicSupervisor}, RealtimeWeb.Endpoint, RealtimeWeb.Presence - ] ++ extensions_supervisors() + ] ++ extensions_supervisors() ++ scheduled_tasks() children = case Replica.replica() do @@ -112,4 +123,10 @@ defmodule Realtime.Application do acc end) end + + defp scheduled_tasks() do + if Application.fetch_env!(:realtime, :run_scheduled), + do: [Realtime.Tenants.ScheduledMessageCleanup], + else: [] + end end diff --git a/lib/realtime/database.ex b/lib/realtime/database.ex index 32b04a365..c4a763a9b 100644 --- a/lib/realtime/database.ex +++ b/lib/realtime/database.ex @@ -309,9 +309,7 @@ defmodule Realtime.Database do username: user, pool_size: pool, queue_target: queue_target, - parameters: [ - application_name: application_name - ], + parameters: [application_name: application_name], socket_options: [addrtype], backoff_type: backoff_type, configure: fn args -> diff --git a/lib/realtime/messages.ex b/lib/realtime/messages.ex new file mode 100644 index 000000000..2c102c8f3 --- /dev/null +++ b/lib/realtime/messages.ex @@ -0,0 +1,18 @@ +defmodule Realtime.Messages do + @moduledoc """ + Handles `realtime.messages` table operations + """ + import Ecto.Query + alias Realtime.Repo + alias Realtime.Api.Message + + @doc """ + Deletes messages older than 72 hours for a given tenant connection + """ + @spec delete_old_messages(pid()) :: {:ok, any()} | {:error, any()} + def delete_old_messages(conn) do + limit = NaiveDateTime.utc_now() |> NaiveDateTime.add(-72, :hour) + query = from m in Message, where: m.inserted_at <= ^limit + Repo.del(conn, query) + end +end diff --git a/lib/realtime/nodes.ex b/lib/realtime/nodes.ex index 
ce9855fcf..3d3c3cff4 100644 --- a/lib/realtime/nodes.ex +++ b/lib/realtime/nodes.ex @@ -5,6 +5,46 @@ defmodule Realtime.Nodes do require Logger alias Realtime.Api.Tenant + @mapping_realtime_region_to_tenant_region_aws %{ + "ap-southeast-1" => [ + "ap-east-1", + "ap-northeast-1", + "ap-northeast-2", + "ap-south-1", + "ap-southeast-1" + ], + "ap-southeast-2" => ["ap-southeast-2"], + "eu-west-2" => [ + "eu-central-1", + "eu-central-2", + "eu-north-1", + "eu-west-1", + "eu-west-2", + "eu-west-3" + ], + "us-east-1" => [ + "ca-central-1", + "sa-east-1", + "us-east-1", + "us-east-2" + ], + "us-west-1" => ["us-west-1", "us-west-2"] + } + + @mapping_realtime_region_to_tenant_region_fly %{ + "iad" => ["ca-central-1", "sa-east-1", "us-east-1"], + "lhr" => ["eu-central-1", "eu-west-1", "eu-west-2", "eu-west-3"], + "sea" => ["us-west-1"], + "syd" => [ + "ap-east-1", + "ap-northeast-1", + "ap-northeast-2", + "ap-south-1", + "ap-southeast-1", + "ap-southeast-2" + ] + } + @doc """ Gets the node to launch the Postgres connection on for a tenant. """ @@ -35,52 +75,24 @@ defmodule Realtime.Nodes do region_mapping(platform, tenant_region) end - defp region_mapping(:aws, tenant_region) do - case tenant_region do - "ap-east-1" -> "ap-southeast-1" - "ap-northeast-1" -> "ap-southeast-1" - "ap-northeast-2" -> "ap-southeast-1" - "ap-south-1" -> "ap-southeast-1" - "ap-southeast-1" -> "ap-southeast-1" - "ap-southeast-2" -> "ap-southeast-2" - "ca-central-1" -> "us-east-1" - "eu-central-1" -> "eu-west-2" - "eu-central-2" -> "eu-west-2" - "eu-north-1" -> "eu-west-2" - "eu-west-1" -> "eu-west-2" - "eu-west-2" -> "eu-west-2" - "eu-west-3" -> "eu-west-2" - "sa-east-1" -> "us-east-1" - "us-east-1" -> "us-east-1" - "us-east-2" -> "us-east-1" - "us-west-1" -> "us-west-1" - "us-west-2" -> "us-west-1" - _ -> nil - end + defp region_mapping(nil, tenant_region), do: tenant_region + + defp region_mapping(platform, tenant_region) do + mappings = + case platform do + :aws -> @mapping_realtime_region_to_tenant_region_aws + :fly -> @mapping_realtime_region_to_tenant_region_fly + _ -> [] + end + + mappings + |> Enum.flat_map(fn {realtime_region, tenant_regions} -> + Enum.map(tenant_regions, fn tenant_region -> {tenant_region, realtime_region} end) + end) + |> Map.new() + |> Map.get(tenant_region) end - defp region_mapping(:fly, tenant_region) do - case tenant_region do - "us-east-1" -> "iad" - "us-west-1" -> "sea" - "sa-east-1" -> "iad" - "ca-central-1" -> "iad" - "ap-southeast-1" -> "syd" - "ap-northeast-1" -> "syd" - "ap-northeast-2" -> "syd" - "ap-southeast-2" -> "syd" - "ap-east-1" -> "syd" - "ap-south-1" -> "syd" - "eu-west-1" -> "lhr" - "eu-west-2" -> "lhr" - "eu-west-3" -> "lhr" - "eu-central-1" -> "lhr" - _ -> nil - end - end - - defp region_mapping(_, tenant_region), do: tenant_region - @doc """ Lists the nodes in a region. Sorts by node name in case the list order is unstable. 
@@ -161,4 +173,21 @@ defmodule Realtime.Nodes do host end end + + @doc """ + Fetches the tenant regions for a given realtime region + """ + @spec region_to_tenant_regions(String.t()) :: list() | nil + def region_to_tenant_regions(region) do + platform = Application.get_env(:realtime, :platform) + + mappings = + case platform do + :aws -> @mapping_realtime_region_to_tenant_region_aws + :fly -> @mapping_realtime_region_to_tenant_region_fly + _ -> %{} + end + + Map.get(mappings, region) + end end diff --git a/lib/realtime/tenants/connect/check_connection.ex b/lib/realtime/tenants/connect/check_connection.ex index 1c07472fb..1b6bcb5d8 100644 --- a/lib/realtime/tenants/connect/check_connection.ex +++ b/lib/realtime/tenants/connect/check_connection.ex @@ -7,7 +7,6 @@ defmodule Realtime.Tenants.Connect.CheckConnection do @application_name "realtime_connect" @behaviour Realtime.Tenants.Connect.Piper - @impl true def run(acc) do %{tenant_id: tenant_id} = acc diff --git a/lib/realtime/tenants/scheduled_message_cleanup.ex b/lib/realtime/tenants/scheduled_message_cleanup.ex new file mode 100644 index 000000000..dcfabbcbe --- /dev/null +++ b/lib/realtime/tenants/scheduled_message_cleanup.ex @@ -0,0 +1,151 @@ +defmodule Realtime.Tenants.ScheduledMessageCleanup do + @moduledoc """ + Scheduled cleanup of old `realtime.messages` rows for tenants. + """ + + use GenServer + require Logger + + import Ecto.Query + import Realtime.Helpers, only: [log_error: 2] + + alias Realtime.Api.Tenant + alias Realtime.Database + alias Realtime.Messages + alias Realtime.Nodes + alias Realtime.Repo + + @type t :: %__MODULE__{ + timer: pos_integer() | nil, + region: String.t() | nil, + chunks: pos_integer() | nil, + start_after: pos_integer() | nil, + randomize: boolean() | nil, + tasks: map() + } + + defstruct timer: nil, + region: nil, + chunks: nil, + start_after: nil, + randomize: nil, + tasks: %{} + + def start_link(_args) do + timer = Application.get_env(:realtime, :schedule_clean, :timer.hours(4)) + start_after = Application.get_env(:realtime, :scheduled_start_after, 0) + region = Application.get_env(:realtime, :region) + chunks = Application.get_env(:realtime, :chunks, 10) + randomize = Application.get_env(:realtime, :scheduled_randomize, true) + + state = %__MODULE__{ + timer: timer, + region: region, + chunks: chunks, + start_after: start_after, + randomize: randomize + } + + GenServer.start_link(__MODULE__, state, name: __MODULE__) + end + + @impl true + def init(%__MODULE__{start_after: start_after} = state) do + timer = timer(state) + start_after + Process.send_after(self(), :delete_old_messages, timer) + Logger.info("ScheduledMessageCleanup started") + {:ok, state} + end + + @impl true + def handle_info(:delete_old_messages, state) do + Logger.info("ScheduledMessageCleanup running cleanup") + %{region: region, chunks: chunks, tasks: tasks} = state + regions = Nodes.region_to_tenant_regions(region) + region_nodes = Nodes.region_nodes(region) + + query = + from(t in Tenant, + join: e in assoc(t, :extensions), + where: t.notify_private_alpha == true, + preload: :extensions + ) + + new_tasks = + query + |> where_region(regions) + |> Repo.all() + |> Stream.filter(&node_responsible_for_cleanup?(&1, region_nodes)) + |> Stream.chunk_every(chunks) + |> Enum.map(fn chunks -> + task = + Task.Supervisor.async_nolink( + __MODULE__.TaskSupervisor, + fn -> run_cleanup_on_tenants(chunks) end, + ordered: false + ) + + {task.ref, Enum.map(chunks, & &1.external_id)} + end) + |> Map.new() + + Process.send_after(self(), :delete_old_messages, timer(state)) + +
{:noreply, %{state | tasks: Map.merge(tasks, new_tasks)}} + end + + def handle_info({:DOWN, ref, _, _, :normal}, state) do + %{tasks: tasks} = state + {_, tasks} = Map.pop(tasks, ref) + {:noreply, %{state | tasks: tasks}} + end + + def handle_info({:DOWN, ref, _, _, :killed}, state) do + %{tasks: tasks} = state + {tenants, tasks} = Map.pop(tasks, ref) + + log_error( + "JanitorFailedToDeleteOldMessages", + "Scheduled cleanup failed for tenants: #{inspect(tenants)}" + ) + + {:noreply, %{state | tasks: tasks}} + end + + def handle_info(_, state) do + {:noreply, state} + end + + defp where_region(query, nil), do: query + + defp where_region(query, regions) do + where(query, [t, e], fragment("? -> 'region' in (?)", e.settings, splice(^regions))) + end + + defp timer(%{timer: timer, randomize: true}), do: timer + :timer.minutes(Enum.random(1..59)) + defp timer(%{timer: timer}), do: timer + + defp node_responsible_for_cleanup?(%Tenant{external_id: external_id}, region_nodes) do + case Node.self() do + :nonode@nohost -> + true + + _ -> + index = :erlang.phash2(external_id, length(region_nodes)) + Enum.at(region_nodes, index) == Node.self() + end + end + + defp run_cleanup_on_tenants(tenants), do: Enum.map(tenants, &run_cleanup_on_tenant/1) + + defp run_cleanup_on_tenant(tenant) do + Logger.metadata(project: tenant.external_id, external_id: tenant.external_id) + Logger.info("ScheduledMessageCleanup cleaned realtime.messages") + + with {:ok, conn} <- Database.connect(tenant, "realtime_janitor", 1), + {:ok, _} <- Messages.delete_old_messages(conn) do + Logger.info("ScheduledMessageCleanup finished") + :ok + end + end +end diff --git a/mix.exs b/mix.exs index dda42aae1..c67ac3ed8 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.33.10", + version: "2.33.11", elixir: "~> 1.16.0", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/e2e/tests.ts b/test/e2e/tests.ts index 54bf69841..e286f2bad 100644 --- a/test/e2e/tests.ts +++ b/test/e2e/tests.ts @@ -303,39 +303,25 @@ describe("broadcast changes", () => { "filipe@supabase.io", "test_test" ); - console.log(accessToken); await supabase.realtime.setAuth(accessToken); const channel = supabase .channel(`event:${id}`, { config: { ...config, private: true } }) - .on("broadcast", { event: "INSERT" }, (res) => { - console.log(res); - insertResult = res; - }) - .on("broadcast", { event: "DELETE" }, (res) => { - console.log(res); - deleteResult = res; - }) - .on("broadcast", { event: "UPDATE" }, (res) => { - console.log(res); - updateResult = res; - }) - .subscribe(async (status, err) => { - console.log({ status, err }); - + .on("broadcast", { event: "INSERT" }, (res) => (insertResult = res)) + .on("broadcast", { event: "DELETE" }, (res) => (deleteResult = res)) + .on("broadcast", { event: "UPDATE" }, (res) => (updateResult = res)) + .subscribe(async (status) => { if (status == "SUBSCRIBED") { - await sleep(2); - - console.log( - await supabase.from(table).insert({ value: originalValue, id }) - ); - console.log( - await supabase - .from(table) - .update({ value: updatedValue }) - .eq("id", id) - ); - console.log(await supabase.from(table).delete().eq("id", id)); + await sleep(1); + + await supabase.from(table).insert({ value: originalValue, id }); + + await supabase + .from(table) + .update({ value: updatedValue }) + .eq("id", id); + + await supabase.from(table).delete().eq("id", id); } }); await sleep(5); diff --git 
a/test/realtime/messages_test.exs b/test/realtime/messages_test.exs new file mode 100644 index 000000000..08bee8e97 --- /dev/null +++ b/test/realtime/messages_test.exs @@ -0,0 +1,43 @@ +defmodule Realtime.MessagesTest do + use Realtime.DataCase, async: true + + alias Realtime.Api.Message + alias Realtime.Database + alias Realtime.Messages + alias Realtime.Repo + alias Realtime.Tenants.Migrations + + setup do + tenant = tenant_fixture() + [%{settings: settings} | _] = tenant.extensions + migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings} + Migrations.run_migrations(migrations) + + {:ok, conn} = Database.connect(tenant, "realtime_test", 1) + clean_table(conn, "realtime", "messages") + + %{conn: conn, tenant: tenant} + end + + test "delete_old_messages/1 deletes messages older than 72 hours", %{conn: conn, tenant: tenant} do + utc_now = NaiveDateTime.utc_now() + limit = NaiveDateTime.add(utc_now, -72, :hour) + + messages = + for days <- -5..0 do + inserted_at = NaiveDateTime.add(utc_now, days, :day) + message_fixture(tenant, %{inserted_at: inserted_at}) + end + + to_keep = + Enum.reject( + messages, + &(NaiveDateTime.compare(limit, &1.inserted_at) == :gt) + ) + + Messages.delete_old_messages(conn) + {:ok, current} = Repo.all(conn, from(m in Message), Message) + + assert current == to_keep + end +end diff --git a/test/realtime/tenants/scheduled_message_cleanup_test.exs b/test/realtime/tenants/scheduled_message_cleanup_test.exs new file mode 100644 index 000000000..719c7f3ff --- /dev/null +++ b/test/realtime/tenants/scheduled_message_cleanup_test.exs @@ -0,0 +1,134 @@ +defmodule Realtime.Tenants.ScheduledMessageCleanupTest do + # async: false due to using database process + use Realtime.DataCase, async: false + + import ExUnit.CaptureLog + + alias Realtime.Api.Message + alias Realtime.Api.Tenant + alias Realtime.Database + alias Realtime.Repo + alias Realtime.Tenants.Migrations + alias Realtime.Tenants.ScheduledMessageCleanup + + setup do + dev_tenant = Tenant |> Repo.all() |> hd() + timer = Application.get_env(:realtime, :schedule_clean) + platform = Application.get_env(:realtime, :platform) + + Application.put_env(:realtime, :schedule_clean, 200) + Application.put_env(:realtime, :platform, :aws) + Application.put_env(:realtime, :scheduled_randomize, false) + Application.put_env(:realtime, :max_children_scheduled_cleanup, 1) + Application.put_env(:realtime, :chunks, 2) + + tenants = + Enum.map( + [ + tenant_fixture(notify_private_alpha: true), + dev_tenant + ], + fn tenant -> + tenant = Repo.preload(tenant, [:extensions]) + [%{settings: settings} | _] = tenant.extensions + migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings} + Migrations.run_migrations(migrations) + {:ok, conn} = Database.connect(tenant, "realtime_test", 1) + clean_table(conn, "realtime", "messages") + tenant + end + ) + + on_exit(fn -> + Application.put_env(:realtime, :schedule_clean, timer) + Application.put_env(:realtime, :platform, platform) + end) + + %{tenants: tenants} + end + + describe "single node setup" do + test "cleans messages of multiple tenants", %{tenants: tenants} do + run_test(tenants) + end + end + + describe "multi node setup" do + setup do + region = Application.get_env(:realtime, :region) + Application.put_env(:realtime, :region, "us-east-1") + + {:ok, _} = :net_kernel.start([:"primary@127.0.0.1"]) + :syn.join(RegionNodes, "us-east-1", self(), node: node()) + + on_exit(fn -> + :net_kernel.stop() + :syn.leave(RegionNodes, "us-east-1", 
self()) + Application.put_env(:realtime, :region, region) + end) + end + + test "cleans messages of multiple tenants", %{tenants: tenants} do + run_test(tenants) + end + end + + test "logs error if fails to connect to tenant" do + extensions = [ + %{ + "type" => "postgres_cdc_rls", + "settings" => %{ + "db_host" => "localhost", + "db_name" => "postgres", + "db_user" => "supabase_admin", + "db_password" => "bad", + "db_port" => "5433", + "poll_interval" => 100, + "poll_max_changes" => 100, + "poll_max_record_bytes" => 1_048_576, + "region" => "us-east-1", + "ssl_enforced" => false + } + } + ] + + tenant_fixture(%{"extensions" => extensions, notify_private_alpha: true}) + + assert capture_log(fn -> + start_supervised!(ScheduledMessageCleanup) + Process.sleep(1000) + end) =~ "JanitorFailedToDeleteOldMessages" + end + + defp run_test(tenants) do + utc_now = NaiveDateTime.utc_now() + limit = NaiveDateTime.add(utc_now, -72, :hour) + + messages = + for days <- -5..0 do + inserted_at = NaiveDateTime.add(utc_now, days, :day) + Enum.map(tenants, &message_fixture(&1, %{inserted_at: inserted_at})) + end + |> List.flatten() + |> MapSet.new() + + to_keep = + messages + |> Enum.reject(&(NaiveDateTime.compare(limit, &1.inserted_at) == :gt)) + |> MapSet.new() + + start_supervised!(ScheduledMessageCleanup) + Process.sleep(500) + + current = + Enum.map(tenants, fn tenant -> + {:ok, conn} = Database.connect(tenant, "realtime_test", 1) + {:ok, res} = Repo.all(conn, from(m in Message), Message) + res + end) + |> List.flatten() + |> MapSet.new() + + assert MapSet.difference(current, to_keep) |> MapSet.size() == 0 + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index c686482a5..0a51ea1ef 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,2 +1,2 @@ -ExUnit.start(exclude: [:failing]) +ExUnit.start(exclude: [:failing], max_cases: 1) Ecto.Adapters.SQL.Sandbox.mode(Realtime.Repo, :manual)
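Illustrative sketch, not part of the patch: exercising the new `Realtime.Messages.delete_old_messages/1` against a single tenant, using only calls that already appear in this diff; the `tenant` variable is assumed to be a loaded `%Realtime.Api.Tenant{}` with its extensions.

# Illustrative only: assumes `tenant` is a %Realtime.Api.Tenant{} with extensions preloaded.
alias Realtime.{Database, Messages}

# Single-connection pool, reusing the application name the janitor uses.
{:ok, conn} = Database.connect(tenant, "realtime_janitor", 1)

# Deletes realtime.messages rows whose inserted_at is older than 72 hours.
{:ok, _} = Messages.delete_old_messages(conn)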
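A small sketch of the tenant-to-node assignment used by `node_responsible_for_cleanup?/2`: each tenant `external_id` hashes onto exactly one node of the region, so only that node runs the cleanup for that tenant. The node names below are hypothetical.

# Illustrative only: node names are hypothetical; mirrors node_responsible_for_cleanup?/2.
region_nodes = [:"realtime@10.0.0.1", :"realtime@10.0.0.2"]

owner_node = fn external_id ->
  index = :erlang.phash2(external_id, length(region_nodes))
  Enum.at(region_nodes, index)
end

# Deterministic: the same external_id always maps to the same node,
# so at most one node in a region cleans up a given tenant.
owner_node.("tenant_abc")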