Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adds 'skip-empty-xacts' support #248

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
/results/
/wal2json.so
*.o

.vscode
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ REGRESS = cmdline insert1 update1 update2 update3 update4 delete1 delete2 \
delete3 delete4 savepoint specialvalue toast bytea message typmod \
filtertable selecttable include_timestamp include_lsn include_xids \
include_domain_data_type truncate type_oid actions position default \
pk rename_column
pk rename_column skip_empty_xacts

PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ Parameters
* `include-transaction`: emit records denoting the start and end of each transaction. Default is _true_.
* `include-unchanged-toast` (deprecated): Don't use it. It is deprecated.
* `filter-origins`: exclude changes from the specified origins. Default is empty which means that no origin will be filtered. It is a comma separated value.
* `skip-empty-xacts` ignore empty transactions. Default is _false_.
adrijshikhar marked this conversation as resolved.
Show resolved Hide resolved
* `filter-tables`: exclude rows from the specified tables. Default is empty which means that no table will be filtered. It is a comma separated value. The tables should be schema-qualified. `*.foo` means table foo in all schemas and `bar.*` means all tables in schema bar. Special characters (space, single quote, comma, period, asterisk) must be escaped with backslash. Schema and table are case-sensitive. Table `"public"."Foo bar"` should be specified as `public.Foo\ bar`.
* `add-tables`: include only rows from the specified tables. Default is all tables from all schemas. It has the same rules from `filter-tables`.
* `filter-msg-prefixes`: exclude messages if prefix is in the list. Default is empty which means that no message will be filtered. It is a comma separated value.
Expand Down
72 changes: 72 additions & 0 deletions expected/skip_empty_xacts.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
\set VERBOSITY terse
-- predictability
SET synchronous_commit = on;

SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');

CREATE TABLE xact_test(data text);
INSERT INTO xact_test VALUES ('before-test');

-- bug #13844, xids in non-decoded records need to be inspected
BEGIN;
-- perform operation in xact that creates and logs xid, but isn't decoded
SELECT * FROM xact_test FOR UPDATE;
SAVEPOINT foo;
-- and now actually insert in subxact, xid is expected to be known
INSERT INTO xact_test VALUES ('after-assignment');
COMMIT;
-- and now show those changes
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');

-- bug #14279, do not propagate null snapshot from subtransaction
BEGIN;
-- first insert
INSERT INTO xact_test VALUES ('main-txn');
SAVEPOINT foo;
-- now perform operation in subxact that creates and logs xid, but isn't decoded
SELECT 1 FROM xact_test FOR UPDATE LIMIT 1;
COMMIT;
-- and now show those changes
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');

DROP TABLE xact_test;

SELECT pg_drop_replication_slot('regression_slot');
SET
ERROR: replication slot "regression_slot" already exists
CREATE TABLE
INSERT 0 1
BEGIN
data
-------------
before-test
(1 row)

SAVEPOINT
INSERT 0 1
COMMIT
data
--------------------------------------------------------------------------------------------------------------------------------------------------------
{"change":[{"kind":"insert","schema":"public","table":"xact_test","columnnames":["data"],"columntypes":["text"],"columnvalues":["before-test"]}]}
{"change":[{"kind":"insert","schema":"public","table":"xact_test","columnnames":["data"],"columntypes":["text"],"columnvalues":["after-assignment"]}]}
(2 rows)

BEGIN
INSERT 0 1
SAVEPOINT
?column?
----------
1
(1 row)

COMMIT
data
------------------------------------------------------------------------------------------------------------------------------------------------
{"change":[{"kind":"insert","schema":"public","table":"xact_test","columnnames":["data"],"columntypes":["text"],"columnvalues":["main-txn"]}]}
(1 row)

DROP TABLE
pg_drop_replication_slot
--------------------------

(1 row)
31 changes: 31 additions & 0 deletions sql/skip_empty_xacts.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-- predictability
SET synchronous_commit = on;

SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do these tests exercise the proposal? Why don't you use simple statements (for example, using filter-tables or add-tables option)? You should include tests for both formats (see other test files).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


CREATE TABLE xact_test(data text);
INSERT INTO xact_test VALUES ('before-test');

