From 4466f41c5565e381e899ed504110cf38b58b1d57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Wed, 24 Apr 2024 18:24:44 +0100 Subject: [PATCH] feat(cdc): read from multiple tables (#129) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: read from multiple tables * use sdk constant * feat(longpoll): use opencdc.collection * read from all tables * update paramgen.go * update readme * restore readme * read columns from multiple tables * fix typo * restore * leftovers from #131 * more leftovers * update readme on destination * update * update readme * update comment * update comment * use opencdc constant * start making changes on readme * address after rebase * wip * wip * tie snapshot iterator into longpolling mode * get table key if not supplied manually * ensure snapshot iterator waits for all acks * fix snapshot iterator tests * add test for ensuring iterator waits for acks * fix cdc tests * undo changes on readme * address PR review * use pg_tables --------- Co-authored-by: Lovro Mažgon --- README.md | 41 ++---- destination.go | 6 - destination_integration_test.go | 10 +- go.mod | 25 ++-- go.sum | 76 +++++------ source.go | 140 +++++++++++++++----- source/config.go | 12 +- source/logrepl/cdc.go | 72 ++++------ source/logrepl/cdc_test.go | 16 +-- source/logrepl/handler.go | 2 +- source/logrepl/internal/relationset.go | 4 +- source/logrepl/internal/relationset_test.go | 6 +- source/logrepl/snapshot.go | 2 +- source/logrepl/snapshot_test.go | 4 +- source/longpoll/snapshot.go | 10 +- source/paramgen.go | 4 +- source/snapshot/fetch_worker.go | 4 +- source/snapshot/fetch_worker_test.go | 38 +++--- source/snapshot/iterator.go | 9 +- source/snapshot/iterator_test.go | 48 ++++++- test/docker-compose.yml | 2 +- 21 files changed, 299 insertions(+), 232 deletions(-) diff --git a/README.md b/README.md index 854937c..456fd4c 100644 --- a/README.md +++ b/README.md @@ -52,31 +52,21 @@ can't be determined it will fail. ## Configuration Options -| name | description | required | default | -|---------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------| -| `url` | Connection string for the Postgres database. | true | | -| `table` | List of table names to read from, separated by comma. Example: `"employees,offices,payments"` | true | | -| `key` | List of Key column names per table, separated by comma. Example:`"table1:key1,table2:key2"`, if not supplied, the table primary key will be used as the `'Key'` field for the records. | false | | -| `snapshotMode` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`). | false | `initial` | -| `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`). | false | `auto` | -| `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` | -| `logrepl.slotName` | Name of the slot opened for replication events. | false | `conduitslot` | +| name | description | required | default | +|---------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------| +| `url` | Connection string for the Postgres database. | true | | +| `table` | List of table names to read from, separated by comma. Using `*` will read from all public tables. | true | | +| `key` | List of Key column names per table, separated by comma. Example:`"table1:key1,table2:key2"`, if not supplied, the table(s) primary keys will be used as the `'Key'` field for the records. | false | | +| `snapshotMode` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`). | false | `initial` | +| `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`). | false | `auto` | +| `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` | +| `logrepl.slotName` | Name of the slot opened for replication events. | false | `conduitslot` | # Destination The Postgres Destination takes a `record.Record` and parses it into a valid SQL query. The Destination is designed to handle different payloads and keys. Because of this, each record is individually parsed and upserted. -## Table Name - -If a record contains a `postgres.table` property in its metadata it will be inserted in that table, otherwise it will -fall back to use the table configured in the connector. This way the Destination can support multiple tables in the same -connector. - -This is especially important in a pipeline where the source is also a Postgres connector, as the source will include the -`postgres.table` field in the metadata of each record. If you want to reroute the records to a different table, you have -to modify the `postgres.table` field in the record's metadata using a processor. - ## Upsert Behavior If the target table already contains a record with the same key, the Destination will upsert with its current received @@ -87,14 +77,11 @@ If there is no key, the record will be simply appended. ## Configuration Options -| name | description | required | default | -|---------|-----------------------------------------------------------------------------|----------|---------| -| `url` | Connection string for the Postgres database. | true | | -| `table` | The name of the table in Postgres that the connector should write to.* | false | | -| `key` | Column name used to detect if the target table already contains the record. | false | | - -*Note that the `postgres.table` field in the record's metadata will override the `table` property in the destination's -configuration. Please refer to [Table Name](#table-name) for more information. +| name | description | required | default | +|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|--------------------------------------------| +| `url` | Connection string for the Postgres database. | true | | +| `table` | Table name. It can contain a Go template that will be executed for each record to determine the table. By default, the table is the value of the `opencdc.collection` metadata field. | false | `{{ index .Metadata "opencdc.collection" }}` | +| `key` | Column name used to detect if the target table already contains the record. | false | | # Testing diff --git a/destination.go b/destination.go index 8f79c4d..d0c4f80 100644 --- a/destination.go +++ b/destination.go @@ -26,12 +26,6 @@ import ( "github.com/jackc/pgx/v5" ) -const ( - // TODO same constant is defined in packages longpoll, logrepl and destination - // use same constant everywhere - MetadataOpenCDCCollection = "opencdc.collection" -) - type Destination struct { sdk.UnimplementedDestination diff --git a/destination_integration_test.go b/destination_integration_test.go index 51cda7f..1d1c1c4 100644 --- a/destination_integration_test.go +++ b/destination_integration_test.go @@ -56,7 +56,7 @@ func TestDestination_Write(t *testing.T) { record: sdk.Record{ Position: sdk.Position("foo"), Operation: sdk.OperationSnapshot, - Metadata: map[string]string{MetadataOpenCDCCollection: tableName}, + Metadata: map[string]string{sdk.MetadataCollection: tableName}, Key: sdk.StructuredData{"id": 5000}, Payload: sdk.Change{ After: sdk.StructuredData{ @@ -71,7 +71,7 @@ func TestDestination_Write(t *testing.T) { record: sdk.Record{ Position: sdk.Position("foo"), Operation: sdk.OperationCreate, - Metadata: map[string]string{MetadataOpenCDCCollection: tableName}, + Metadata: map[string]string{sdk.MetadataCollection: tableName}, Key: sdk.StructuredData{"id": 5}, Payload: sdk.Change{ After: sdk.StructuredData{ @@ -86,7 +86,7 @@ func TestDestination_Write(t *testing.T) { record: sdk.Record{ Position: sdk.Position("foo"), Operation: sdk.OperationUpdate, - Metadata: map[string]string{MetadataOpenCDCCollection: tableName}, + Metadata: map[string]string{sdk.MetadataCollection: tableName}, Key: sdk.StructuredData{"id": 6}, Payload: sdk.Change{ After: sdk.StructuredData{ @@ -101,7 +101,7 @@ func TestDestination_Write(t *testing.T) { record: sdk.Record{ Position: sdk.Position("foo"), Operation: sdk.OperationUpdate, - Metadata: map[string]string{MetadataOpenCDCCollection: tableName}, + Metadata: map[string]string{sdk.MetadataCollection: tableName}, Key: sdk.StructuredData{"id": 1}, Payload: sdk.Change{ After: sdk.StructuredData{ @@ -115,7 +115,7 @@ func TestDestination_Write(t *testing.T) { name: "delete", record: sdk.Record{ Position: sdk.Position("foo"), - Metadata: map[string]string{MetadataOpenCDCCollection: tableName}, + Metadata: map[string]string{sdk.MetadataCollection: tableName}, Operation: sdk.OperationDelete, Key: sdk.StructuredData{"id": 4}, }, diff --git a/go.mod b/go.mod index a707a71..c3db375 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,8 @@ go 1.22 require ( github.com/Masterminds/sprig/v3 v3.2.3 github.com/Masterminds/squirrel v1.5.4 - github.com/conduitio/conduit-connector-sdk v0.8.0 + github.com/conduitio/conduit-commons v0.1.2-0.20240405195636-cb5e072472b0 + github.com/conduitio/conduit-connector-sdk v0.8.1-0.20240408123504-cec49fc57887 github.com/daixiang0/gci v0.13.4 github.com/golangci/golangci-lint v1.57.2 github.com/google/go-cmp v0.6.0 @@ -53,7 +54,7 @@ require ( github.com/charithe/durationcheck v0.0.10 // indirect github.com/chavacava/garif v0.1.0 // indirect github.com/ckaznocha/intrange v0.1.1 // indirect - github.com/conduitio/conduit-connector-protocol v0.5.0 // indirect + github.com/conduitio/conduit-connector-protocol v0.5.1-0.20240408121719-ffe7a46af296 // indirect github.com/curioswitch/go-reassign v0.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/denis-tingaikin/go-header v0.5.0 // indirect @@ -77,7 +78,7 @@ require ( github.com/gobwas/glob v0.2.3 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/gofrs/flock v0.8.1 // indirect - github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/golangci/dupl v0.0.0-20180902072040-3e9179ac440a // indirect github.com/golangci/gofmt v0.0.0-20231018234816-f50ced29576e // indirect github.com/golangci/misspell v0.4.1 // indirect @@ -89,8 +90,8 @@ require ( github.com/gostaticanalysis/comment v1.4.2 // indirect github.com/gostaticanalysis/forcetypeassert v0.1.0 // indirect github.com/gostaticanalysis/nilerr v0.1.1 // indirect - github.com/hashicorp/go-hclog v1.3.1 // indirect - github.com/hashicorp/go-plugin v1.4.5 // indirect + github.com/hashicorp/go-hclog v1.5.0 // indirect + github.com/hashicorp/go-plugin v1.6.0 // indirect github.com/hashicorp/go-version v1.6.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/yamux v0.1.1 // indirect @@ -103,7 +104,6 @@ require ( github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/jgautheron/goconst v1.7.1 // indirect - github.com/jhump/protoreflect v1.10.2-0.20220118162304-602a8db873e3 // indirect github.com/jingyugao/rowserrcheck v1.1.1 // indirect github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect github.com/jjti/go-spancheck v0.5.3 // indirect @@ -155,7 +155,7 @@ require ( github.com/quasilyte/gogrep v0.5.0 // indirect github.com/quasilyte/regex/syntax v0.0.0-20210819130434-b3f0c404a727 // indirect github.com/quasilyte/stdinfo v0.0.0-20220114132959-f7386bf02567 // indirect - github.com/rs/zerolog v1.31.0 // indirect + github.com/rs/zerolog v1.32.0 // indirect github.com/ryancurrah/gomodguard v1.3.1 // indirect github.com/ryanrolds/sqlclosecheck v0.5.1 // indirect github.com/sanposhiho/wastedassign/v2 v2.0.7 // indirect @@ -172,7 +172,7 @@ require ( github.com/sourcegraph/go-diff v0.7.0 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.5.0 // indirect - github.com/spf13/cobra v1.7.0 // indirect + github.com/spf13/cobra v1.8.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/viper v1.12.0 // indirect @@ -198,13 +198,12 @@ require ( gitlab.com/bosi/decorder v0.4.1 // indirect go-simpler.org/musttag v0.9.0 // indirect go-simpler.org/sloglint v0.5.0 // indirect - go.uber.org/atomic v1.7.0 // indirect go.uber.org/automaxprocs v1.5.3 // indirect go.uber.org/goleak v1.3.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.24.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.22.0 // indirect - golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc // indirect + golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f // indirect golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.24.0 // indirect @@ -212,8 +211,8 @@ require ( golang.org/x/sys v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect - google.golang.org/grpc v1.59.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240325203815-454cdb8f5daa // indirect + google.golang.org/grpc v1.63.0 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 9f61bfe..bf478ce 100644 --- a/go.sum +++ b/go.sum @@ -86,8 +86,6 @@ github.com/ashanbrown/forbidigo v1.6.0 h1:D3aewfM37Yb3pxHujIPSpTf6oQk9sc9WZi8ger github.com/ashanbrown/forbidigo v1.6.0/go.mod h1:Y8j9jy9ZYAEHXdu723cUlraTqbzjKF1MUyfOKL+AjcU= github.com/ashanbrown/makezero v1.1.1 h1:iCQ87C0V0vSyO+M9E/FZYbu65auqH0lnsOkf5FcB28s= github.com/ashanbrown/makezero v1.1.1/go.mod h1:i1bJLCRSCHOcOa9Y6MyF2FTfMZMFdHvxKHxgO5Z1axI= -github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= -github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -102,6 +100,8 @@ github.com/breml/bidichk v0.2.7 h1:dAkKQPLl/Qrk7hnP6P+E0xOodrq8Us7+U0o4UBOAlQY= github.com/breml/bidichk v0.2.7/go.mod h1:YodjipAGI9fGcYM7II6wFvGhdMYsC5pHDlGzqvEW3tQ= github.com/breml/errchkjson v0.3.6 h1:VLhVkqSBH96AvXEyclMR37rZslRrY2kcyq+31HCsVrA= github.com/breml/errchkjson v0.3.6/go.mod h1:jhSDoFheAF2RSDOlCfhHO9KqhZgAYLyvHe7bRCX8f/U= +github.com/bufbuild/protocompile v0.9.0 h1:DI8qLG5PEO0Mu1Oj51YFPqtx6I3qYXUAhJVJ/IzAVl0= +github.com/bufbuild/protocompile v0.9.0/go.mod h1:s89m1O8CqSYpyE/YaSGtg1r1YFMF5nLTwh4vlj6O444= github.com/butuzov/ireturn v0.3.0 h1:hTjMqWw3y5JC3kpnC5vXmFJAWI/m31jaCYQqzkS6PL0= github.com/butuzov/ireturn v0.3.0/go.mod h1:A09nIiwiqzN/IoVo9ogpa0Hzi9fex1kd9PSD6edP5ZA= github.com/butuzov/mirror v1.1.0 h1:ZqX54gBVMXu78QLoiqdwpl2mgmoOJTk7s4p4o+0avZI= @@ -126,12 +126,14 @@ github.com/ckaznocha/intrange v0.1.1 h1:gHe4LfqCspWkh8KpJFs20fJz3XRHFBFUV9yI7Itu github.com/ckaznocha/intrange v0.1.1/go.mod h1:RWffCw/vKBwHeOEwWdCikAtY0q4gGt8VhJZEEA5n+RE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= -github.com/conduitio/conduit-connector-protocol v0.5.0 h1:Rr2SsDAvWDryQArvonwPoXBELQA2wRXr49xBLrAtBaM= -github.com/conduitio/conduit-connector-protocol v0.5.0/go.mod h1:UIhHWxq52hvwwbkvQDaRgZRHfbpDDmU7tZaw0mwLdd4= -github.com/conduitio/conduit-connector-sdk v0.8.0 h1:gvchqoj5d3AQsBoIosx4i32L8Ex9+5BuAyHi/IM9VD4= -github.com/conduitio/conduit-connector-sdk v0.8.0/go.mod h1:nOz4K3X6fD8YMe5CPbULwSEE18Eu02ZrpT6o6KwQfxs= +github.com/conduitio/conduit-commons v0.1.2-0.20240405195636-cb5e072472b0 h1:LCnxTOn9L/vJCX6TThdJjpLfh5Fr9uGP9/5AkZzdb7w= +github.com/conduitio/conduit-commons v0.1.2-0.20240405195636-cb5e072472b0/go.mod h1:shChx2O5D22aUnw6L5biPiV2Tm0CCL+UwKh758lSEfE= +github.com/conduitio/conduit-connector-protocol v0.5.1-0.20240408121719-ffe7a46af296 h1:Qhl0Icbhh/32MuVfv+YReQMtH6qeKq+tMrxL2XC1U34= +github.com/conduitio/conduit-connector-protocol v0.5.1-0.20240408121719-ffe7a46af296/go.mod h1:C1nWLpGCmK2bTjC5CGPAmjRhgBccx6/aS7XDDHKdZYA= +github.com/conduitio/conduit-connector-sdk v0.8.1-0.20240408123504-cec49fc57887 h1:IZMXstp4NK+SRw/SNg6RsPD4FE2Lzw4nKGI0ijaMvrU= +github.com/conduitio/conduit-connector-sdk v0.8.1-0.20240408123504-cec49fc57887/go.mod h1:U48f9UoQKEaPMX7beOXR6x+uRl4XbG7A2KnH7MqZrLs= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= -github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/curioswitch/go-reassign v0.2.0 h1:G9UZyOcpk/d7Gd6mqYgd8XYWFMw/znxwGDUstnC9DIo= github.com/curioswitch/go-reassign v0.2.0/go.mod h1:x6OpXuWvgfQaMGks2BZybTngWjT84hqJfKoO8Tt/Roc= github.com/daixiang0/gci v0.13.4 h1:61UGkmpoAcxHM2hhNkZEf5SzwQtWJXTSws7jaPyqwlw= @@ -173,8 +175,8 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= -github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= -github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= @@ -220,8 +222,6 @@ github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= -github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= -github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -238,8 +238,8 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golangci/dupl v0.0.0-20180902072040-3e9179ac440a h1:w8hkcTqaFpzKqonE9uMCefW1WDie15eSP/4MssdenaM= github.com/golangci/dupl v0.0.0-20180902072040-3e9179ac440a/go.mod h1:ryS0uhF+x9jgbj/N71xsEqODy9BN81/GonCZiOzirOk= github.com/golangci/gofmt v0.0.0-20231018234816-f50ced29576e h1:ULcKCDV1LOZPFxGZaA6TlQbiM3J2GCPnkx/bGF6sX/g= @@ -280,15 +280,14 @@ github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= -github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= -github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20240327155427-868f304927ed h1:n8QtJTrwsv3P7dNxPaMeNkMcxvUpqocsHLr8iDLGlQI= +github.com/google/pprof v0.0.0-20240327155427-868f304927ed/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= -github.com/gordonklaus/ineffassign v0.0.0-20200309095847-7953dde2c7bf/go.mod h1:cuNKsD1zp2v6XfE/orVX2QE1LC+i254ceGcVeDT3pTU= github.com/gordonklaus/ineffassign v0.1.0 h1:y2Gd/9I7MdY1oEIt+n+rowjBNDcLQq3RsH5hwJd0f9s= github.com/gordonklaus/ineffassign v0.1.0/go.mod h1:Qcp2HIAYhR7mNUVSIxZww3Guk4it82ghYcEXIAk+QT0= github.com/gostaticanalysis/analysisutil v0.7.1 h1:ZMCjoue3DtDWQ5WyU16YbjbQEQ3VuzwxALrpYd+HeKk= @@ -303,10 +302,10 @@ github.com/gostaticanalysis/nilerr v0.1.1/go.mod h1:wZYb6YI5YAxxq0i1+VJbY0s2YONW github.com/gostaticanalysis/testutil v0.3.1-0.20210208050101-bfb5c8eec0e4/go.mod h1:D+FIZ+7OahH3ePw/izIEeH5I06eKs1IKI4Xr64/Am3M= github.com/gostaticanalysis/testutil v0.4.0 h1:nhdCmubdmDF6VEatUNjgUZBJKWRqugoISdUv3PPQgHY= github.com/gostaticanalysis/testutil v0.4.0/go.mod h1:bLIoPefWXrRi/ssLFWX1dx7Repi5x3CuviD3dgAZaBU= -github.com/hashicorp/go-hclog v1.3.1 h1:vDwF1DFNZhntP4DAjuTpOw3uEgMUpXh1pB5fW9DqHpo= -github.com/hashicorp/go-hclog v1.3.1/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= -github.com/hashicorp/go-plugin v1.4.5 h1:oTE/oQR4eghggRg8VY7PAz3dr++VwDNBGCcOfIvHpBo= -github.com/hashicorp/go-plugin v1.4.5/go.mod h1:viDMjcLJuDui6pXb8U4HVfb8AamCWhHGUjr2IrTF67s= +github.com/hashicorp/go-hclog v1.5.0 h1:bI2ocEMgcVlz55Oj1xZNBsVi900c7II+fWDyV9o+13c= +github.com/hashicorp/go-hclog v1.5.0/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= +github.com/hashicorp/go-plugin v1.6.0 h1:wgd4KxHJTVGGqWBq4QPB1i5BZNEx9BR8+OFmHDmTk8A= +github.com/hashicorp/go-plugin v1.6.0/go.mod h1:lBS5MtSSBZk0SHc66KACcjjlU6WzEVP/8pwz68aMkCI= github.com/hashicorp/go-version v1.2.1/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= @@ -339,8 +338,8 @@ github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jgautheron/goconst v1.7.1 h1:VpdAG7Ca7yvvJk5n8dMwQhfEZJh95kl/Hl9S1OI5Jkk= github.com/jgautheron/goconst v1.7.1/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4= -github.com/jhump/protoreflect v1.10.2-0.20220118162304-602a8db873e3 h1:crcHucf8SsCh17stXXZBx5HW/Q0kfaWVUIdzimNejk8= -github.com/jhump/protoreflect v1.10.2-0.20220118162304-602a8db873e3/go.mod h1:7GcYQDdMU/O/BBrl/cX6PNHpXh6cenjd8pneu5yW7Tg= +github.com/jhump/protoreflect v1.15.6 h1:WMYJbw2Wo+KOWwZFvgY0jMoVHM6i4XIvRs2RcBj5VmI= +github.com/jhump/protoreflect v1.15.6/go.mod h1:jCHoyYQIJnaabEYnbGwyo9hUqfyUMTbJw/tAut5t97E= github.com/jingyugao/rowserrcheck v1.1.1 h1:zibz55j/MJtLsjP1OF4bSdgXxwL1b+Vn7Tjzq7gFzUs= github.com/jingyugao/rowserrcheck v1.1.1/go.mod h1:4yvlZSDb3IyDTUZJUmpZfm2Hwok+Dtp+nu2qOq+er9c= github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af h1:KA9BjwUk7KlCh6S9EAGWBt1oExIUv9WyNCiRz5amv48= @@ -446,7 +445,6 @@ github.com/nakabonne/nestif v0.3.1 h1:wm28nZjhQY5HyYPx+weN3Q65k6ilSBxDb8v5S81B81 github.com/nakabonne/nestif v0.3.1/go.mod h1:9EtoZochLn5iUprVDmDjqGKPofoUEBL8U4Ngq6aY7OE= github.com/nishanths/exhaustive v0.12.0 h1:vIY9sALmw6T/yxiASewa4TQcFsVYZQQRUQJhKRf3Swg= github.com/nishanths/exhaustive v0.12.0/go.mod h1:mEZ95wPIZW+x8kC4TgC+9YCUgiST7ecevsVDTgc2obs= -github.com/nishanths/predeclared v0.0.0-20200524104333-86fad755b4d3/go.mod h1:nt3d53pc1VYcphSCIaYAJtnPYnr3Zyn8fMq2wvPGPso= github.com/nishanths/predeclared v0.2.2 h1:V2EPdZPliZymNAn79T8RkNApBjMmVKh5XRpLm/w98Vk= github.com/nishanths/predeclared v0.2.2/go.mod h1:RROzoN6TnGQupbC+lqggsOlcgysk3LMK/HI84Mp280c= github.com/nunnatsa/ginkgolinter v0.16.2 h1:8iLqHIZvN4fTLDC0Ke9tbSZVcyVHoBs0HIbnVSxfHJk= @@ -472,7 +470,6 @@ github.com/pelletier/go-toml/v2 v2.2.0 h1:QLgLl2yMN7N+ruc31VynXs1vhMZa7CeHHejIeB github.com/pelletier/go-toml/v2 v2.2.0/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -514,8 +511,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= -github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A= -github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= +github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0= +github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryancurrah/gomodguard v1.3.1 h1:fH+fUg+ngsQO0ruZXXHnA/2aNllWA1whly4a6UvyzGE= github.com/ryancurrah/gomodguard v1.3.1/go.mod h1:DGFHzEhi6iJ0oIDfMuo3TgrS+L9gZvrEfmjjuelnRU0= @@ -555,8 +552,8 @@ github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNo github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w= github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU= -github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= -github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= +github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= +github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk= github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= @@ -638,16 +635,16 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= -go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= -go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= +go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= -go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -669,8 +666,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc h1:ao2WRsKSzW6KuUY9IWPwWahcHCgR0s52IfwutMfEbdM= -golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= +golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 h1:aAcj0Da7eBAtrTp03QXWvm88pSyOt+UgdZw2BFZ+lEw= +golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ= golang.org/x/exp/typeparams v0.0.0-20220428152302-39d4317da171/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk= golang.org/x/exp/typeparams v0.0.0-20230203172020-98cc5a0785f9/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk= golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f h1:phY1HzDcf18Aq9A8KkmRtY9WvOFIxN8wgfvy6Zm1DV8= @@ -891,9 +888,7 @@ golang.org/x/tools v0.0.0-20200331025713-a30bf2db82d4/go.mod h1:Sl4aGygMT6LrqrWc golang.org/x/tools v0.0.0-20200501065659-ab2804fb9c9d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200512131952-2bc93b1c0c88/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= -golang.org/x/tools v0.0.0-20200522201501-cb1345f3a375/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= -golang.org/x/tools v0.0.0-20200717024301-6ddee64345a6/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200724022722-7017fd6b1305/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= @@ -970,8 +965,8 @@ google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f h1:ultW7fxlIvee4HYrtnaRPon9HpEgFk5zYpmfMgtKB5I= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f/go.mod h1:L9KNLi232K1/xB6f7AlSX692koaRnKaWSR0stBki0Yc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240325203815-454cdb8f5daa h1:RBgMaUMP+6soRkik4VoN8ojR2nex2TqZwjSSogic+eo= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240325203815-454cdb8f5daa/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -984,8 +979,8 @@ google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKa google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= -google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= -google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= +google.golang.org/grpc v1.63.0 h1:WjKe+dnvABXyPJMD7KDNLxtoGk5tgk+YFWN6cBWjZE8= +google.golang.org/grpc v1.63.0/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -996,7 +991,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/protobuf v1.25.1-0.20200805231151-a709e31e5d12/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= diff --git a/source.go b/source.go index 51893de..4385c97 100644 --- a/source.go +++ b/source.go @@ -16,13 +16,17 @@ package postgres import ( "context" + "errors" "fmt" + "time" + "github.com/conduitio/conduit-commons/csync" "github.com/conduitio/conduit-connector-postgres/source" "github.com/conduitio/conduit-connector-postgres/source/logrepl" - "github.com/conduitio/conduit-connector-postgres/source/longpoll" + "github.com/conduitio/conduit-connector-postgres/source/snapshot" sdk "github.com/conduitio/conduit-connector-sdk" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" ) // Source is a Postgres source plugin. @@ -31,7 +35,7 @@ type Source struct { iterator source.Iterator config source.Config - conn *pgx.Conn + connPool *pgxpool.Pool tableKeys map[string]string } @@ -56,15 +60,36 @@ func (s *Source) Configure(_ context.Context, cfg map[string]string) error { } func (s *Source) Open(ctx context.Context, pos sdk.Position) error { - conn, err := pgx.Connect(ctx, s.config.URL) + connPool, err := pgxpool.New(ctx, s.config.URL) if err != nil { - return fmt.Errorf("failed to connect to database: %w", err) + return fmt.Errorf("failed to create a connection pool to database: %w", err) } - columns, err := s.getTableColumns(ctx, conn) - if err != nil { - return fmt.Errorf("failed to connect to database: %w", err) + s.connPool = connPool + + logger := sdk.Logger(ctx) + if s.readingAllTables() { + logger.Info().Msg("Detecting all tables...") + s.config.Table, err = s.getAllTables(ctx) + if err != nil { + return fmt.Errorf("failed to connect to get all tables: %w", err) + } + logger.Info(). + Strs("tables", s.config.Table). + Int("count", len(s.config.Table)). + Msg("Successfully detected tables") + } + + // ensure we have keys for all tables + for _, tableName := range s.config.Table { + // get unprovided table keys + if _, ok := s.tableKeys[tableName]; ok { + continue // key was provided manually + } + s.tableKeys[tableName], err = s.getTableKeys(ctx, tableName) + if err != nil { + return fmt.Errorf("failed to find key for table %s (try specifying it manually): %w", tableName, err) + } } - s.conn = conn switch s.config.CDCMode { case source.CDCModeAuto: @@ -75,10 +100,10 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error { if s.config.SnapshotMode == source.SnapshotModeInitial { // TODO create snapshot iterator for logical replication and pass // the snapshot mode in the config - sdk.Logger(ctx).Warn().Msg("snapshot not supported in logical replication mode") + logger.Warn().Msg("Snapshot not supported yet in logical replication mode") } - i, err := logrepl.NewCDCIterator(ctx, s.conn, logrepl.Config{ + i, err := logrepl.NewCDCIterator(ctx, s.connPool, logrepl.Config{ Position: pos, SlotName: s.config.LogreplSlotName, PublicationName: s.config.LogreplPublicationName, @@ -90,22 +115,21 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error { } s.iterator = i case source.CDCModeLongPolling: - sdk.Logger(ctx).Warn().Msg("long polling not supported yet, only snapshot is supported") + logger.Warn().Msg("Long polling not supported yet, only snapshot is supported") if s.config.SnapshotMode != source.SnapshotModeInitial { // TODO create long polling iterator and pass snapshot mode in the config - sdk.Logger(ctx).Warn().Msg("snapshot disabled, can't do anything right now") + logger.Warn().Msg("snapshot disabled, can't do anything right now") return sdk.ErrUnimplemented } - snap, err := longpoll.NewSnapshotIterator( - ctx, - s.conn, - s.config.Table[0], // todo: only the first table for now - columns, - s.tableKeys[s.config.Table[0]]) + snap, err := snapshot.NewIterator(ctx, connPool, snapshot.Config{ + Tables: s.config.Table, + TablesKeys: s.tableKeys, + }) if err != nil { return fmt.Errorf("failed to create long polling iterator: %w", err) } + s.iterator = snap default: // shouldn't happen, config was validated @@ -123,39 +147,87 @@ func (s *Source) Ack(ctx context.Context, pos sdk.Position) error { } func (s *Source) Teardown(ctx context.Context) error { + logger := sdk.Logger(ctx) + + var errs []error if s.iterator != nil { + logger.Debug().Msg("Tearing down iterator...") if err := s.iterator.Teardown(ctx); err != nil { - return fmt.Errorf("failed to tear down iterator: %w", err) + logger.Warn().Err(err).Msg("Failed to tear down iterator") + errs = append(errs, fmt.Errorf("failed to tear down iterator: %w", err)) } } - if s.conn != nil { - if err := s.conn.Close(ctx); err != nil { - return fmt.Errorf("failed to close DB connection: %w", err) + if s.connPool != nil { + logger.Debug().Msg("Closing connection pool...") + err := csync.RunTimeout(ctx, s.connPool.Close, time.Minute) + if err != nil { + errs = append(errs, fmt.Errorf("failed to close DB connection pool: %w", err)) } } - return nil + return errors.Join(errs...) +} + +func (s *Source) readingAllTables() bool { + return len(s.config.Table) == 1 && s.config.Table[0] == source.AllTablesWildcard } -func (s *Source) getTableColumns(ctx context.Context, conn *pgx.Conn) ([]string, error) { - query := "SELECT column_name FROM information_schema.columns WHERE table_name = $1" +func (s *Source) getAllTables(ctx context.Context) ([]string, error) { + query := "SELECT tablename FROM pg_tables WHERE schemaname = 'public'" - rows, err := conn.Query(ctx, query, s.config.Table[0]) + rows, err := s.connPool.Query(ctx, query) if err != nil { return nil, err } defer rows.Close() - var columns []string + var tables []string for rows.Next() { - var columnName string - err := rows.Scan(&columnName) - if err != nil { - return nil, err + var tableName string + if err := rows.Scan(&tableName); err != nil { + return nil, fmt.Errorf("failed to scan table name: %w", err) } - columns = append(columns, columnName) + tables = append(tables, tableName) } - if err = rows.Err(); err != nil { + if err := rows.Err(); err != nil { return nil, fmt.Errorf("rows error: %w", err) } - return columns, nil + return tables, nil +} + +// getTableKeys queries the db for the name of the primary key column for a +// table if one exists and returns it. +func (s *Source) getTableKeys(ctx context.Context, tableName string) (string, error) { + query := `SELECT c.column_name +FROM information_schema.table_constraints tc +JOIN information_schema.constraint_column_usage AS ccu USING (constraint_schema, constraint_name) +JOIN information_schema.columns AS c ON c.table_schema = tc.constraint_schema + AND tc.table_name = c.table_name AND ccu.column_name = c.column_name +WHERE constraint_type = 'PRIMARY KEY' AND tc.table_schema = 'public' + AND tc.table_name = $1` + + rows, err := s.connPool.Query(ctx, query, tableName) + if err != nil { + return "", fmt.Errorf("failed to query table keys: %w", err) + } + defer rows.Close() + + if !rows.Next() { + if rows.Err() != nil { + return "", fmt.Errorf("query failed: %w", rows.Err()) + } + return "", fmt.Errorf("no table keys found: %w", pgx.ErrNoRows) + } + + var colName string + err = rows.Scan(&colName) + if err != nil { + return "", fmt.Errorf("failed to scan row: %w", err) + } + + if rows.Next() { + // we only support single column primary keys for now + return "", errors.New("composite keys are not supported") + } + + return colName, nil } diff --git a/source/config.go b/source/config.go index 3f51110..bbfdad9 100644 --- a/source/config.go +++ b/source/config.go @@ -42,14 +42,18 @@ const ( CDCModeLogrepl CDCMode = "logrepl" // CDCModeLongPolling uses long polling to listen to changes. CDCModeLongPolling CDCMode = "long_polling" + + // AllTablesWildcard can be used if you'd like to listen to all tables. + AllTablesWildcard = "*" ) type Config struct { // URL is the connection string for the Postgres database. URL string `json:"url" validate:"required"` - // Table is a List of table names to read from, separated by a comma. + // Table is a List of table names to read from, separated by a comma, e.g.:"table1,table2". + // Use "*" if you'd like to listen to all tables. Table []string `json:"table" validate:"required"` - // Key is a list of Key column names per table, ex:"table1:key1,table2:key2", records should use the key values for their `Key` fields. + // Key is a list of Key column names per table, e.g.:"table1:key1,table2:key2", records should use the key values for their `Key` fields. Key []string `json:"key"` // SnapshotMode is whether the plugin will take a snapshot of the entire table before starting cdc mode. @@ -72,9 +76,9 @@ func (c Config) Validate() (map[string]string, error) { if err != nil { return nil, fmt.Errorf("invalid url: %w", err) } - // todo: when cdcMode "auto" is implemented, change this check + // TODO: when cdcMode "auto" is implemented, change this check if len(c.Table) != 1 && c.CDCMode == CDCModeLongPolling { - return nil, fmt.Errorf("multi tables are only supported for logrepl CDCMode, please provide only one table") + return nil, fmt.Errorf("multi-tables are only supported for logrepl CDCMode, please provide only one table") } tableKeys := make(map[string]string, len(c.Table)) for _, pair := range c.Key { diff --git a/source/logrepl/cdc.go b/source/logrepl/cdc.go index 30add9e..94a4c2f 100644 --- a/source/logrepl/cdc.go +++ b/source/logrepl/cdc.go @@ -22,13 +22,8 @@ import ( "github.com/conduitio/conduit-connector-postgres/source/logrepl/internal" sdk "github.com/conduitio/conduit-connector-sdk" "github.com/jackc/pglogrepl" - "github.com/jackc/pgx/v5" -) - -const ( - // TODO same constant is defined in packages longpoll, logrepl and destination - // use same constant everywhere - MetadataPostgresTable = "postgres.table" + "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgxpool" ) // Config holds configuration values for CDCIterator. @@ -40,6 +35,17 @@ type Config struct { TableKeys map[string]string } +func (c Config) LSN() (pglogrepl.LSN, error) { + if len(c.Position) == 0 { + return 0, nil + } + lsn, err := PositionToLSN(c.Position) + if err != nil { + return 0, fmt.Errorf("failed to parse position: %w", err) + } + return lsn, nil +} + // CDCIterator asynchronously listens for events from the logical replication // slot and returns them to the caller through Next. type CDCIterator struct { @@ -52,13 +58,13 @@ type CDCIterator struct { // NewCDCIterator sets up the subscription to a logical replication slot and // starts a goroutine that listens to events. The goroutine will keep running // until either the context is canceled or Teardown is called. -func NewCDCIterator(ctx context.Context, conn *pgx.Conn, config Config) (*CDCIterator, error) { +func NewCDCIterator(ctx context.Context, connPool *pgxpool.Pool, config Config) (*CDCIterator, error) { i := &CDCIterator{ config: config, records: make(chan sdk.Record), } - err := i.attachSubscription(ctx, conn) + err := i.attachSubscription(connPool.Config().ConnConfig.Config) if err != nil { return nil, fmt.Errorf("failed to setup subscription: %w", err) } @@ -143,39 +149,27 @@ func (i *CDCIterator) Teardown(ctx context.Context) error { // attachSubscription determines the starting LSN and key column of the source // table and prepares a subscription. -func (i *CDCIterator) attachSubscription(ctx context.Context, conn *pgx.Conn) error { - var lsn pglogrepl.LSN - if i.config.Position != nil && string(i.config.Position) != "" { - var err error - lsn, err = PositionToLSN(i.config.Position) - if err != nil { - return err - } +func (i *CDCIterator) attachSubscription(connConfig pgconn.Config) error { + lsn, err := i.config.LSN() + if err != nil { + return err } - var err error - if i.config.TableKeys == nil { - i.config.TableKeys = make(map[string]string, len(i.config.Tables)) - } + // make sure we have all table keys for _, tableName := range i.config.Tables { - // get unprovided table keys - if _, ok := i.config.TableKeys[tableName]; ok { - continue // key was provided manually - } - i.config.TableKeys[tableName], err = i.getTableKeys(ctx, conn, tableName) - if err != nil { - return fmt.Errorf("failed to find key for table %s (try specifying it manually): %w", tableName, err) + if i.config.TableKeys[tableName] == "" { + return fmt.Errorf("missing key for table %q", tableName) } } sub := internal.NewSubscription( - conn.Config().Config, + connConfig, i.config.SlotName, i.config.PublicationName, i.config.Tables, lsn, NewCDCHandler( - internal.NewRelationSet(conn.TypeMap()), + internal.NewRelationSet(), i.config.TableKeys, i.records, ).Handle, @@ -184,21 +178,3 @@ func (i *CDCIterator) attachSubscription(ctx context.Context, conn *pgx.Conn) er i.sub = sub return nil } - -// getTableKeys queries the db for the name of the primary key column for a -// table if one exists and returns it. -func (i *CDCIterator) getTableKeys(ctx context.Context, conn *pgx.Conn, tableName string) (string, error) { - query := `SELECT column_name - FROM information_schema.key_column_usage - WHERE table_name = $1 AND constraint_name LIKE '%_pkey' - LIMIT 1;` - row := conn.QueryRow(ctx, query, tableName) - - var colName string - err := row.Scan(&colName) - if err != nil { - return "", fmt.Errorf("getTableKeys query failed: %w", err) - } - - return colName, nil -} diff --git a/source/logrepl/cdc_test.go b/source/logrepl/cdc_test.go index 47a1466..a377279 100644 --- a/source/logrepl/cdc_test.go +++ b/source/logrepl/cdc_test.go @@ -54,7 +54,7 @@ func TestIterator_Next(t *testing.T) { want: sdk.Record{ Operation: sdk.OperationCreate, Metadata: map[string]string{ - MetadataPostgresTable: table, + sdk.MetadataCollection: table, }, Key: sdk.StructuredData{"id": int64(6)}, Payload: sdk.Change{ @@ -78,7 +78,7 @@ func TestIterator_Next(t *testing.T) { want: sdk.Record{ Operation: sdk.OperationUpdate, Metadata: map[string]string{ - MetadataPostgresTable: table, + sdk.MetadataCollection: table, }, Key: sdk.StructuredData{"id": int64(1)}, Payload: sdk.Change{ @@ -100,7 +100,7 @@ func TestIterator_Next(t *testing.T) { want: sdk.Record{ Operation: sdk.OperationDelete, Metadata: map[string]string{ - MetadataPostgresTable: table, + sdk.MetadataCollection: table, }, Key: sdk.StructuredData{"id": int64(3)}, }, @@ -139,18 +139,12 @@ func testIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, table s is := is.New(t) config := Config{ Tables: []string{table}, + TableKeys: map[string]string{table: "id"}, PublicationName: table, // table is random, reuse for publication name SlotName: table, // table is random, reuse for slot name } - // acquire connection for the time of the test - conn, err := pool.Acquire(ctx) - is.NoErr(err) - t.Cleanup(func() { - conn.Release() - }) - - i, err := NewCDCIterator(ctx, conn.Conn(), config) + i, err := NewCDCIterator(ctx, pool, config) is.NoErr(err) return i } diff --git a/source/logrepl/handler.go b/source/logrepl/handler.go index c6d1981..e6fa3b5 100644 --- a/source/logrepl/handler.go +++ b/source/logrepl/handler.go @@ -173,7 +173,7 @@ func (h *CDCHandler) send(ctx context.Context, rec sdk.Record) error { func (h *CDCHandler) buildRecordMetadata(relation *pglogrepl.RelationMessage) map[string]string { return map[string]string{ - MetadataPostgresTable: relation.RelationName, + sdk.MetadataCollection: relation.RelationName, } } diff --git a/source/logrepl/internal/relationset.go b/source/logrepl/internal/relationset.go index 905693d..70cc971 100644 --- a/source/logrepl/internal/relationset.go +++ b/source/logrepl/internal/relationset.go @@ -30,10 +30,10 @@ type RelationSet struct { } // NewRelationSet creates a new relation set. -func NewRelationSet(ci *pgtype.Map) *RelationSet { +func NewRelationSet() *RelationSet { return &RelationSet{ relations: map[uint32]*pglogrepl.RelationMessage{}, - connInfo: ci, + connInfo: pgtype.NewMap(), } } diff --git a/source/logrepl/internal/relationset_test.go b/source/logrepl/internal/relationset_test.go index 47686b2..8021338 100644 --- a/source/logrepl/internal/relationset_test.go +++ b/source/logrepl/internal/relationset_test.go @@ -31,11 +31,9 @@ import ( ) func TestRelationSetUnregisteredType(t *testing.T) { - ctx := context.Background() is := is.New(t) - conn := test.ConnectSimple(ctx, t, test.RepmgrConnString) - rs := NewRelationSet(conn.TypeMap()) + rs := NewRelationSet() got, err := rs.Get(1234567) is.True(err != nil) @@ -79,7 +77,7 @@ func TestRelationSetAllTypes(t *testing.T) { break } - rs := NewRelationSet(conn.TypeMap()) + rs := NewRelationSet() rs.Add(rel) gotRel, err := rs.Get(rel.RelationID) diff --git a/source/logrepl/snapshot.go b/source/logrepl/snapshot.go index 1b7ebbf..0e58701 100644 --- a/source/logrepl/snapshot.go +++ b/source/logrepl/snapshot.go @@ -182,7 +182,7 @@ func (s *SnapshotIterator) buildRecordPosition() sdk.Position { func (s *SnapshotIterator) buildRecordMetadata() map[string]string { return map[string]string{ - MetadataPostgresTable: s.config.Table, + sdk.MetadataCollection: s.config.Table, } } diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index b7d018a..9524b76 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -94,8 +94,8 @@ func TestSnapshotInterrupted(t *testing.T) { }, }, Metadata: map[string]string{ - MetadataPostgresTable: table, - sdk.MetadataReadAt: rec.Metadata[sdk.MetadataReadAt], + sdk.MetadataCollection: table, + sdk.MetadataReadAt: rec.Metadata[sdk.MetadataReadAt], }, }) is.True(errors.Is(s.Teardown(ctx), ErrSnapshotInterrupt)) diff --git a/source/longpoll/snapshot.go b/source/longpoll/snapshot.go index 4060c78..88282b9 100644 --- a/source/longpoll/snapshot.go +++ b/source/longpoll/snapshot.go @@ -24,12 +24,6 @@ import ( "github.com/jackc/pgx/v5" ) -const ( - // TODO same constant is defined in packages longpoll, logrepl and destination - // use same constant everywhere - MetadataPostgresTable = "postgres.table" -) - // Declare Postgres $ placeholder format var psql = sq.StatementBuilder.PlaceholderFormat(sq.Dollar) @@ -57,7 +51,7 @@ type SnapshotIterator struct { // rows holds a reference to the postgres connection. this can be nil so // we must always call loadRows before HasNext or Next. rows pgx.Rows - // ineternalPos is an internal integer Position for the SnapshotIterator to + // internalPos is an internal integer Position for the SnapshotIterator to // to return at each Read call. internalPos int64 // snapshotComplete keeps an internal record of whether the snapshot is @@ -182,7 +176,7 @@ func (s *SnapshotIterator) buildRecordPosition() sdk.Position { func (s *SnapshotIterator) buildRecordMetadata() map[string]string { return map[string]string{ - MetadataPostgresTable: s.table, + sdk.MetadataCollection: s.table, } } diff --git a/source/paramgen.go b/source/paramgen.go index e40fa8b..e242752 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -19,7 +19,7 @@ func (Config) Parameters() map[string]sdk.Parameter { }, "key": { Default: "", - Description: "key is a list of key column names per table, ex:\"table1:key1,table2:key2\", records should use the key values for their `key` fields.", + Description: "key is a list of key column names per table, e.g.:\"table1:key1,table2:key2\", records should use the key values for their `key` fields.", Type: sdk.ParameterTypeString, Validations: []sdk.Validation{}, }, @@ -45,7 +45,7 @@ func (Config) Parameters() map[string]sdk.Parameter { }, "table": { Default: "", - Description: "table is a List of table names to read from, separated by a comma.", + Description: "table is a List of table names to read from, separated by a comma, e.g.:\"table1,table2\". Use \"*\" if you'd like to listen to all tables.", Type: sdk.ParameterTypeString, Validations: []sdk.Validation{ sdk.ValidationRequired{}, diff --git a/source/snapshot/fetch_worker.go b/source/snapshot/fetch_worker.go index 7b0b93b..0b3f7aa 100644 --- a/source/snapshot/fetch_worker.go +++ b/source/snapshot/fetch_worker.go @@ -392,7 +392,9 @@ func (*FetchWorker) validateKey(ctx context.Context, table, key string, tx pgx.T } if !isPK { - return fmt.Errorf("invalid key %q, not a primary key", key) + sdk.Logger(ctx).Warn(). + Err(fmt.Errorf("column %q is not a primary key", key)). + Msg("this may cause unexpected behavior if the key is not unique") } return nil diff --git a/source/snapshot/fetch_worker_test.go b/source/snapshot/fetch_worker_test.go index 4c2d789..a2b4f9b 100644 --- a/source/snapshot/fetch_worker_test.go +++ b/source/snapshot/fetch_worker_test.go @@ -160,18 +160,9 @@ func Test_FetcherValidate(t *testing.T) { }, } - err1 := f.Validate(ctx) - is.True(err1 != nil) - t.Logf("err: %s\n", err1.Error()) - is.True(strings.Contains(err1.Error(), `failed to validate key: key "column3" of type "boolean" is unsupported`)) - - f.conf.Key = "missing_key" - err2 := f.Validate(ctx) - is.True(err2 != nil) - is.True(strings.Contains( - err2.Error(), - fmt.Sprintf(`key "missing_key" not present on table %q`, table), - )) + err := f.Validate(ctx) + is.True(err != nil) + is.True(strings.Contains(err.Error(), `failed to validate key: key "column3" of type "boolean" is unsupported`)) }) t.Run("key is not pk", func(t *testing.T) { @@ -184,15 +175,24 @@ func Test_FetcherValidate(t *testing.T) { }, } - err1 := f.Validate(ctx) - is.True(err1 != nil) - is.True(strings.Contains(err1.Error(), `failed to validate key: invalid key "column2", not a primary key`)) + err := f.Validate(ctx) + is.NoErr(err) // no error, only a warning + }) - f.conf.Key = "missing_key" - err2 := f.Validate(ctx) - is.True(err2 != nil) + t.Run("missing key", func(t *testing.T) { + is := is.New(t) + f := FetchWorker{ + db: pool, + conf: FetchConfig{ + Table: table, + Key: "missing_key", + }, + } + + err := f.Validate(ctx) + is.True(err != nil) is.True(strings.Contains( - err2.Error(), + err.Error(), fmt.Sprintf(`key "missing_key" not present on table %q`, table), )) }) diff --git a/source/snapshot/iterator.go b/source/snapshot/iterator.go index fd203c9..58de972 100644 --- a/source/snapshot/iterator.go +++ b/source/snapshot/iterator.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" + "github.com/conduitio/conduit-commons/csync" "github.com/conduitio/conduit-connector-postgres/source/position" sdk "github.com/conduitio/conduit-connector-sdk" "github.com/jackc/pgx/v5/pgxpool" @@ -38,6 +39,7 @@ type Iterator struct { db *pgxpool.Pool t *tomb.Tomb workers []*FetchWorker + acks csync.WaitGroup conf Config @@ -83,14 +85,19 @@ func (i *Iterator) Next(ctx context.Context) (sdk.Record, error) { if err := i.t.Err(); err != nil { return sdk.Record{}, fmt.Errorf("fetchers exited unexpectedly: %w", err) } + if err := i.acks.Wait(ctx); err != nil { + return sdk.Record{}, fmt.Errorf("failed to wait for acks: %w", err) + } return sdk.Record{}, ErrIteratorDone } + i.acks.Add(1) return i.buildRecord(d), nil } } -func (i *Iterator) Ack(_ context.Context) error { +func (i *Iterator) Ack(_ context.Context, _ sdk.Position) error { + i.acks.Done() return nil } diff --git a/source/snapshot/iterator_test.go b/source/snapshot/iterator_test.go index 65a0a18..a61d3ab 100644 --- a/source/snapshot/iterator_test.go +++ b/source/snapshot/iterator_test.go @@ -18,6 +18,7 @@ import ( "context" "errors" "testing" + "time" "github.com/conduitio/conduit-connector-postgres/source/position" "github.com/conduitio/conduit-connector-postgres/test" @@ -47,10 +48,55 @@ func Test_Iterator_Next(t *testing.T) { }() for j := 1; j <= 4; j++ { - _, err := i.Next(ctx) + _, err = i.Next(ctx) is.NoErr(err) } + for j := 1; j <= 4; j++ { + err = i.Ack(ctx, nil) + is.NoErr(err) + } + + _, err = i.Next(ctx) + is.Equal(err, ErrIteratorDone) + }) + + t.Run("next waits for acks", func(t *testing.T) { + is := is.New(t) + + i, err := NewIterator(ctx, pool, Config{ + Position: position.Position{}.ToSDKPosition(), + Tables: []string{table}, + TablesKeys: map[string]string{ + table: "id", + }, + }) + is.NoErr(err) + defer func() { + is.NoErr(i.Teardown(ctx)) + }() + + for j := 1; j <= 4; j++ { + _, err = i.Next(ctx) + is.NoErr(err) + } + // Only ack 3 records + for j := 1; j <= 3; j++ { + err = i.Ack(ctx, nil) + is.NoErr(err) + } + + ctxTimeout, cancel := context.WithTimeout(ctx, time.Millisecond*10) + defer cancel() + + // No more records, but Next blocks because we haven't acked all records + _, err = i.Next(ctxTimeout) + is.True(errors.Is(err, context.DeadlineExceeded)) + + // Ack the last record + err = i.Ack(ctx, nil) + is.NoErr(err) + // Now Next won't block _, err = i.Next(ctx) is.Equal(err, ErrIteratorDone) }) diff --git a/test/docker-compose.yml b/test/docker-compose.yml index ebcf63d..043d879 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -1,7 +1,7 @@ version: '3.4' services: pg-0: - image: docker.io/bitnami/postgresql-repmgr:15 + image: docker.io/bitnami/postgresql-repmgr:16 ports: - "5433:5432" volumes: