feat(cdc): read from multiple tables (#129)
* feat: read from multiple tables

* use sdk constant

* feat(longpoll): use opencdc.collection

* read from all tables

* update paramgen.go

* update readme

* restore readme

* read columns from multiple tables

* fix typo

* restore

* leftovers from #131

* more leftovers

* update readme on destination

* update

* update readme

* update comment

* update comment

* use opencdc constant

* start making changes on readme

* address after rebase

* wip

* wip

* tie snapshot iterator into longpolling mode

* get table key if not supplied manually

* ensure snapshot iterator waits for all acks

* fix snapshot iterator tests

* add test for ensuring iterator waits for acks

* fix cdc tests

* undo changes on readme

* address PR review

* use pg_tables

---------

Co-authored-by: Lovro Mažgon <[email protected]>
raulb and lovromazgon authored Apr 24, 2024
1 parent 66b1913 commit 4466f41
Showing 21 changed files with 299 additions and 232 deletions.
41 changes: 14 additions & 27 deletions README.md
@@ -52,31 +52,21 @@ can't be determined it will fail.

## Configuration Options

-| name | description | required | default |
-|---------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------|
-| `url` | Connection string for the Postgres database. | true | |
-| `table` | List of table names to read from, separated by comma. Example: `"employees,offices,payments"` | true | |
-| `key` | List of Key column names per table, separated by comma. Example:`"table1:key1,table2:key2"`, if not supplied, the table primary key will be used as the `'Key'` field for the records. | false | |
-| `snapshotMode` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`). | false | `initial` |
-| `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`). | false | `auto` |
-| `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` |
-| `logrepl.slotName` | Name of the slot opened for replication events. | false | `conduitslot` |
+| name | description | required | default |
+|---------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------|
+| `url` | Connection string for the Postgres database. | true | |
+| `table` | List of table names to read from, separated by comma. Using `*` will read from all public tables. | true | |
+| `key` | List of Key column names per table, separated by comma. Example:`"table1:key1,table2:key2"`, if not supplied, the table(s) primary keys will be used as the `'Key'` field for the records. | false | |
+| `snapshotMode` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`). | false | `initial` |
+| `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`). | false | `auto` |
+| `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` |
+| `logrepl.slotName` | Name of the slot opened for replication events. | false | `conduitslot` |

# Destination

The Postgres Destination takes a `record.Record` and parses it into a valid SQL query. The Destination is designed to
handle different payloads and keys. Because of this, each record is individually parsed and upserted.

-## Table Name
-
-If a record contains a `postgres.table` property in its metadata it will be inserted in that table, otherwise it will
-fall back to use the table configured in the connector. This way the Destination can support multiple tables in the same
-connector.
-
-This is especially important in a pipeline where the source is also a Postgres connector, as the source will include the
-`postgres.table` field in the metadata of each record. If you want to reroute the records to a different table, you have
-to modify the `postgres.table` field in the record's metadata using a processor.
-
## Upsert Behavior

If the target table already contains a record with the same key, the Destination will upsert with its current received
@@ -87,14 +77,11 @@ If there is no key, the record will be simply appended.

## Configuration Options