BEGIN;
-- perform operation in xact that creates and logs xid, but isn't decoded
SELECT * FROM xact_test FOR UPDATE;
SAVEPOINT foo;
-- and now actually insert in subxact, xid is expected to be known
INSERT INTO xact_test VALUES ('after-assignment');
COMMIT;
-- and now show those changes
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');

BEGIN;
-- first insert
INSERT INTO xact_test VALUES ('main-txn');
SAVEPOINT foo;
-- now perform operation in subxact that creates and logs xid, but isn't decoded
SELECT 1 FROM xact_test FOR UPDATE LIMIT 1;
COMMIT;
-- and now show those changes
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');

DROP TABLE xact_test;

SELECT pg_drop_replication_slot('regression_slot');
91 changes: 81 additions & 10 deletions wal2json.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ typedef struct

bool pretty_print; /* pretty-print JSON? */
bool write_in_chunks; /* write in chunks? (v1) */
bool skip_empty_xacts; /* skip empty transactions */

JsonAction actions; /* output only these actions */

Expand All @@ -95,6 +96,18 @@ typedef struct
char sp[2]; /* space, if pretty print */
} JsonDecodingData;

/*
* Maintain a per-transaction structure to track whether the
* transaction have written any changes. This is required so that if user
* has requested to skip the empty transactions we can skip the empty streams
* even though the transaction has written some changes.
*/
typedef struct
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need this new struct? Can't you use data->nr_changes?

{
bool xact_wrote_changes;
} JsonTxnData;


typedef enum
{
PGOUTPUTJSON_CHANGE,
Expand All @@ -115,6 +128,9 @@ static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *
static void pg_decode_shutdown(LogicalDecodingContext *ctx);
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pg_output_begin(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
bool wrote_change);
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
static void pg_decode_change(LogicalDecodingContext *ctx,
Expand Down Expand Up @@ -150,7 +166,7 @@ static bool pg_add_by_table(List *add_tables, char *schemaname, char *tablename)

/* version 1 */
static void pg_decode_begin_txn_v1(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
ReorderBufferTXN *txn, bool wrote_change);
static void pg_decode_commit_txn_v1(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
static void pg_decode_change_v1(LogicalDecodingContext *ctx,
Expand All @@ -170,7 +186,7 @@ static void pg_decode_truncate_v1(LogicalDecodingContext *ctx,

/* version 2 */
static void pg_decode_begin_txn_v2(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
ReorderBufferTXN *txn, bool wrote_change);
static void pg_decode_commit_txn_v2(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
static void pg_decode_write_value(LogicalDecodingContext *ctx, Datum value, bool isnull, Oid typid);
Expand Down Expand Up @@ -266,6 +282,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
data->include_lsn = false;
data->include_not_null = false;
data->include_default = false;
data->skip_empty_xacts = false;
data->filter_origins = NIL;
data->filter_tables = NIL;
data->filter_msg_prefixes = NIL;
Expand Down Expand Up @@ -607,6 +624,19 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
pfree(rawstr);
}
}
else if (strcmp(elem->defname, "skip-empty-xacts") ==0)
{
if (elem->arg == NULL)
{
elog(DEBUG1, "skip-empty-xacts argument is null");
data->skip_empty_xacts = false;
}
else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "filter-tables") == 0)
{
char *rawstr;
Expand Down Expand Up @@ -784,26 +814,47 @@ pg_filter_by_origin(LogicalDecodingContext *ctx, RepOriginId origin_id)
/* BEGIN callback */
static void
pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
JsonDecodingData *data = ctx->output_plugin_private;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some space issues here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok
how to fix that?
which IDE and linter you use?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the spaces and check the diff again. I use vim. The rules for indentation comes from pgindent (see src/tools/pgindent in the Postgres tree).

Copy link
Author

@adrijshikhar adrijshikhar Sep 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i tried pgindent but it made a lot of changes. i don't think I used the right typedef file. Can you share your configuration?

JsonTxnData *txndata = MemoryContextAllocZero(ctx->context, sizeof(JsonTxnData));
txndata->xact_wrote_changes = false;
txn->output_plugin_private = txndata;

adrijshikhar marked this conversation as resolved.
Show resolved Hide resolved
/*
adrijshikhar marked this conversation as resolved.
Show resolved Hide resolved
* If asked to skip empty transactions, we'll emit BEGIN at the point
* where the first operation is received for this transaction.
*/
if (data->skip_empty_xacts)
{
elog(DEBUG2, "skipping an empty transaction");
return;
}

pg_output_begin(ctx, txn, true);
}

static void
pg_output_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, bool wrote_change)
{
JsonDecodingData *data = ctx->output_plugin_private;

if (data->format_version == 2)
pg_decode_begin_txn_v2(ctx, txn);
pg_decode_begin_txn_v2(ctx, txn, wrote_change);
else if (data->format_version == 1)
pg_decode_begin_txn_v1(ctx, txn);
pg_decode_begin_txn_v1(ctx, txn, wrote_change);
else
elog(ERROR, "format version %d is not supported", data->format_version);
}

static void
pg_decode_begin_txn_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
pg_decode_begin_txn_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, bool wrote_change)
{
JsonDecodingData *data = ctx->output_plugin_private;

data->nr_changes = 0;

/* Transaction starts */
OutputPluginPrepareWrite(ctx, true);
OutputPluginPrepareWrite(ctx, wrote_change);

appendStringInfo(ctx->out, "{%s", data->nl);

Expand All @@ -830,19 +881,19 @@ pg_decode_begin_txn_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
appendStringInfo(ctx->out, "%s\"change\":%s[", data->ht, data->sp);

if (data->write_in_chunks)
OutputPluginWrite(ctx, true);
OutputPluginWrite(ctx, wrote_change);
}

static void
pg_decode_begin_txn_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
pg_decode_begin_txn_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, bool wrote_change)
{
JsonDecodingData *data = ctx->output_plugin_private;

/* don't include BEGIN object */
if (!data->include_transaction)
return;

OutputPluginPrepareWrite(ctx, true);
OutputPluginPrepareWrite(ctx, wrote_change);
appendStringInfoString(ctx->out, "{\"action\":\"B\"");
if (data->include_xids)
appendStringInfo(ctx->out, ",\"xid\":%u", txn->xid);
Expand All @@ -866,7 +917,7 @@ pg_decode_begin_txn_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
}

