Skip to content

Commit

Permalink
fix: Delete realtime.messages entries older than 72h on connect (#1164)
Browse files Browse the repository at this point in the history
Starts a cleanup task on each node to delete old entries from `realtime.messages`. The task:
* Checks which tenants need cleanup in a given region
* Checks whether the tenant should be cleaned by a given node
* Chunks the groups of tenants to avoid overloading Realtime.Repo and the number of connections from a given node
* Starts connections for each tenant
* Runs a query to delete messages older than 72h

Two setups are needed for tests:
* Self-hosted deployments are not connected as a cluster and have no region, so we need a default behaviour for "regionless" setups
* For Node mode, we need to emulate the region and the connected node

This impacts the Cachex tests, which is why `max_cases` had to be set to 1: it prevents tests from other modules from running in parallel and interfering.

Finally, we also need to configure whether or not the application starts the scheduled tasks. This helps with tests and also lets us stop these processes in production if needed.
  • Loading branch information
filipecabaco authored Oct 29, 2024
1 parent ac23ca5 commit 6a1ae6b
Show file tree
Hide file tree
Showing 15 changed files with 496 additions and 88 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/staging_linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,7 @@ jobs:
mix dialyzer.build
- name: Run dialyzer
run: mix dialyzer
- name: Start epmd
run: epmd -daemon
- name: Run tests
run: mix test --trace
run: mix test --trace --max-cases 1
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000
| TENANT_MAX_EVENTS_PER_SECOND | string | The default value of maximum events per second that each tenant can support, used when creating a tenant for the first time. Defaults to '100'. |
| TENANT_MAX_JOINS_PER_SECOND | string | The default value of maximum channel joins per second that each tenant can support, used when creating a tenant for the first time. Defaults to '100'. |
| SEED_SELF_HOST | boolean | Seeds the system with default tenant |
| RUN_SCHEDULED_AFTER_IN_MS | number | Delay in milliseconds after boot before the scheduled (janitor) tasks start. Defaults to '600000' (10 minutes). |
| RUN_SCHEDULED | boolean | Whether the scheduled (janitor) tasks should run. Defaults to 'true'. |
| MAX_CHILDREN_SCHEDULED_CLEANUP | number | Maximum number of concurrent tasks working on the scheduled message cleanup. Defaults to '5'. |
| SCHEDULED_CLEANUP_TASK_TIMEOUT | number | Timeout for each async task of the scheduled message cleanup. Defaults to '5000'. |

## WebSocket URL

Expand Down Expand Up @@ -218,6 +222,7 @@ This is the list of operational codes that can help you understand your deployme
| ErrorOnRpcCall | Error when calling another realtime node |
| ErrorExecutingTransaction | Error executing a database transaction in tenant database |
| SynInitializationError | Our framework to synchronize processes has failed to properly start up a connection to the database |
| JanitorFailedToDeleteOldMessages | Scheduled task for realtime.messages cleanup was unable to run |
| UnknownError | An unknown error occurred |

## License
Expand Down
17 changes: 17 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,23 @@ config :realtime,
System.get_env("TENANT_MAX_JOINS_PER_SECOND", "100") |> String.to_integer(),
rpc_timeout: System.get_env("RPC_TIMEOUT", "30000") |> String.to_integer()

# Scheduled tasks are never started in the test environment; everywhere else
# they are controlled through environment variables.
if config_env() == :test do
  config :realtime, run_scheduled: false
else
  config :realtime,
    # switch that decides whether the scheduled tasks are started at boot
    run_scheduled: System.get_env("RUN_SCHEDULED", "true") == "true",
    # Read from its own variable: previously this reused RUN_SCHEDULED, so the
    # flag could never be set independently of `run_scheduled`. The default of
    # "true" keeps the value unchanged whenever the scheduled tasks are enabled.
    # NOTE(review): the consumer of `scheduled_randomize` is not visible here —
    # confirm the intended semantics and document SCHEDULED_RANDOMIZE.
    scheduled_randomize: System.get_env("SCHEDULED_RANDOMIZE", "true") == "true",
    # defaults the runner to only start after 10 minutes
    scheduled_start_after:
      System.get_env("RUN_SCHEDULED_AFTER_IN_MS", "600000") |> String.to_integer()
end

# Limits for the scheduled cleanup. Declared outside the environment check
# above, so they are set in every environment (tests included).
config :realtime,
  max_children_scheduled_cleanup:
    String.to_integer(System.get_env("MAX_CHILDREN_SCHEDULED_CLEANUP", "5")),
  scheduled_cleanup_task_timeout:
    String.to_integer(System.get_env("SCHEDULED_CLEANUP_TASK_TIMEOUT", "5000"))

if config_env() == :prod do
secret_key_base =
System.get_env("SECRET_KEY_BASE") ||
Expand Down
25 changes: 17 additions & 8 deletions lib/realtime/api/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,28 @@ defmodule Realtime.Api.Message do
@schema_prefix "realtime"

schema "messages" do
field :uuid, :string
field :topic, :string
field :extension, Ecto.Enum, values: [:broadcast, :presence]
field :payload, :map
field :event, :string
field :private, :boolean
field(:uuid, :string)
field(:topic, :string)
field(:extension, Ecto.Enum, values: [:broadcast, :presence])
field(:payload, :map)
field(:event, :string)
field(:private, :boolean)

timestamps()
end

def changeset(message, attrs) do
message
|> cast(attrs, [:topic, :extension, :payload, :event, :private, :uuid])
|> cast(attrs, [
:topic,
:extension,
:payload,
:event,
:private,
:inserted_at,
:updated_at,
:uuid
])
|> validate_required([:topic, :extension])
|> put_timestamp(:updated_at)
|> maybe_put_timestamp(:inserted_at)
Expand All @@ -31,7 +40,7 @@ defmodule Realtime.Api.Message do
end

defp maybe_put_timestamp(changeset, field) do
case Map.get(changeset.data, field, nil) do
case Map.get(changeset.changes, field) do
nil -> put_timestamp(changeset, field)
_ -> changeset
end
Expand Down
19 changes: 18 additions & 1 deletion lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ defmodule Realtime.Application do
def start(_type, _args) do
primary_config = :logger.get_primary_config()

max_children_scheduled_cleanup =
Application.get_env(:realtime, :max_children_scheduled_cleanup)

scheduled_cleanup_task_timeout =
Application.get_env(:realtime, :scheduled_cleanup_task_timeout)

# add the region to logs
:ok =
:logger.set_primary_config(
Expand Down Expand Up @@ -66,6 +72,11 @@ defmodule Realtime.Application do
{Registry, keys: :duplicate, name: Realtime.Registry},
{Registry, keys: :unique, name: Realtime.Registry.Unique},
{Task.Supervisor, name: Realtime.TaskSupervisor},
{Task.Supervisor,
name: Realtime.Tenants.ScheduledMessageCleanup.TaskSupervisor,
max_children: max_children_scheduled_cleanup,
max_seconds: scheduled_cleanup_task_timeout,
max_restarts: 1},
{PartitionSupervisor,
child_spec: DynamicSupervisor,
strategy: :one_for_one,
Expand All @@ -83,7 +94,7 @@ defmodule Realtime.Application do
name: Realtime.BroadcastChanges.Handler.DynamicSupervisor},
RealtimeWeb.Endpoint,
RealtimeWeb.Presence
] ++ extensions_supervisors()
] ++ extensions_supervisors() ++ scheduled_tasks()

