Skip to content

Commit

Permalink
fix: always call pg_logical_slot_get_changes when polling to flush slot
Browse files Browse the repository at this point in the history
  • Loading branch information
w3b6x9 committed Dec 21, 2021
1 parent 51582c4 commit c49ceac
Showing 1 changed file with 31 additions and 31 deletions.
62 changes: 31 additions & 31 deletions server/lib/realtime/rls/replications.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,52 +54,52 @@ defmodule Realtime.RLS.Replications do
case when bool_or(pubupdate) then 'update' else null end,
case when bool_or(pubdelete) then 'delete' else null end
) as w2j_actions,
string_agg(realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass), ',') w2j_add_tables
coalesce(
string_agg(
realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass),
','
) filter (where ppt.tablename is not null),
''
) w2j_add_tables
from
pg_publication pp
join pg_publication_tables ppt
left join pg_publication_tables ppt
on pp.pubname = ppt.pubname
where
pp.pubname = $1
group by
pp.pubname
limit 1
),
w2j as (
select
x.*, pub.w2j_add_tables
from
pub,
pg_logical_slot_get_changes(
$2, null, null,
'include-pk', '1',
'include-transaction', 'false',
'include-timestamp', 'true',
'write-in-chunks', 'true',
'format-version', '2',
'actions', pub.w2j_actions,
'add-tables', pub.w2j_add_tables
) x
)
select
xyz.wal,
xyz.is_rls_enabled,
xyz.subscription_ids,
xyz.errors
from
pub,
lateral (
select
*
from
pg_logical_slot_get_changes(
$2, null, null,
'include-pk', '1',
'include-transaction', 'false',
'include-timestamp', 'true',
'write-in-chunks', 'true',
'format-version', '2',
'actions', coalesce(pub.w2j_actions, ''),
'add-tables', pub.w2j_add_tables
)
) w2j,
lateral (
select
x.wal,
x.is_rls_enabled,
x.subscription_ids,
x.errors
from
realtime.apply_rls(
wal := w2j.data::jsonb,
max_record_bytes := $3
) x(wal, is_rls_enabled, subscription_ids, errors)
) xyz
where coalesce(pub.w2j_add_tables, '') <> ''
w2j,
realtime.apply_rls(
wal := w2j.data::jsonb,
max_record_bytes := $3
) xyz(wal, is_rls_enabled, subscription_ids, errors)
where
w2j.w2j_add_tables <> ''
and xyz.subscription_ids[1] is not null",
[
publication,
Expand Down

0 comments on commit c49ceac

Please sign in to comment.