Skip to content

Commit

Permalink
Merge pull request #4 from onno-vos-dev/issue-3-add-support-for-logic…
Browse files Browse the repository at this point in the history
…al-decoding-messages

Issue #3: Add support for logical decoding messages using pgoutput
  • Loading branch information
MikhailKalashnikov authored Feb 17, 2023
2 parents f8f4a61 + 264a49c commit 01da0a7
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 8 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ Callback modules should implement method `handle_replication_msg`
}).
```

On optional callback module may implement the method `handle_logical_decoding_msg`
If implemented, messages in the form of `{logical_decoding_msg, Flags, LSN, Prefix, Content}` will be received.

```erlang
-callback handle_logical_decoding_msg(#logical_decoding_msg{}) -> ok.

-record(logical_decoding_msg, {
flags :: integer(),
lsn :: integer(),
prefix :: binary(),
content :: binary()
}).
```

### Create replication slot
You may need to execute this step every time you start you Erlang application,
if you select initial data from tables when start your application (Use case Consistent cache).
Expand Down Expand Up @@ -274,6 +288,7 @@ epgl.pglogical_config.binary.float8_byval|boolean||Upstream PostgreSQL’s float
epgl.pglogical_config.binary.integer_datetimes|boolean||Whether TIME, TIMESTAMP and TIMESTAMP WITH TIME ZONE will be sent using integer or floating point representation.
epgl.pglogical_config.binary.basetypes_major_version|string||PostgreSQL mahor version of server, e.g. 905, (select PG_VERSION_NUM()/100);
epgl.pglogical_config.pg_version|string||PostgreSQL server_version of server, e.g. 90506 (select PG_VERSION_NUM();)
epgl.logical_decoding_msg_callback|atom|undefined|Callback module for handling logical replication messages

### epgl:start_subscriber options
It is possible to define some options when start_subscriber, i.e. call `epgl:start_subscriber(SubscriberId, DBArgs, Callbacks, Options)`
Expand Down
7 changes: 7 additions & 0 deletions include/epgl_int.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@
xid :: integer()
}).

-record(logical_decoding_msg, {
flags :: integer(),
lsn :: integer(),
prefix :: binary(),
content :: binary()
}).

-record(commit_msg, {
flags :: integer(),
commit_lsn :: integer(),
Expand Down
6 changes: 6 additions & 0 deletions src/epgl_pgoutput_decoder.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

%% logical replication message types
-define(MESSAGE_TYPE_BEGIN, $B).
-define(MESSAGE_TYPE_LOGICAL_DECODING, $M).
-define(MESSAGE_TYPE_COMMIT, $C).
-define(MESSAGE_TYPE_ORIGIN, $O).
-define(MESSAGE_TYPE_RELATION, $R).
Expand All @@ -49,6 +50,11 @@ decode(<<?MESSAGE_TYPE_BEGIN:8, Lsn:?int64, CommitTime:?int64, XID:?int32>>) ->
Rec = #begin_msg{lsn = Lsn, commit_time = CommitTime, xid = XID},
{ok, Rec};

decode(<<?MESSAGE_TYPE_LOGICAL_DECODING:8, Flags:8, Lsn:?int64, Rest/binary>>) ->
[Prefix, <<Len:?int32, Content:Len/binary>>] = decode_string(Rest),
Rec = #logical_decoding_msg{flags = Flags, lsn = Lsn, prefix = Prefix, content = Content},
{ok, Rec};

decode(<<?MESSAGE_TYPE_COMMIT:8, Flags:8, CommitLsn:?int64, EndLsn:?int64, CommitTime:?int64>>) ->
Rec = #commit_msg{flags = Flags, commit_lsn = CommitLsn, end_lsn = EndLsn, commit_time = CommitTime},
{ok, Rec};
Expand Down
25 changes: 23 additions & 2 deletions src/epgl_subscriber.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,19 @@
Metadata :: #{TableName :: string() => [ColumnName :: string()]},
Rows :: [#row{}]) -> ok.

%% Optional
-callback handle_logical_decoding_msg(#logical_decoding_msg{}) -> ok.

-optional_callbacks([ handle_logical_decoding_msg/1 ]).

-record(state, {
db_connect_opts :: epgl:db_args(),
conn :: pid() | undefined,
conn_normal :: pid() | undefined,
logical_decoding_msg_callback :: atom(),
callbacks :: map(),
metadata = #{} :: map(),
logical_decoding_msgs = [] :: list(),
rows = [] :: list(),
replication_slot :: string() | undefined,
replication_set :: string() | undefined,
Expand Down Expand Up @@ -85,6 +92,7 @@ init([DBArgs, Callbacks, Options]) ->

State = #state{
conn = Conn,
logical_decoding_msg_callback = application:get_env(?APP, logical_decoding_msg_callback, undefined),
callbacks = Callbacks,
db_connect_opts = DBArgs,
last_processed_lsn = 0,
Expand Down Expand Up @@ -295,6 +303,7 @@ handle_call({pglogical_msg, _StartLSN, _EndLSN, #begin_msg{}}, _From, State = #s
handle_call({pglogical_msg, _StartLSN, EndLSN, #commit_msg{}}, _From, State) ->
#state{
rows = Rows,
logical_decoding_msgs = LogicalDecodingMsgs,
metadata = Metadata,
callbacks = Callbacks,
check_lsn_mode = CheckLSNMode,
Expand Down Expand Up @@ -331,12 +340,24 @@ handle_call({pglogical_msg, _StartLSN, EndLSN, #commit_msg{}}, _From, State) ->
fun({CbMod, {MetadataAcc, RowAcc}}) ->
ok = CbMod:handle_replication_msg(MetadataAcc, lists:flatten(RowAcc))
end, maps:to_list(CallbackData)),
{reply, ok, State#state{rows = [], last_processed_lsn = NewLastEndLSN}}
case State#state.logical_decoding_msg_callback of
undefined ->
ok;
LogicalDecodingMsgCallback ->
lists:foreach(fun(Msg) ->
ok = LogicalDecodingMsgCallback:handle_logical_decoding_msg(Msg)
end, LogicalDecodingMsgs)
end,
{reply, ok, State#state{logical_decoding_msgs = [], rows = [], last_processed_lsn = NewLastEndLSN}}
end;
skip ->
{reply, ok, State#state{rows = []}}
end;

handle_call({pglogical_msg, _StartLSN, _EndLSN, #logical_decoding_msg{} = Msg}, _From, State) ->
#state{logical_decoding_msgs = Msgs} = State,
{reply, ok, State#state{logical_decoding_msgs = [Msg | Msgs]}};

handle_call({pglogical_msg, _StartLSN, _EndLSN, #relation_msg{name = TableName, id = Relidentifier,
columns = Columns, namespace = SchemaName}}, _From,
State = #state{metadata = Metadata, conn_normal = ConnNormal, options = Options}) ->
Expand Down Expand Up @@ -450,7 +471,7 @@ start_replication(pgoutput, Conn, ReplicationSlot, PublicationNames, _ExtraConfi
ReplicationSlot, ?MODULE,
#cb_state{pid = self(), debug = DebugMode, decoder_module = epgl_pgoutput_decoder},
"0/0",
"proto_version '1', publication_names '\"" ++ PublicationNames ++ "\"'",
"messages 'true', proto_version '1', publication_names '\"" ++ PublicationNames ++ "\"'",
[{align_lsn, AlignLSN}]);
start_replication(pglogical, Conn, ReplicationSlot, ReplicationSets, PglogicalConfig, AlignLSN) ->
DebugMode = application:get_env(?APP, debug, false),
Expand Down
16 changes: 13 additions & 3 deletions test/epgl_binary_mode_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ get_table_initial_state_test(Pid) ->
{ok, SnapshotName, _ConsistentPoint} = epgl:create_replication_slot(Pid, "epgl_test_repl_slot"),
{ok, _Columns, Values} = epgl:get_table_initial_state(Pid, "test_table1", SnapshotName),
ok = epgl:drop_replication_slot(Pid, "epgl_test_repl_slot"),
?_assertEqual([{1,<<"one">>}, {2,<<"two">>}, {3,null}], Values).
?_assertEqual([{1,<<"one">>}, {2,<<"two">>}, {3,null}, {6,<<"six">>}], Values).

init_replication_set_test(Pid) ->
true = erlang:register(?MODULE, self()),
Expand All @@ -75,7 +75,8 @@ init_replication_set_test(Pid) ->
[
{row,"public.test_table1",insert,[1,<<"one">>]},
{row,"public.test_table1",insert,[2,<<"two">>]},
{row,"public.test_table1",insert,[3,null]}
{row,"public.test_table1",insert,[3,null]},
{row,"public.test_table1",insert,[6,<<"six">>]}
]}
],
ok = epgl:init_replication_set(Pid, "epgl_test_repl_set_1", SnapshotName),
Expand All @@ -86,6 +87,7 @@ init_replication_set_test(Pid) ->

start_replication_test(Pid) ->
true = erlang:register(?MODULE, self()),
truncate_tables(), %% Clean any old data from previous tests
{ok, _, _} = epgl:create_replication_slot(Pid, "epgl_test_repl_slot"),
make_changes(),
ExpectedMsgs = [
Expand Down Expand Up @@ -172,4 +174,12 @@ make_changes() ->
age('2016-10-25 16:28:56.669049', '2016-10-20 11:28:56.669049'), '127.0.0.1', '127.0.0.2',
'{\"a\": 2, \"b\": [\"c\", \"d\"]}');"),
[{ok, 3}, {ok, 1}] = epgsql:squery(C, "delete from test_table1 where id >= 4;delete from test_table3 where id = 1;"),
epgsql:close(C).
epgsql:close(C).

truncate_tables() ->
{ok, C} = epgsql:connect("localhost", "epgl_test", "epgl_test",
[{database, "epgl_test_db"}, {port, 10432}]),
_ = epgsql:squery(C, "truncate test_table1"),
_ = epgsql:squery(C, "truncate test_table2"),
_ = epgsql:squery(C, "truncate test_table3"),
epgsql:close(C).
51 changes: 48 additions & 3 deletions test/epgl_tests.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-module(epgl_tests).

-include_lib("eunit/include/eunit.hrl").
-export([handle_replication_msg/2, is_pglogical_exists/0]).
-export([handle_replication_msg/2, handle_logical_decoding_msg/1, is_pglogical_exists/0]).
-define(DB_ARGS, [{hostname, "localhost"}, {port, 10432},
{database, "epgl_test_db"}, {username, "epgl_test"}, {password, "epgl_test"}]).

Expand Down Expand Up @@ -140,7 +140,18 @@ all_test_() ->
connection_test(Pid),
create_replication_slot_temporary_test(Pid)
]}
end}
end},
{setup,
fun start_pgoutput1/0,
fun stop/1,
fun(Pid) ->
{inorder,
[
connection_test(Pid),
logical_decoding_message_test(Pid)
]}
end
}
];
false -> []
end,
Expand All @@ -150,6 +161,7 @@ start(TwoMsgsForPKUpdate, AutoCast, Plugin) ->
application:set_env(epgl, two_msgs_for_pk_update, TwoMsgsForPKUpdate),
application:set_env(epgl, max_reconnect_attempts, 10),
application:set_env(epgl, repl_slot_output_plugin, Plugin),
application:set_env(epgl, logical_decoding_msg_callback, ?MODULE),

Opts = #{"public.test_table1" => [?MODULE], "public.test_table3" => [?MODULE]},
Name = list_to_atom("epgl_subscriber_" ++ integer_to_list(erlang:unique_integer([positive]))),
Expand Down Expand Up @@ -194,6 +206,7 @@ start_pgoutput3() ->
start(TwoMsgsForPKUpdate, AutoCast, Plugin).

stop(Pid) ->
application:unset_env(epgl, logical_decoding_msg_callback),
catch epgl:stop(Pid).

connection_test(Pid) ->
Expand Down Expand Up @@ -397,6 +410,22 @@ pk_update_test(Pid) ->
[
{row,"public.test_table1",delete,[<<"7">>,null]}
]}
],
ok = epgl:start_replication(Pid, "epgl_test_repl_slot", "epgl_test_repl_set_1"),
Res = receive_replication_msgs(ExpectedMsgs),
true = erlang:unregister(?MODULE),
?_assertEqual(ok, Res).

logical_decoding_message_test(Pid) ->
true = erlang:register(?MODULE, self()),
{ok, _, _} = epgl:create_replication_slot(Pid, "epgl_test_repl_slot"),
logical_decoding_msg(),
ExpectedMsgs = [
{#{"public.test_table1" => [<<"id">>,<<"value">>]},
[
{row,"public.test_table1",insert,[<<"6">>,<<"six">>]}
]},
{logical_decoding_msg, <<"test_prefix">>, <<"message">>}
],
ok = epgl:start_replication(Pid, "epgl_test_repl_slot", "epgl_test_repl_set_1"),
Res = receive_replication_msgs(ExpectedMsgs),
Expand All @@ -410,6 +439,9 @@ receive_replication_msgs([Msg | T]) ->
{replication_msg, ColumnsDescription, Fields} ->
%% ct:print("replication_msg ~p ~p~n", [ColumnsDescription, Fields]),
{ColumnsDescription, Fields} = Msg,
receive_replication_msgs(T);
{logical_decoding_msg, Prefix, Content} ->
{logical_decoding_msg, Prefix, Content} = Msg,
receive_replication_msgs(T)
after
60000 ->
Expand All @@ -420,6 +452,11 @@ handle_replication_msg(ColumnsDescription, Fields) ->
?MODULE ! {replication_msg, ColumnsDescription, Fields},
ok.

handle_logical_decoding_msg(Msg) ->
{logical_decoding_msg,_,_, Prefix, Content} = Msg,
?MODULE ! {logical_decoding_msg, Prefix, Content},
ok.

make_changes() ->
{ok, C} = connect(),
{ok, 1} = epgsql:squery(C, "insert into test_table1 (id, value) values (4, 'four');"),
Expand All @@ -446,6 +483,14 @@ pk_update() ->
{ok, 1} = epgsql:squery(C, "delete from test_table1 where id >= 4;"),
epgsql:close(C).

logical_decoding_msg() ->
{ok, C} = connect(),
{ok, _, _} = epgsql:squery(C, "BEGIN;"),
{ok, 1} = epgsql:squery(C, "insert into test_table1 (id, value) values (6, 'six');"),
{ok, _, [{_}]} = epgsql:squery(C, "SELECT pg_logical_emit_message(true, 'test_prefix', 'message');"),
{ok, _, _} = epgsql:squery(C, "COMMIT;"),
epgsql:close(C).

pg_version() ->
{ok, C} = connect(),
{ok, _, [{Version}]} = epgsql:equery(C, ["select current_setting('server_version_num')::int"]),
Expand All @@ -464,4 +509,4 @@ connect() ->
User = proplists:get_value(username, ?DB_ARGS),
Password = proplists:get_value(password, ?DB_ARGS),
Port = proplists:get_value(port, ?DB_ARGS, 5432),
epgsql:connect(Host, User, Password, [{database, DBName}, {port, Port}]).
epgsql:connect(Host, User, Password, [{database, DBName}, {port, Port}]).

0 comments on commit 01da0a7

Please sign in to comment.