children =
case Replica.replica() do
Expand Down Expand Up @@ -112,4 +123,10 @@ defmodule Realtime.Application do
acc
end)
end

# Children to append to the supervision tree for scheduled jobs.
# Returns an empty list when `:run_scheduled` is disabled; raises if the
# `:run_scheduled` key is not configured at all.
defp scheduled_tasks do
  run_scheduled? = Application.fetch_env!(:realtime, :run_scheduled)

  if run_scheduled? do
    [Realtime.Tenants.ScheduledMessageCleanup]
  else
    []
  end
end
end
4 changes: 1 addition & 3 deletions lib/realtime/database.ex
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,7 @@ defmodule Realtime.Database do
username: user,
pool_size: pool,
queue_target: queue_target,
parameters: [
application_name: application_name
],
parameters: [application_name: application_name],
socket_options: [addrtype],
backoff_type: backoff_type,
configure: fn args ->
Expand Down
18 changes: 18 additions & 0 deletions lib/realtime/messages.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Realtime.Messages do
  @moduledoc """
  Operations on the `realtime.messages` table.
  """
  import Ecto.Query

  alias Realtime.Api.Message
  alias Realtime.Repo

  @doc """
  Removes, through the given tenant connection, every message whose
  `inserted_at` is 72 hours old or older.

  The result of `Realtime.Repo.del/2` is returned untouched.
  """
  @spec delete_old_messages(pid()) :: {:ok, any()} | {:error, any()}
  def delete_old_messages(conn) do
    # Anything inserted at or before this instant is considered expired.
    cutoff = NaiveDateTime.add(NaiveDateTime.utc_now(), -72, :hour)
    expired = where(Message, [m], m.inserted_at <= ^cutoff)

    Repo.del(conn, expired)
  end
end
117 changes: 73 additions & 44 deletions lib/realtime/nodes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,46 @@ defmodule Realtime.Nodes do
require Logger
alias Realtime.Api.Tenant

# AWS: realtime region (key) => tenant regions it maps to (value).
@mapping_realtime_region_to_tenant_region_aws %{
  "ap-southeast-1" => ~w(ap-east-1 ap-northeast-1 ap-northeast-2 ap-south-1 ap-southeast-1),
  "ap-southeast-2" => ~w(ap-southeast-2),
  "eu-west-2" => ~w(eu-central-1 eu-central-2 eu-north-1 eu-west-1 eu-west-2 eu-west-3),
  "us-east-1" => ~w(ca-central-1 sa-east-1 us-east-1 us-east-2),
  "us-west-1" => ~w(us-west-1 us-west-2)
}

