Skip to content

Commit

Permalink
Merge pull request #62 from jfinzel/bugfix_multi_sub_single_db
Browse files Browse the repository at this point in the history
Ignore duplicates in queue table to support multiple subscriptions fr…

* https://github.com/enova/pgl_ddl_deploy:
  Ignore duplicates in queue table to support multiple subscriptions from same provider db
  • Loading branch information
Jerry Sievers committed Feb 19, 2021
2 parents 60d3894 + 03adc8f commit 5083d16
Show file tree
Hide file tree
Showing 20 changed files with 3,647 additions and 64 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ DATA = pgl_ddl_deploy--1.0.sql pgl_ddl_deploy--1.0--1.1.sql \
pgl_ddl_deploy--1.5.sql pgl_ddl_deploy--1.5--1.6.sql \
pgl_ddl_deploy--1.6.sql pgl_ddl_deploy--1.6--1.7.sql \
pgl_ddl_deploy--1.7.sql pgl_ddl_deploy--1.7--2.0.sql \
pgl_ddl_deploy--2.0.sql
pgl_ddl_deploy--2.0.sql pgl_ddl_deploy--2.0--2.1.sql \
pgl_ddl_deploy--2.1.sql
MODULES = pgl_ddl_deploy ddl_deparse

REGRESS := 01_create_ext 02_setup 03_add_configs 04_deploy 04_deploy_update \
Expand Down
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ Transparent DDL replication for Postgres 9.5+ for both pglogical and native logi
- [DDL involving multiple tables](#multi_tables)
- [Unsupported Commands](#unsupported)
- [Multi-Statement Client SQL Limitations](#multi_statement)
- [Native Logical Supported Configurations](#native_support)

[Resolving DDL Replication Issues](#resolve)
- [Resolving Failed DDL on Subscribers](#resolve_failed)
- [Resolving Unhandled DDL](#resolve_unhandled)
- [Disable DDL Replication on Subscriber](#disable_ddl)

[For Developers](#devs)
- [Help Wanted Features](#help_wanted)
Expand Down Expand Up @@ -716,6 +718,14 @@ The `unhandled` table and `WARNING` logs are designed to be leveraged with
monitoring to create alerting around when manual intervention is required for
DDL changes.

## <a name="native_support"></a>Native Logical Supported Configurations

The only known configuration limitation for native logical DDL replication is that
only a publication that includes replicating inserts can support DDL replication. This relates
to how the DDL queueing mechanism works. The default `CREATE PUBLICATION` in postgres
includes publishing inserts. It can also be configured specifically with `WITH (publish = 'insert')`.
For more details see https://www.postgresql.org/docs/current/sql-createpublication.html.

# <a name="resolve"></a>Resolving DDL Replication Issues

## <a name="resolve_failed"></a>Resolving Failed DDL on Subscribers
Expand Down Expand Up @@ -819,6 +829,26 @@ through `replicate_ddl_command`. But such a feature may cause additional and
unnecessary administration overhead, since it is likely the strictness of
replication will cause the system to break when it should.

## <a name="disable_ddl"></a>Disable DDL Replication on Subscriber

The following applies to native logical replication only.

As a last resort, you might find some unexpected problem where you want to disable DDL
replication on the subscriber because you are in an error loop. This should be done
very cautiously and as a last resort.

Disable the trigger on the DDL queue table to ignore all DDL replication on subscriber:
```
ALTER TABLE pgl_ddl_deploy.queue DISABLE TRIGGER execute_queued_ddl;
```

Do what you need to do for manual fixes. If you suspect a bug, please report it. When
you are finished, you MUST ENABLE THE TRIGGER IN REPLICATION MODE ONLY:
```
ALTER TABLE pgl_ddl_deploy.queue ENABLE REPLICA TRIGGER execute_queued_ddl;
```


# <a name="devs"></a>For Developers

## <a name="help_wanted"></a>Help Wanted Features
Expand Down
6 changes: 6 additions & 0 deletions debian/changelog
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
pgl-ddl-deploy (2.1.0-1) unstable; urgency=medium

* Fix duplicate issue with multiple subscriptions

-- Jeremy Finzel <[email protected]> Fri, 19 Feb 2021 11:21:59 -0600

pgl-ddl-deploy (2.0.0-2) unstable; urgency=medium

[ Jeremy Finzel ]
Expand Down
2 changes: 1 addition & 1 deletion expected/01_create_ext.out
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Allow running regression suite with upgrade paths
\set v `echo ${FROMVERSION:-2.0}`
\set v `echo ${FROMVERSION:-2.1}`
SET client_min_messages = warning;
CREATE EXTENSION pglogical;
CREATE EXTENSION pgl_ddl_deploy VERSION :'v';
12 changes: 12 additions & 0 deletions expected/02_setup.out
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,15 @@ END IF;
END;
$BODY$
LANGUAGE plpgsql;
CREATE FUNCTION verify_count(ct int, expected int) RETURNS BOOLEAN AS $BODY$
BEGIN

RAISE LOG 'ct: %', ct;
IF ct != expected THEN
RAISE EXCEPTION 'Count % does not match expected count of %', ct, expected;
END IF;

RETURN TRUE;

END$BODY$
LANGUAGE plpgsql;
2 changes: 1 addition & 1 deletion expected/29_create_ext.out
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Allow running regression suite with upgrade paths
\set v `echo ${FROMVERSION:-2.0}`
\set v `echo ${FROMVERSION:-2.1}`
SET client_min_messages = warning;
CREATE TEMP TABLE v AS
SELECT :'v'::TEXT AS num;
Expand Down
12 changes: 12 additions & 0 deletions expected/30_setup.out
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,15 @@ END IF;
END;
$BODY$
LANGUAGE plpgsql;
CREATE FUNCTION verify_count(ct int, expected int) RETURNS BOOLEAN AS $BODY$
BEGIN

RAISE LOG 'ct: %', ct;
IF ct != expected THEN
RAISE EXCEPTION 'Count % does not match expected count of %', ct, expected;
END IF;

RETURN TRUE;

END$BODY$
LANGUAGE plpgsql;
4 changes: 1 addition & 3 deletions expected/44_multi_set_tags.out
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ BEGIN
IF current_setting('server_version_num')::INT >= 100000 THEN
SELECT COUNT(1) INTO v_ct FROM pg_publication_tables WHERE schemaname = 'pgl_ddl_deploy' AND tablename = 'queue';
RAISE LOG 'v_ct: %', v_ct;
IF v_ct != 8 THEN
RAISE EXCEPTION 'Count does not match expected: v_ct: %', v_ct;
END IF;
PERFORM verify_count(v_ct, 8);
END IF;

END$$;
40 changes: 24 additions & 16 deletions expected/57_native_features.out
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,42 @@ INSERT INTO pgl_ddl_deploy.queue (queued_at,role,pubnames,message_type,message)
VALUES (now(),current_role,'{mock}'::TEXT[],pgl_ddl_deploy.queue_ddl_message_type(),'ALTER TABLE nativerox ADD COLUMN bar text;');
INSERT INTO pgl_ddl_deploy.queue (queued_at,role,pubnames,message_type,message)
VALUES (now(),current_role,'{mock}'::TEXT[],pgl_ddl_deploy.queue_ddl_message_type(),$$SELECT pgl_ddl_deploy.notify_subscription_refresh('mock', true);$$);
CREATE FUNCTION verify_count(ct int, expected int) RETURNS BOOLEAN AS $BODY$
BEGIN

RAISE LOG 'ct: %', ct;
IF ct != expected THEN
RAISE EXCEPTION 'Count % does not match expected count of %', ct, expected;
END IF;

RETURN TRUE;

END$BODY$
LANGUAGE plpgsql;
DO $$
DECLARE v_ct INT;
BEGIN

IF current_setting('server_version_num')::INT >= 100000 THEN
SELECT COUNT(1) INTO v_ct FROM information_schema.columns WHERE table_name = 'nativerox';
RAISE LOG 'v_ct: %', v_ct;
IF v_ct != 2 THEN
RAISE EXCEPTION 'Count does not match expected: v_ct: %', v_ct;
END IF;
PERFORM verify_count(v_ct, 2);
SELECT COUNT(1) INTO v_ct FROM pgl_ddl_deploy.subscriber_logs;
RAISE LOG 'v_ct: %', v_ct;
IF v_ct != 1 THEN
RAISE EXCEPTION 'Count does not match expected: v_ct: %', v_ct;
END IF;
PERFORM verify_count(v_ct, 1);
PERFORM pgl_ddl_deploy.retry_all_subscriber_logs();
SELECT (SELECT COUNT(1) FROM pgl_ddl_deploy.subscriber_logs WHERE NOT succeeded) +
(SELECT COUNT(1) FROM pgl_ddl_deploy.subscriber_logs WHERE error_message ~* 'No subscription to publication mock exists') INTO v_ct;
RAISE LOG 'v_ct: %', v_ct;
IF v_ct != 3 THEN
RAISE EXCEPTION 'Count does not match expected: v_ct: %', v_ct;
END IF;
PERFORM verify_count(v_ct, 3);
-- test for duplicate avoidance with multiple subscriptions
SELECT COUNT(1) INTO v_ct FROM pgl_ddl_deploy.queue;
PERFORM verify_count(v_ct, 3);
SET session_replication_role TO replica;
INSERT INTO pgl_ddl_deploy.queue SELECT * FROM pgl_ddl_deploy.queue;
SELECT COUNT(1) INTO v_ct FROM pgl_ddl_deploy.queue;
PERFORM verify_count(v_ct, 3);
RESET session_replication_role;
ELSE
SELECT COUNT(1) INTO v_ct FROM pgl_ddl_deploy.subscriber_logs;
RAISE LOG 'v_ct: %', v_ct;
IF v_ct != 0 THEN
RAISE EXCEPTION 'Count does not match expected: v_ct: %', v_ct;
END IF;
PERFORM verify_count(v_ct, 0);
END IF;

END$$;
8 changes: 8 additions & 0 deletions functions/execute_queued_ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ matching pubnames with pg_subscription.subpublications
If a row arrives here (the subscriber), it must mean that it was propagated
***/

-- This handles potential duplicates with multiple subscriptions to same publisher db.
IF EXISTS (
SELECT NEW.*
INTERSECT
SELECT * FROM pgl_ddl_deploy.queue) THEN
RETURN NULL;
END IF;

IF NEW.message_type = pgl_ddl_deploy.queue_ddl_message_type() AND
(pgl_ddl_deploy.override() OR ((SELECT COUNT(1) FROM pg_subscription s
WHERE subpublications && NEW.pubnames) > 0)) THEN
Expand Down
75 changes: 75 additions & 0 deletions pgl_ddl_deploy--2.0--2.1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/* pgl_ddl_deploy--2.0--2.1.sql */

-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION pgl_ddl_deploy" to load this file. \quit

CREATE OR REPLACE FUNCTION pgl_ddl_deploy.execute_queued_ddl()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN

/***
Native logical replication does not support row filtering, so as a result,
we need to do processing downstream to ensure we only process rows we care about.
For example, if we propagate some DDL to system 1 and some other to system 2,
all rows will still come through this trigger. We filter out rows based on
matching pubnames with pg_subscription.subpublications
If a row arrives here (the subscriber), it must mean that it was propagated
***/

-- This handles potential duplicates with multiple subscriptions to same publisher db.
IF EXISTS (
SELECT NEW.*
INTERSECT
SELECT * FROM pgl_ddl_deploy.queue) THEN
RETURN NULL;
END IF;

IF NEW.message_type = pgl_ddl_deploy.queue_ddl_message_type() AND
(pgl_ddl_deploy.override() OR ((SELECT COUNT(1) FROM pg_subscription s
WHERE subpublications && NEW.pubnames) > 0)) THEN

-- See https://www.postgresql.org/message-id/CAMa1XUh7ZVnBzORqjJKYOv4_pDSDUCvELRbkF0VtW7pvDW9rZw@mail.gmail.com
IF NEW.message ~* 'pgl_ddl_deploy.notify_subscription_refresh' THEN
INSERT INTO pgl_ddl_deploy.subscriber_logs
(set_name,
provider_pid,
provider_node_name,
provider_set_config_id,
executed_as_role,
subscriber_pid,
executed_at,
ddl_sql,
full_ddl_sql,
succeeded,
error_message)
VALUES
(NEW.pubnames[1],
NULL,
NULL,
NULL,
current_role,
pg_backend_pid(),
current_timestamp,
NEW.message,
NEW.message,
FALSE,
'Unsupported automated ALTER SUBSCRIPTION ... REFRESH PUBLICATION until bugfix');
ELSE
EXECUTE 'SET ROLE '||quote_ident(NEW.role)||';';
EXECUTE NEW.message::TEXT;
END IF;

RETURN NEW;
ELSE
RETURN NULL;
END IF;

END;
$function$
;


Loading

0 comments on commit 5083d16

Please sign in to comment.