appendStringInfoChar(ctx->out, '}');
OutputPluginWrite(ctx, true);
OutputPluginWrite(ctx, wrote_change);
}

/* COMMIT callback */
Expand All @@ -875,11 +926,19 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
JsonDecodingData *data = ctx->output_plugin_private;
JsonTxnData *txndata = txn->output_plugin_private;
bool xact_wrote_changes = txndata->xact_wrote_changes;

#if PG_VERSION_NUM >= 100000
OutputPluginUpdateProgress(ctx);
#endif

pfree(txndata);
txn->output_plugin_private = NULL;

if (data->skip_empty_xacts && !xact_wrote_changes)
return;

elog(DEBUG2, "my change counter: " UINT64_FORMAT " ; # of changes: " UINT64_FORMAT " ; # of changes in memory: " UINT64_FORMAT, data->nr_changes, txn->nentries, txn->nentries_mem);
elog(DEBUG2, "# of subxacts: %d", txn->nsubtxns);

Expand Down Expand Up @@ -1578,6 +1637,12 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
JsonDecodingData *data = ctx->output_plugin_private;
JsonTxnData *txndata = txn->output_plugin_private;

/* output BEGIN if we haven't yet */
if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
pg_output_begin(ctx, txn, false);
txndata->xact_wrote_changes = true;

if (data->format_version == 2)
pg_decode_change_v2(ctx, txn, relation, change);
Expand Down Expand Up @@ -2569,6 +2634,12 @@ static void pg_decode_truncate(LogicalDecodingContext *ctx,
ReorderBufferChange *change)
{
JsonDecodingData *data = ctx->output_plugin_private;
JsonTxnData *txndata = txn->output_plugin_private;

/* output BEGIN if we haven't yet */
if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
pg_output_begin(ctx, txn, false);
txndata->xact_wrote_changes = true;

if (data->format_version == 2)
pg_decode_truncate_v2(ctx, txn, n, relations, change);
Expand Down