# Fly: realtime region (key) => tenant regions it maps to (value).
@mapping_realtime_region_to_tenant_region_fly %{
  "iad" => ~w(ca-central-1 sa-east-1 us-east-1),
  "lhr" => ~w(eu-central-1 eu-west-1 eu-west-2 eu-west-3),
  "sea" => ~w(us-west-1),
  "syd" =>
    ~w(ap-east-1 ap-northeast-1 ap-northeast-2 ap-south-1 ap-southeast-1 ap-southeast-2)
}

@doc """
Gets the node to launch the Postgres connection on for a tenant.
"""
Expand Down Expand Up @@ -35,52 +75,24 @@ defmodule Realtime.Nodes do
region_mapping(platform, tenant_region)
end

defp region_mapping(:aws, tenant_region) do
case tenant_region do
"ap-east-1" -> "ap-southeast-1"
"ap-northeast-1" -> "ap-southeast-1"
"ap-northeast-2" -> "ap-southeast-1"
"ap-south-1" -> "ap-southeast-1"
"ap-southeast-1" -> "ap-southeast-1"
"ap-southeast-2" -> "ap-southeast-2"
"ca-central-1" -> "us-east-1"
"eu-central-1" -> "eu-west-2"
"eu-central-2" -> "eu-west-2"
"eu-north-1" -> "eu-west-2"
"eu-west-1" -> "eu-west-2"
"eu-west-2" -> "eu-west-2"
"eu-west-3" -> "eu-west-2"
"sa-east-1" -> "us-east-1"
"us-east-1" -> "us-east-1"
"us-east-2" -> "us-east-1"
"us-west-1" -> "us-west-1"
"us-west-2" -> "us-west-1"
_ -> nil
end
# With no platform configured there is no mapping to apply, so the tenant
# region is used as-is.
defp region_mapping(nil, tenant_region), do: tenant_region

# Resolves the realtime region whose tenant-region list contains
# `tenant_region`. Returns `nil` when the platform is neither `:aws` nor
# `:fly`, or when no list contains the tenant region.
defp region_mapping(platform, tenant_region) do
  mappings =
    case platform do
      :aws -> @mapping_realtime_region_to_tenant_region_aws
      :fly -> @mapping_realtime_region_to_tenant_region_fly
      # empty map (not a list) so the fallback has the same type as the mappings
      _ -> %{}
    end

  # Scan the mapping directly instead of building a full inverted map on every
  # call; each tenant region appears under a single realtime region, so the
  # first match is the only match.
  Enum.find_value(mappings, fn {realtime_region, tenant_regions} ->
    if tenant_region in tenant_regions, do: realtime_region
  end)
end

defp region_mapping(:fly, tenant_region) do
case tenant_region do
"us-east-1" -> "iad"
"us-west-1" -> "sea"
"sa-east-1" -> "iad"
"ca-central-1" -> "iad"
"ap-southeast-1" -> "syd"
"ap-northeast-1" -> "syd"
"ap-northeast-2" -> "syd"
"ap-southeast-2" -> "syd"
"ap-east-1" -> "syd"
"ap-south-1" -> "syd"
"eu-west-1" -> "lhr"
"eu-west-2" -> "lhr"
"eu-west-3" -> "lhr"
"eu-central-1" -> "lhr"
_ -> nil
end
end

defp region_mapping(_, tenant_region), do: tenant_region

@doc """
Lists the nodes in a region. Sorts by node name in case the list order
is unstable.
Expand Down Expand Up @@ -161,4 +173,21 @@ defmodule Realtime.Nodes do
host
end
end

@doc """
Returns the tenant regions mapped to the given realtime region.

The mapping is chosen from the `:platform` configured for `:realtime`.
Returns `nil` when the platform is neither `:aws` nor `:fly`, or when the
region has no entry in the selected mapping.
"""
@spec region_to_tenant_regions(String.t()) :: list() | nil
def region_to_tenant_regions(region) do
  case Application.get_env(:realtime, :platform) do
    :aws -> Map.get(@mapping_realtime_region_to_tenant_region_aws, region)
    :fly -> Map.get(@mapping_realtime_region_to_tenant_region_fly, region)
    _ -> nil
  end
end
end
1 change: 0 additions & 1 deletion lib/realtime/tenants/connect/check_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ defmodule Realtime.Tenants.Connect.CheckConnection do

@application_name "realtime_connect"
@behaviour Realtime.Tenants.Connect.Piper

@impl true
def run(acc) do
%{tenant_id: tenant_id} = acc
Expand Down
Loading

0 comments on commit 6a1ae6b

Please sign in to comment.