From 264a49cd39b243b80b88765880c552a2b436b4d5 Mon Sep 17 00:00:00 2001 From: Onno Vos Date: Tue, 14 Feb 2023 13:36:41 +0100 Subject: [PATCH] Issue #3: Add support for logical decoding messages using pgoutput --- README.md | 15 ++++++++++ include/epgl_int.hrl | 7 +++++ src/epgl_pgoutput_decoder.erl | 6 ++++ src/epgl_subscriber.erl | 25 ++++++++++++++-- test/epgl_binary_mode_tests.erl | 16 +++++++++-- test/epgl_tests.erl | 51 +++++++++++++++++++++++++++++++-- 6 files changed, 112 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 0af0ab0..61d6db1 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,20 @@ Callback modules should implement method `handle_replication_msg` }). ``` +On optional callback module may implement the method `handle_logical_decoding_msg` +If implemented, messages in the form of `{logical_decoding_msg, Flags, LSN, Prefix, Content}` will be received. + +```erlang +-callback handle_logical_decoding_msg(#logical_decoding_msg{}) -> ok. + +-record(logical_decoding_msg, { + flags :: integer(), + lsn :: integer(), + prefix :: binary(), + content :: binary() +}). +``` + ### Create replication slot You may need to execute this step every time you start you Erlang application, if you select initial data from tables when start your application (Use case Consistent cache). @@ -274,6 +288,7 @@ epgl.pglogical_config.binary.float8_byval|boolean||Upstream PostgreSQL’s float epgl.pglogical_config.binary.integer_datetimes|boolean||Whether TIME, TIMESTAMP and TIMESTAMP WITH TIME ZONE will be sent using integer or floating point representation. epgl.pglogical_config.binary.basetypes_major_version|string||PostgreSQL mahor version of server, e.g. 905, (select PG_VERSION_NUM()/100); epgl.pglogical_config.pg_version|string||PostgreSQL server_version of server, e.g. 90506 (select PG_VERSION_NUM();) +epgl.logical_decoding_msg_callback|atom|undefined|Callback module for handling logical replication messages ### epgl:start_subscriber options It is possible to define some options when start_subscriber, i.e. call `epgl:start_subscriber(SubscriberId, DBArgs, Callbacks, Options)` diff --git a/include/epgl_int.hrl b/include/epgl_int.hrl index efeb9da..9f91ae0 100644 --- a/include/epgl_int.hrl +++ b/include/epgl_int.hrl @@ -13,6 +13,13 @@ xid :: integer() }). +-record(logical_decoding_msg, { + flags :: integer(), + lsn :: integer(), + prefix :: binary(), + content :: binary() +}). + -record(commit_msg, { flags :: integer(), commit_lsn :: integer(), diff --git a/src/epgl_pgoutput_decoder.erl b/src/epgl_pgoutput_decoder.erl index 0053250..034f45e 100644 --- a/src/epgl_pgoutput_decoder.erl +++ b/src/epgl_pgoutput_decoder.erl @@ -26,6 +26,7 @@ %% logical replication message types -define(MESSAGE_TYPE_BEGIN, $B). +-define(MESSAGE_TYPE_LOGICAL_DECODING, $M). -define(MESSAGE_TYPE_COMMIT, $C). -define(MESSAGE_TYPE_ORIGIN, $O). -define(MESSAGE_TYPE_RELATION, $R). @@ -49,6 +50,11 @@ decode(<>) -> Rec = #begin_msg{lsn = Lsn, commit_time = CommitTime, xid = XID}, {ok, Rec}; +decode(<>) -> + [Prefix, <>] = decode_string(Rest), + Rec = #logical_decoding_msg{flags = Flags, lsn = Lsn, prefix = Prefix, content = Content}, + {ok, Rec}; + decode(<>) -> Rec = #commit_msg{flags = Flags, commit_lsn = CommitLsn, end_lsn = EndLsn, commit_time = CommitTime}, {ok, Rec}; diff --git a/src/epgl_subscriber.erl b/src/epgl_subscriber.erl index 3913890..2a814ae 100644 --- a/src/epgl_subscriber.erl +++ b/src/epgl_subscriber.erl @@ -42,12 +42,19 @@ Metadata :: #{TableName :: string() => [ColumnName :: string()]}, Rows :: [#row{}]) -> ok. +%% Optional +-callback handle_logical_decoding_msg(#logical_decoding_msg{}) -> ok. + +-optional_callbacks([ handle_logical_decoding_msg/1 ]). + -record(state, { db_connect_opts :: epgl:db_args(), conn :: pid() | undefined, conn_normal :: pid() | undefined, + logical_decoding_msg_callback :: atom(), callbacks :: map(), metadata = #{} :: map(), + logical_decoding_msgs = [] :: list(), rows = [] :: list(), replication_slot :: string() | undefined, replication_set :: string() | undefined, @@ -85,6 +92,7 @@ init([DBArgs, Callbacks, Options]) -> State = #state{ conn = Conn, + logical_decoding_msg_callback = application:get_env(?APP, logical_decoding_msg_callback, undefined), callbacks = Callbacks, db_connect_opts = DBArgs, last_processed_lsn = 0, @@ -295,6 +303,7 @@ handle_call({pglogical_msg, _StartLSN, _EndLSN, #begin_msg{}}, _From, State = #s handle_call({pglogical_msg, _StartLSN, EndLSN, #commit_msg{}}, _From, State) -> #state{ rows = Rows, + logical_decoding_msgs = LogicalDecodingMsgs, metadata = Metadata, callbacks = Callbacks, check_lsn_mode = CheckLSNMode, @@ -331,12 +340,24 @@ handle_call({pglogical_msg, _StartLSN, EndLSN, #commit_msg{}}, _From, State) -> fun({CbMod, {MetadataAcc, RowAcc}}) -> ok = CbMod:handle_replication_msg(MetadataAcc, lists:flatten(RowAcc)) end, maps:to_list(CallbackData)), - {reply, ok, State#state{rows = [], last_processed_lsn = NewLastEndLSN}} + case State#state.logical_decoding_msg_callback of + undefined -> + ok; + LogicalDecodingMsgCallback -> + lists:foreach(fun(Msg) -> + ok = LogicalDecodingMsgCallback:handle_logical_decoding_msg(Msg) + end, LogicalDecodingMsgs) + end, + {reply, ok, State#state{logical_decoding_msgs = [], rows = [], last_processed_lsn = NewLastEndLSN}} end; skip -> {reply, ok, State#state{rows = []}} end; +handle_call({pglogical_msg, _StartLSN, _EndLSN, #logical_decoding_msg{} = Msg}, _From, State) -> + #state{logical_decoding_msgs = Msgs} = State, + {reply, ok, State#state{logical_decoding_msgs = [Msg | Msgs]}}; + handle_call({pglogical_msg, _StartLSN, _EndLSN, #relation_msg{name = TableName, id = Relidentifier, columns = Columns, namespace = SchemaName}}, _From, State = #state{metadata = Metadata, conn_normal = ConnNormal, options = Options}) -> @@ -450,7 +471,7 @@ start_replication(pgoutput, Conn, ReplicationSlot, PublicationNames, _ExtraConfi ReplicationSlot, ?MODULE, #cb_state{pid = self(), debug = DebugMode, decoder_module = epgl_pgoutput_decoder}, "0/0", - "proto_version '1', publication_names '\"" ++ PublicationNames ++ "\"'", + "messages 'true', proto_version '1', publication_names '\"" ++ PublicationNames ++ "\"'", [{align_lsn, AlignLSN}]); start_replication(pglogical, Conn, ReplicationSlot, ReplicationSets, PglogicalConfig, AlignLSN) -> DebugMode = application:get_env(?APP, debug, false), diff --git a/test/epgl_binary_mode_tests.erl b/test/epgl_binary_mode_tests.erl index 77bdb63..f0889a8 100644 --- a/test/epgl_binary_mode_tests.erl +++ b/test/epgl_binary_mode_tests.erl @@ -65,7 +65,7 @@ get_table_initial_state_test(Pid) -> {ok, SnapshotName, _ConsistentPoint} = epgl:create_replication_slot(Pid, "epgl_test_repl_slot"), {ok, _Columns, Values} = epgl:get_table_initial_state(Pid, "test_table1", SnapshotName), ok = epgl:drop_replication_slot(Pid, "epgl_test_repl_slot"), - ?_assertEqual([{1,<<"one">>}, {2,<<"two">>}, {3,null}], Values). + ?_assertEqual([{1,<<"one">>}, {2,<<"two">>}, {3,null}, {6,<<"six">>}], Values). init_replication_set_test(Pid) -> true = erlang:register(?MODULE, self()), @@ -75,7 +75,8 @@ init_replication_set_test(Pid) -> [ {row,"public.test_table1",insert,[1,<<"one">>]}, {row,"public.test_table1",insert,[2,<<"two">>]}, - {row,"public.test_table1",insert,[3,null]} + {row,"public.test_table1",insert,[3,null]}, + {row,"public.test_table1",insert,[6,<<"six">>]} ]} ], ok = epgl:init_replication_set(Pid, "epgl_test_repl_set_1", SnapshotName), @@ -86,6 +87,7 @@ init_replication_set_test(Pid) -> start_replication_test(Pid) -> true = erlang:register(?MODULE, self()), + truncate_tables(), %% Clean any old data from previous tests {ok, _, _} = epgl:create_replication_slot(Pid, "epgl_test_repl_slot"), make_changes(), ExpectedMsgs = [ @@ -172,4 +174,12 @@ make_changes() -> age('2016-10-25 16:28:56.669049', '2016-10-20 11:28:56.669049'), '127.0.0.1', '127.0.0.2', '{\"a\": 2, \"b\": [\"c\", \"d\"]}');"), [{ok, 3}, {ok, 1}] = epgsql:squery(C, "delete from test_table1 where id >= 4;delete from test_table3 where id = 1;"), - epgsql:close(C). \ No newline at end of file + epgsql:close(C). + +truncate_tables() -> + {ok, C} = epgsql:connect("localhost", "epgl_test", "epgl_test", + [{database, "epgl_test_db"}, {port, 10432}]), + _ = epgsql:squery(C, "truncate test_table1"), + _ = epgsql:squery(C, "truncate test_table2"), + _ = epgsql:squery(C, "truncate test_table3"), + epgsql:close(C). diff --git a/test/epgl_tests.erl b/test/epgl_tests.erl index 0f4ceb7..a536e80 100644 --- a/test/epgl_tests.erl +++ b/test/epgl_tests.erl @@ -1,7 +1,7 @@ -module(epgl_tests). -include_lib("eunit/include/eunit.hrl"). --export([handle_replication_msg/2, is_pglogical_exists/0]). +-export([handle_replication_msg/2, handle_logical_decoding_msg/1, is_pglogical_exists/0]). -define(DB_ARGS, [{hostname, "localhost"}, {port, 10432}, {database, "epgl_test_db"}, {username, "epgl_test"}, {password, "epgl_test"}]). @@ -140,7 +140,18 @@ all_test_() -> connection_test(Pid), create_replication_slot_temporary_test(Pid) ]} - end} + end}, + {setup, + fun start_pgoutput1/0, + fun stop/1, + fun(Pid) -> + {inorder, + [ + connection_test(Pid), + logical_decoding_message_test(Pid) + ]} + end + } ]; false -> [] end, @@ -150,6 +161,7 @@ start(TwoMsgsForPKUpdate, AutoCast, Plugin) -> application:set_env(epgl, two_msgs_for_pk_update, TwoMsgsForPKUpdate), application:set_env(epgl, max_reconnect_attempts, 10), application:set_env(epgl, repl_slot_output_plugin, Plugin), + application:set_env(epgl, logical_decoding_msg_callback, ?MODULE), Opts = #{"public.test_table1" => [?MODULE], "public.test_table3" => [?MODULE]}, Name = list_to_atom("epgl_subscriber_" ++ integer_to_list(erlang:unique_integer([positive]))), @@ -194,6 +206,7 @@ start_pgoutput3() -> start(TwoMsgsForPKUpdate, AutoCast, Plugin). stop(Pid) -> + application:unset_env(epgl, logical_decoding_msg_callback), catch epgl:stop(Pid). connection_test(Pid) -> @@ -397,6 +410,22 @@ pk_update_test(Pid) -> [ {row,"public.test_table1",delete,[<<"7">>,null]} ]} + ], + ok = epgl:start_replication(Pid, "epgl_test_repl_slot", "epgl_test_repl_set_1"), + Res = receive_replication_msgs(ExpectedMsgs), + true = erlang:unregister(?MODULE), + ?_assertEqual(ok, Res). + +logical_decoding_message_test(Pid) -> + true = erlang:register(?MODULE, self()), + {ok, _, _} = epgl:create_replication_slot(Pid, "epgl_test_repl_slot"), + logical_decoding_msg(), + ExpectedMsgs = [ + {#{"public.test_table1" => [<<"id">>,<<"value">>]}, + [ + {row,"public.test_table1",insert,[<<"6">>,<<"six">>]} + ]}, + {logical_decoding_msg, <<"test_prefix">>, <<"message">>} ], ok = epgl:start_replication(Pid, "epgl_test_repl_slot", "epgl_test_repl_set_1"), Res = receive_replication_msgs(ExpectedMsgs), @@ -410,6 +439,9 @@ receive_replication_msgs([Msg | T]) -> {replication_msg, ColumnsDescription, Fields} -> %% ct:print("replication_msg ~p ~p~n", [ColumnsDescription, Fields]), {ColumnsDescription, Fields} = Msg, + receive_replication_msgs(T); + {logical_decoding_msg, Prefix, Content} -> + {logical_decoding_msg, Prefix, Content} = Msg, receive_replication_msgs(T) after 60000 -> @@ -420,6 +452,11 @@ handle_replication_msg(ColumnsDescription, Fields) -> ?MODULE ! {replication_msg, ColumnsDescription, Fields}, ok. +handle_logical_decoding_msg(Msg) -> + {logical_decoding_msg,_,_, Prefix, Content} = Msg, + ?MODULE ! {logical_decoding_msg, Prefix, Content}, + ok. + make_changes() -> {ok, C} = connect(), {ok, 1} = epgsql:squery(C, "insert into test_table1 (id, value) values (4, 'four');"), @@ -446,6 +483,14 @@ pk_update() -> {ok, 1} = epgsql:squery(C, "delete from test_table1 where id >= 4;"), epgsql:close(C). +logical_decoding_msg() -> + {ok, C} = connect(), + {ok, _, _} = epgsql:squery(C, "BEGIN;"), + {ok, 1} = epgsql:squery(C, "insert into test_table1 (id, value) values (6, 'six');"), + {ok, _, [{_}]} = epgsql:squery(C, "SELECT pg_logical_emit_message(true, 'test_prefix', 'message');"), + {ok, _, _} = epgsql:squery(C, "COMMIT;"), + epgsql:close(C). + pg_version() -> {ok, C} = connect(), {ok, _, [{Version}]} = epgsql:equery(C, ["select current_setting('server_version_num')::int"]), @@ -464,4 +509,4 @@ connect() -> User = proplists:get_value(username, ?DB_ARGS), Password = proplists:get_value(password, ?DB_ARGS), Port = proplists:get_value(port, ?DB_ARGS, 5432), - epgsql:connect(Host, User, Password, [{database, DBName}, {port, Port}]). \ No newline at end of file + epgsql:connect(Host, User, Password, [{database, DBName}, {port, Port}]).