-| name | description | required | default |
-|---------|-----------------------------------------------------------------------------|----------|---------|
-| `url` | Connection string for the Postgres database. | true | |
-| `table` | The name of the table in Postgres that the connector should write to.* | false | |
-| `key` | Column name used to detect if the target table already contains the record. | false | |
-
-*Note that the `postgres.table` field in the record's metadata will override the `table` property in the destination's
-configuration. Please refer to [Table Name](#table-name) for more information.
+| name | description | required | default |
+|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|--------------------------------------------|
+| `url` | Connection string for the Postgres database. | true | |
+| `table` | Table name. It can contain a Go template that will be executed for each record to determine the table. By default, the table is the value of the `opencdc.collection` metadata field. | false | `{{ index .Metadata "opencdc.collection" }}` |
+| `key` | Column name used to detect if the target table already contains the record. | false | |

# Testing

6 changes: 0 additions & 6 deletions destination.go
@@ -26,12 +26,6 @@ import (
"github.com/jackc/pgx/v5"
)

-const (
-// TODO same constant is defined in packages longpoll, logrepl and destination
-// use same constant everywhere
-MetadataOpenCDCCollection = "opencdc.collection"
-)
-
type Destination struct {
sdk.UnimplementedDestination

10 changes: 5 additions & 5 deletions destination_integration_test.go
@@ -56,7 +56,7 @@ func TestDestination_Write(t *testing.T) {
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationSnapshot,
-Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
+Metadata: map[string]string{sdk.MetadataCollection: tableName},
Key: sdk.StructuredData{"id": 5000},
Payload: sdk.Change{
After: sdk.StructuredData{
@@ -71,7 +71,7 @@ func TestDestination_Write(t *testing.T) {
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationCreate,
-Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
+Metadata: map[string]string{sdk.MetadataCollection: tableName},
Key: sdk.StructuredData{"id": 5},
Payload: sdk.Change{
After: sdk.StructuredData{
@@ -86,7 +86,7 @@ func TestDestination_Write(t *testing.T) {
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationUpdate,
-Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
+Metadata: map[string]string{sdk.MetadataCollection: tableName},
Key: sdk.StructuredData{"id": 6},
Payload: sdk.Change{
After: sdk.StructuredData{
@@ -101,7 +101,7 @@ func TestDestination_Write(t *testing.T) {
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationUpdate,
-Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
+Metadata: map[string]string{sdk.MetadataCollection: tableName},
Key: sdk.StructuredData{"id": 1},
Payload: sdk.Change{
After: sdk.StructuredData{
@@ -115,7 +115,7 @@ func TestDestination_Write(t *testing.T) {
name: "delete",
record: sdk.Record{
Position: sdk.Position("foo"),
-Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
+Metadata: map[string]string{sdk.MetadataCollection: tableName},
Operation: sdk.OperationDelete,
Key: sdk.StructuredData{"id": 4},
},
25 changes: 12 additions & 13 deletions go.mod
@@ -5,7 +5,8 @@ go 1.22
require (
github.com/Masterminds/sprig/v3 v3.2.3
github.com/Masterminds/squirrel v1.5.4
-github.com/conduitio/conduit-connector-sdk v0.8.0
+github.com/conduitio/conduit-commons v0.1.2-0.20240405195636-cb5e072472b0
+github.com/conduitio/conduit-connector-sdk v0.8.1-0.20240408123504-cec49fc57887
github.com/daixiang0/gci v0.13.4
github.com/golangci/golangci-lint v1.57.2
github.com/google/go-cmp v0.6.0
@@ -53,7 +54,7 @@ require (
github.com/charithe/durationcheck v0.0.10 // indirect
github.com/chavacava/garif v0.1.0 // indirect
github.com/ckaznocha/intrange v0.1.1 // indirect
-github.com/conduitio/conduit-connector-protocol v0.5.0 // indirect
+github.com/conduitio/conduit-connector-protocol v0.5.1-0.20240408121719-ffe7a46af296 // indirect
github.com/curioswitch/go-reassign v0.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/denis-tingaikin/go-header v0.5.0 // indirect
@@ -77,7 +78,7 @@ require (
github.com/gobwas/glob v0.2.3 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/gofrs/flock v0.8.1 // indirect
-github.com/golang/protobuf v1.5.3 // indirect
+github.com/golang/protobuf v1.5.4 // indirect
github.com/golangci/dupl v0.0.0-20180902072040-3e9179ac440a // indirect
github.com/golangci/gofmt v0.0.0-20231018234816-f50ced29576e // indirect
github.com/golangci/misspell v0.4.1 // indirect
@@ -89,8 +90,8 @@ require (
github.com/gostaticanalysis/comment v1.4.2 // indirect
github.com/gostaticanalysis/forcetypeassert v0.1.0 // indirect
github.com/gostaticanalysis/nilerr v0.1.1 // indirect
-github.com/hashicorp/go-hclog v1.3.1 // indirect
-github.com/hashicorp/go-plugin v1.4.5 // indirect
+github.com/hashicorp/go-hclog v1.5.0 // indirect
+github.com/hashicorp/go-plugin v1.6.0 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
@@ -103,7 +104,6 @@ require (
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jgautheron/goconst v1.7.1 // indirect
-github.com/jhump/protoreflect v1.10.2-0.20220118162304-602a8db873e3 // indirect
github.com/jingyugao/rowserrcheck v1.1.1 // indirect
github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect
github.com/jjti/go-spancheck v0.5.3 // indirect
@@ -155,7 +155,7 @@ require (
github.com/quasilyte/gogrep v0.5.0 // indirect
github.com/quasilyte/regex/syntax v0.0.0-20210819130434-b3f0c404a727 // indirect
github.com/quasilyte/stdinfo v0.0.0-20220114132959-f7386bf02567 // indirect
-github.com/rs/zerolog v1.31.0 // indirect
+github.com/rs/zerolog v1.32.0 // indirect
github.com/ryancurrah/gomodguard v1.3.1 // indirect
github.com/ryanrolds/sqlclosecheck v0.5.1 // indirect
github.com/sanposhiho/wastedassign/v2 v2.0.7 // indirect
@@ -172,7 +172,7 @@ require (
github.com/sourcegraph/go-diff v0.7.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.5.0 // indirect
-github.com/spf13/cobra v1.7.0 // indirect
+github.com/spf13/cobra v1.8.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.12.0 // indirect
@@ -198,22 +198,21 @@ require (
gitlab.com/bosi/decorder v0.4.1 // indirect
go-simpler.org/musttag v0.9.0 // indirect
go-simpler.org/sloglint v0.5.0 // indirect
-go.uber.org/atomic v1.7.0 // indirect
go.uber.org/automaxprocs v1.5.3 // indirect
go.uber.org/goleak v1.3.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
-go.uber.org/zap v1.24.0 // indirect
+go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
-golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc // indirect
+golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
-google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect
-google.golang.org/grpc v1.59.0 // indirect
+google.golang.org/genproto/googleapis/rpc v0.0.0-20240325203815-454cdb8f5daa // indirect
+google.golang.org/grpc v1.63.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect