Skip to content

Commit

Permalink
Merge pull request #2 from udacity/bernielomax/feat/change-event-incl…
Browse files Browse the repository at this point in the history
…udes-id

feat: Change events include the changeset id
  • Loading branch information
bernielomax authored Jan 20, 2021
2 parents 854ae05 + 8936027 commit 86cb630
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 1 deletion.
1 change: 1 addition & 0 deletions changeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func ParseChangesetKind(kind string) ChangesetKind {

// Changeset represents a changeset for a record on a Postgres table.
type Changeset struct {
ID int64 `json:"id"`
Kind ChangesetKind `json:"kind"`
Schema string `json:"schema"`
Table string `json:"table"`
Expand Down
1 change: 1 addition & 0 deletions db/wal2json.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type Wal2JSONMessage struct {

// Wal2JSONChange represents a changeset within a Wal2JSONMessage.
type Wal2JSONChange struct {
ID int64 `json:"id"`
Kind string `json:"kind"`
Schema string `json:"schema"`
Table string `json:"table"`
Expand Down
1 change: 1 addition & 0 deletions lr_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ func (l *LogicalReplicationListener) processMessage(msg *pgx.ReplicationMessage)

for _, change := range w2jmsg.Changes {
cs := &Changeset{
ID: change.ID,
Kind: ParseChangesetKind(change.Kind),
Schema: change.Schema,
Table: change.Table,
Expand Down
4 changes: 3 additions & 1 deletion notify_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
"time"

"github.com/jackc/pgx"
"github.com/perangel/warp-pipe/internal/store"
log "github.com/sirupsen/logrus"

"github.com/perangel/warp-pipe/internal/store"
)

// NotifyOption is a NotifyListener option function
Expand Down Expand Up @@ -167,6 +168,7 @@ func (l *NotifyListener) processMessage(msg *pgx.Notification) {

func (l *NotifyListener) processChangeset(event *store.Event) {
cs := &Changeset{
ID: event.ID,
Kind: ParseChangesetKind(event.Action),
Schema: event.SchemaName,
Table: event.TableName,
Expand Down

0 comments on commit 86cb630

Please sign in to comment.