From 8936027b33953bb1c067e3dbce3074350edeeb56 Mon Sep 17 00:00:00 2001 From: Justin Pye Date: Thu, 21 Jan 2021 06:59:55 +1200 Subject: [PATCH] feat: Change events include the changeset id This is so that Axon can auto-shutdown by checking the change event id matches that of the last record in the changeset table. --- changeset.go | 1 + db/wal2json.go | 1 + lr_listener.go | 1 + notify_listener.go | 4 +++- 4 files changed, 6 insertions(+), 1 deletion(-) diff --git a/changeset.go b/changeset.go index 61ea6f81..d38cf37f 100644 --- a/changeset.go +++ b/changeset.go @@ -34,6 +34,7 @@ func ParseChangesetKind(kind string) ChangesetKind { // Changeset represents a changeset for a record on a Postgres table. type Changeset struct { + ID int64 `json:"id"` Kind ChangesetKind `json:"kind"` Schema string `json:"schema"` Table string `json:"table"` diff --git a/db/wal2json.go b/db/wal2json.go index c4f53080..817dd883 100644 --- a/db/wal2json.go +++ b/db/wal2json.go @@ -8,6 +8,7 @@ type Wal2JSONMessage struct { // Wal2JSONChange represents a changeset within a Wal2JSONMessage. type Wal2JSONChange struct { + ID int64 `json:"id"` Kind string `json:"kind"` Schema string `json:"schema"` Table string `json:"table"` diff --git a/lr_listener.go b/lr_listener.go index cc7bc95a..542d9487 100644 --- a/lr_listener.go +++ b/lr_listener.go @@ -222,6 +222,7 @@ func (l *LogicalReplicationListener) processMessage(msg *pgx.ReplicationMessage) for _, change := range w2jmsg.Changes { cs := &Changeset{ + ID: change.ID, Kind: ParseChangesetKind(change.Kind), Schema: change.Schema, Table: change.Table, diff --git a/notify_listener.go b/notify_listener.go index 32ecddd4..4efa0a41 100644 --- a/notify_listener.go +++ b/notify_listener.go @@ -8,8 +8,9 @@ import ( "time" "github.com/jackc/pgx" - "github.com/perangel/warp-pipe/internal/store" log "github.com/sirupsen/logrus" + + "github.com/perangel/warp-pipe/internal/store" ) // NotifyOption is a NotifyListener option function @@ -167,6 +168,7 @@ func (l *NotifyListener) processMessage(msg *pgx.Notification) { func (l *NotifyListener) processChangeset(event *store.Event) { cs := &Changeset{ + ID: event.ID, Kind: ParseChangesetKind(event.Action), Schema: event.SchemaName, Table: event.TableName,