Skip to content

Commit

Permalink
fix: UUID auto generate; Error handling & e2e tests (#1172)
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco authored Oct 19, 2024
1 parent 60a0999 commit ded1651
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 23 deletions.
4 changes: 2 additions & 2 deletions lib/realtime/api/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Realtime.Api.Message do
@schema_prefix "realtime"

schema "messages" do
field :uuid, :string, default: Ecto.UUID.generate()
field :uuid, :string
field :topic, :string
field :extension, Ecto.Enum, values: [:broadcast, :presence]
field :payload, :map
Expand All @@ -20,7 +20,7 @@ defmodule Realtime.Api.Message do

def changeset(message, attrs) do
message
|> cast(attrs, [:topic, :extension, :payload, :event, :private])
|> cast(attrs, [:topic, :extension, :payload, :event, :private, :uuid])
|> validate_required([:topic, :extension])
|> put_timestamp(:updated_at)
|> maybe_put_timestamp(:inserted_at)
Expand Down
16 changes: 16 additions & 0 deletions lib/realtime/broadcast_changes/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,14 @@ defmodule Realtime.BroadcastChanges.Handler do
relation = %{name: name, columns: columns, namespace: namespace}
relations = Map.put(relations, id, relation)
{:noreply, %{state | relations: relations}}
rescue
e ->
log_error("UnableToBroadcastChanges", e)
{:noreply, state}
catch
e ->
log_error("UnableToBroadcastChanges", e)
{:noreply, state}
end

def handle_info(%Decoder.Messages.Insert{} = msg, state) do
Expand Down Expand Up @@ -302,6 +310,14 @@ defmodule Realtime.BroadcastChanges.Handler do
log_error("UnknownBroadcastChangesRelation", "Relation ID not found: #{relation_id}")
{:noreply, state}
end
rescue
e ->
log_error("UnableToBroadcastChanges", e)
{:noreply, state}
catch
e ->
log_error("UnableToBroadcastChanges", e)
{:noreply, state}
end

def handle_info(:shutdown, state) do
Expand Down
6 changes: 4 additions & 2 deletions lib/realtime/tenants/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ defmodule Realtime.Tenants.Migrations do
LoggedMessagesTable,
FilterDeletePostgresChanges,
AddPayloadToMessages,
ChangeMessagesIdType
ChangeMessagesIdType,
UuidAutoGeneration
}

@migrations [
Expand Down Expand Up @@ -113,7 +114,8 @@ defmodule Realtime.Tenants.Migrations do
{20_240_805_133_720, LoggedMessagesTable},
{20_240_827_160_934, FilterDeletePostgresChanges},
{20_240_919_163_303, AddPayloadToMessages},
{20_240_919_163_305, ChangeMessagesIdType}
{20_240_919_163_305, ChangeMessagesIdType},
{20_241_019_105_805, UuidAutoGeneration}
]
defstruct [:tenant_external_id, :settings]
@spec run_migrations(map()) :: :ok | {:error, any()}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
defmodule Realtime.Tenants.Migrations.UuidAutoGeneration do
@moduledoc false
use Ecto.Migration

def change do
alter table(:messages) do
modify :uuid, :uuid, null: false, default: fragment("gen_random_uuid()")
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.33.6",
version: "2.33.7",
elixir: "~> 1.16.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
9 changes: 8 additions & 1 deletion test/e2e/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ CREATE TABLE public.authorization (
);

CREATE TABLE public.broadcast_changes (
id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
id text PRIMARY KEY,
value text NOT NULL
);

ALTER TABLE public.pg_changes ENABLE ROW LEVEL SECURITY;

ALTER TABLE public.authorization ENABLE ROW LEVEL SECURITY;

ALTER TABLE public.broadcast_changes ENABLE ROW LEVEL SECURITY;

ALTER PUBLICATION supabase_realtime
ADD TABLE public.pg_changes;

Expand All @@ -56,6 +58,11 @@ CREATE POLICY "allow authenticated users all access" ON "public"."pg_changes" AS
FOR ALL TO authenticated
USING (TRUE);

CREATE POLICY "authenticated have full access to read on broadcast_changes" ON "public"."broadcast_changes" AS PERMISSIVE
FOR ALL TO authenticated
USING (TRUE);


