Skip to content

Commit

Permalink
Accept filters map instead of list
Browse files Browse the repository at this point in the history
  • Loading branch information
msfstef committed Dec 16, 2024
1 parent 64729a4 commit 6e1aeca
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 80 deletions.
76 changes: 41 additions & 35 deletions packages/sync-service/lib/electric/postgres/configuration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ defmodule Electric.Postgres.Configuration do
"""
@spec configure_tables_for_replication!(
Postgrex.conn(),
[RelationFilter.t()],
(-> String.t()),
filters(),
String.t(),
float()
) ::
{:ok, [:ok]}
Expand Down Expand Up @@ -122,7 +122,11 @@ defmodule Electric.Postgres.Configuration do
relation_filters = filter_for_existing_relations(conn, relation_filters)

# Update the entire publication with the new filters
Postgrex.query!(conn, make_alter_publication_query(publication_name, relation_filters), [])
Postgrex.query!(
conn,
make_alter_publication_query(publication_name, relation_filters),
[]
)

# `ALTER TABLE` should be after the publication altering, because it takes out an exclusive lock over this table,
# but the publication altering takes out a shared lock on all mentioned tables, so a concurrent transaction will
Expand All @@ -134,7 +138,7 @@ defmodule Electric.Postgres.Configuration do
end

defp set_replica_identity!(conn, relation_filters) do
for %RelationFilter{relation: relation} <- relation_filters,
for %RelationFilter{relation: relation} <- Map.values(relation_filters),
table = Utils.relation_to_sql(relation) do
%Postgrex.Result{rows: [[correct_identity?]]} =
Postgrex.query!(
Expand Down Expand Up @@ -163,30 +167,35 @@ defmodule Electric.Postgres.Configuration do

# Makes an SQL query that alters the given publication whith the given tables and filters.
@spec make_alter_publication_query(String.t(), filters()) :: String.t()
defp make_alter_publication_query(publication_name, []),
do: "DO $$
DECLARE
tables TEXT;
BEGIN
SELECT string_agg(format('%I.%I', schemaname, tablename), ', ')
INTO tables
FROM pg_publication_tables
WHERE pubname = '#{publication_name}' ;
IF tables IS NOT NULL THEN
EXECUTE format('ALTER PUBLICATION #{Utils.quote_name(publication_name)} DROP TABLE %s', tables);
END IF;
END $$;"

defp make_alter_publication_query(publication_name, filters) do
base_sql = "ALTER PUBLICATION #{Utils.quote_name(publication_name)} SET TABLE "

tables =
filters
|> Enum.map(&make_table_clause/1)
|> Enum.join(", ")

base_sql <> tables
case Map.values(filters) do
[] ->
"
DO $$
DECLARE
tables TEXT;
BEGIN
SELECT string_agg(format('%I.%I', schemaname, tablename), ', ')
INTO tables
FROM pg_publication_tables
WHERE pubname = '#{publication_name}' ;
IF tables IS NOT NULL THEN
EXECUTE format('ALTER PUBLICATION #{Utils.quote_name(publication_name)} DROP TABLE %s', tables);
END IF;
END $$;
"

filters ->
base_sql = "ALTER PUBLICATION #{Utils.quote_name(publication_name)} SET TABLE "

tables =
filters
|> Enum.map(&make_table_clause/1)
|> Enum.join(", ")

base_sql <> tables
end
end

@spec filter_for_existing_relations(Postgrex.conn(), filters()) :: filters()
Expand All @@ -204,21 +213,18 @@ defmodule Electric.Postgres.Configuration do
WHERE pn.nspname = ir.schemaname AND pc.relkind = 'r';
"

relation_filter_map =
filters |> Enum.map(&{&1.relation, &1}) |> Map.new()

relations = Map.keys(relation_filter_map)
relations = Map.keys(filters)

Postgrex.query!(conn, query, [
Enum.map(relations, &elem(&1, 0)),
Enum.map(relations, &elem(&1, 1))
])
|> Map.fetch!(:rows)
|> Enum.map(&List.to_tuple/1)
|> Enum.reduce([], fn rel, filters ->
case Map.get(relation_filter_map, rel) do
nil -> filters
filter -> [filter | filters]
|> Enum.reduce(filters, fn rel, filters ->
case Map.get(filters, rel) do
nil -> Map.delete(filters, rel)
_ -> filters
end
end)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ defmodule Electric.Replication.PublicationManager do
) do
configure_tables_for_replication_fn.(
db_pool,
Map.values(relation_filters),
relation_filters,
pg_version,
publication_name
)
Expand Down
88 changes: 45 additions & 43 deletions packages/sync-service/test/electric/postgres/configuration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ defmodule Electric.Postgres.ConfigurationTest do

Configuration.configure_tables_for_replication!(
conn,
[
%RelationFilter{
%{
{"public", "items"} => %RelationFilter{
relation: {"public", "items"},
where_clauses: [%Eval.Expr{query: "(value ILIKE 'yes%')"}]
}
],
},
pg_version,
publication
)
Expand All @@ -86,12 +86,12 @@ defmodule Electric.Postgres.ConfigurationTest do
assert capture_log(fn ->
Configuration.configure_tables_for_replication!(
conn,
[
%RelationFilter{
%{
{"public", "items"} => %RelationFilter{
relation: {"public", "items"},
where_clauses: [%Eval.Expr{query: "(value ILIKE 'yes%')"}]
}
],
},
pg_version,
publication
)
Expand All @@ -102,12 +102,12 @@ defmodule Electric.Postgres.ConfigurationTest do
refute capture_log(fn ->
Configuration.configure_tables_for_replication!(
conn,
[
%RelationFilter{
%{
{"public", "items"} => %RelationFilter{
relation: {"public", "items"},
where_clauses: [%Eval.Expr{query: "(value ILIKE 'no%')"}]
}
],
},
pg_version,
publication
)
Expand All @@ -125,16 +125,16 @@ defmodule Electric.Postgres.ConfigurationTest do

Configuration.configure_tables_for_replication!(
conn,
[
%RelationFilter{
%{
{"public", "items"} => %RelationFilter{
relation: {"public", "items"},
where_clauses: [%Eval.Expr{query: "(value ILIKE 'yes%')"}]
},
%RelationFilter{
{"public", "other_table"} => %RelationFilter{
relation: {"public", "other_table"},
where_clauses: [%Eval.Expr{query: "(value ILIKE 'no%')"}]
}
],
},
pg_version,
publication
)
Expand Down Expand Up @@ -163,16 +163,16 @@ defmodule Electric.Postgres.ConfigurationTest do

Configuration.configure_tables_for_replication!(
conn,
[
%RelationFilter{
%{
{"public", "items"} => %RelationFilter{
relation: {"public", "items"},
where_clauses: [%Eval.Expr{query: "(value ILIKE 'yes%')"}]
},
%RelationFilter{
{"public", "other_table"} => %RelationFilter{
relation: {"public", "other_table"},
where_clauses: [%Eval.Expr{query: "(value ILIKE 'no%')"}]
}
],
},
pg_version,
publication
)
Expand All @@ -191,16 +191,16 @@ defmodule Electric.Postgres.ConfigurationTest do

Configuration.configure_tables_for_replication!(
conn,
[
%RelationFilter{
%{
{"public", "items"} => %RelationFilter{
relation: {"public", "items"},
where_clauses: [%Eval.Expr{query: "(value ILIKE 'yes%')"}]
},
%RelationFilter{
{"public", "other_table"} => %RelationFilter{
relation: {"public", "other_table"},
where_clauses: [%Eval.Expr{query: "(value ILIKE 'yes%')"}]
}
],
},
pg_version,
publication
)
Expand All @@ -219,12 +219,12 @@ defmodule Electric.Postgres.ConfigurationTest do
%{pool: conn, publication_name: publication, pg_version: pg_version} do
Configuration.configure_tables_for_replication!(
conn,
[
%RelationFilter{
%{
{"public", "items"} => %RelationFilter{
relation: {"public", "items"},
where_clauses: [%Eval.Expr{query: "(value ILIKE 'yes%')"}]
}
],
},
pg_version,
publication
)
Expand All @@ -242,13 +242,13 @@ defmodule Electric.Postgres.ConfigurationTest do
# Configure `items` table again but with a different where clause
Configuration.configure_tables_for_replication!(
conn,
[
%RelationFilter{
%{
{"public", "items"} => %RelationFilter{
relation: {"public", "items"},
where_clauses: [%Eval.Expr{query: "(value ILIKE 'no%')"}]
},
%RelationFilter{relation: {"public", "other_table"}}
],
{"public", "other_table"} => %RelationFilter{relation: {"public", "other_table"}}
},
pg_version,
publication
)
Expand All @@ -269,10 +269,10 @@ defmodule Electric.Postgres.ConfigurationTest do
# the resulting publication should no longer have a filter for that table
Configuration.configure_tables_for_replication!(
conn,
[
%RelationFilter{relation: {"public", "items"}},
%RelationFilter{relation: {"public", "other_table"}}
],
%{
{"public", "items"} => %RelationFilter{relation: {"public", "items"}},
{"public", "other_table"} => %RelationFilter{relation: {"public", "other_table"}}
},
pg_version,
publication
)
Expand All @@ -291,7 +291,9 @@ defmodule Electric.Postgres.ConfigurationTest do
assert_raise Postgrex.Error, ~r/undefined_object/, fn ->
Configuration.configure_tables_for_replication!(
conn,
[%RelationFilter{relation: {"public", "items"}}],
%{
{"public", "items"} => %RelationFilter{relation: {"public", "items"}}
},
pg_version,
"nonexistent"
)
Expand All @@ -306,38 +308,38 @@ defmodule Electric.Postgres.ConfigurationTest do
# Create the publication first
Configuration.configure_tables_for_replication!(
conn,
[
%RelationFilter{
%{
{"public", "items"} => %RelationFilter{
relation: {"public", "items"},
where_clauses: [%Eval.Expr{query: "(value ILIKE 'yes%')"}]
},
%RelationFilter{
{"public", "other_table"} => %RelationFilter{
relation: {"public", "other_table"},
where_clauses: [%Eval.Expr{query: "(value ILIKE '1%')"}]
},
%RelationFilter{
{"public", "other_other_table"} => %RelationFilter{
relation: {"public", "other_other_table"},
where_clauses: [%Eval.Expr{query: "(value ILIKE '1%')"}]
}
],
},
pg_version,
publication
)

new_filters = [
%RelationFilter{
new_filters = %{
{"public", "items"} => %RelationFilter{
relation: {"public", "items"},
where_clauses: [%Eval.Expr{query: "(value ILIKE 'yes%')"}]
},
%RelationFilter{
{"public", "other_table"} => %RelationFilter{
relation: {"public", "other_table"},
where_clauses: [%Eval.Expr{query: "(value ILIKE '2%')"}]
},
%RelationFilter{
{"public", "other_other_table"} => %RelationFilter{
relation: {"public", "other_other_table"},
where_clauses: [%Eval.Expr{query: "(value ILIKE '2%')"}]
}
]
}

task1 =
Task.async(fn ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ defmodule Electric.Replication.PublicationManagerTest do

setup ctx do
test_pid = self()
configure_tables_fn = fn _, filters, _, _ -> send(test_pid, {:filters, filters}) end

configure_tables_fn = fn _, filters, _, _ ->
send(test_pid, {:filters, Map.values(filters)})
end

%{publication_manager: {_, publication_manager_opts}} =
with_publication_manager(%{
Expand Down

0 comments on commit 6e1aeca

Please sign in to comment.