CREATE OR REPLACE FUNCTION broadcast_changes_for_table_trigger ()
RETURNS TRIGGER
AS $$
Expand Down
55 changes: 38 additions & 17 deletions test/e2e/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,11 @@ describe("broadcast extension", () => {
describe("postgres changes extension", () => {
it("user is able to receive INSERT only events from a subscribed table with filter applied", async () => {
let supabase = await createClient(url, token, { realtime });
let accessToken = await signInUser(supabase, "[email protected]", "test_test");
let accessToken = await signInUser(
supabase,
"[email protected]",
"test_test"
);
await supabase.realtime.setAuth(accessToken);

let result: Array<any> = [];
Expand Down Expand Up @@ -155,7 +159,11 @@ describe("postgres changes extension", () => {

it("user is able to receive UPDATE only events from a subscribed table with filter applied", async () => {
let supabase = await createClient(url, token, { realtime });
let accessToken = await signInUser(supabase, "[email protected]", "test_test");
let accessToken = await signInUser(
supabase,
"[email protected]",
"test_test"
);
await supabase.realtime.setAuth(accessToken);

let result: Array<any> = [];
Expand Down Expand Up @@ -194,7 +202,11 @@ describe("postgres changes extension", () => {

it("user is able to receive DELETE only events from a subscribed table with filter applied", async () => {
let supabase = await createClient(url, token, { realtime });
let accessToken = await signInUser(supabase, "[email protected]", "test_test");
let accessToken = await signInUser(
supabase,
"[email protected]",
"test_test"
);
await supabase.realtime.setAuth(accessToken);

let result: Array<any> = [];
Expand Down Expand Up @@ -258,7 +270,11 @@ describe("authorization check", () => {

it("user using private channel can connect if they have enough permissions", async () => {
let supabase = await createClient(url, token, { realtime });
let accessToken = await signInUser(supabase, "[email protected]", "test_test");
let accessToken = await signInUser(
supabase,
"[email protected]",
"test_test"
);
await supabase.realtime.setAuth(accessToken);

const channel = supabase
Expand All @@ -275,11 +291,16 @@ describe("authorization check", () => {

describe("broadcast changes", () => {
const table = "broadcast_changes";
const id = 1;
const id = crypto.randomUUID();

it("authenticated user receives insert broadcast change from a specific topic based on id", async () => {
let supabase = await createClient(url, token, { realtime });
let accessToken = await signInUser(supabase, "[email protected]", "test_test");
let accessToken = await signInUser(
supabase,
"[email protected]",
"test_test"
);
console.log(accessToken);
await supabase.realtime.setAuth(accessToken);

let insertResult: any, updateResult: any, deleteResult: any;
Expand All @@ -288,38 +309,38 @@ describe("broadcast changes", () => {
.on("broadcast", { event: "INSERT" }, (res) => (insertResult = res))
.on("broadcast", { event: "DELETE" }, (res) => (deleteResult = res))
.on("broadcast", { event: "UPDATE" }, (res) => (updateResult = res))
.subscribe();
await sleep(2);
.subscribe((status, err) => console.log({ status, err }));
await sleep(1);
const originalValue = crypto.randomUUID();
const updatedValue = crypto.randomUUID();

await supabase.from(table).insert({ value: originalValue, id: 1 });
await supabase.from(table).insert({ value: originalValue, id });
await supabase.from(table).update({ value: updatedValue }).eq("id", id);
await supabase.from(table).delete().eq("id", id);

await supabase.auth.signOut();
await stopClient(supabase, [channel]);

assertEquals(insertResult.payload.record.id, 1);
await sleep(1);
assertEquals(insertResult.payload.record.id, id);
assertEquals(insertResult.payload.record.value, originalValue);
assertEquals(insertResult.payload.old_record, null);
assertEquals(insertResult.payload.operation, "INSERT");
assertEquals(insertResult.payload.schema, "public");
assertEquals(insertResult.payload.table, "broadcast_changes");

assertEquals(updateResult.payload.record.id, 1);
assertEquals(updateResult.payload.record.id, id);
assertEquals(updateResult.payload.record.value, updatedValue);
assertEquals(updateResult.payload.old_record.id, 1);
assertEquals(updateResult.payload.old_record.id, id);
assertEquals(updateResult.payload.old_record.value, originalValue);
assertEquals(updateResult.payload.operation, "UPDATE");
assertEquals(updateResult.payload.schema, "public");
assertEquals(updateResult.payload.table, "broadcast_changes");

assertEquals(deleteResult.payload.record, null);
assertEquals(deleteResult.payload.old_record.id, 1);
assertEquals(deleteResult.payload.old_record.id, id);
assertEquals(deleteResult.payload.old_record.value, updatedValue);
assertEquals(deleteResult.payload.operation, "DELETE");
assertEquals(deleteResult.payload.schema, "public");
assertEquals(deleteResult.payload.table, "broadcast_changes");

await supabase.auth.signOut();
await stopClient(supabase, [channel]);
});
});

0 comments on commit ded1651

Please sign in to comment.