diff --git a/README.md b/README.md index 456fd4c..818a8d4 100644 --- a/README.md +++ b/README.md @@ -3,15 +3,15 @@ # Source The Postgres Source Connector connects to a database with the provided `url` and starts creating records for each change -detected in a table. +detected in the provided tables. -Upon starting, the source takes a snapshot of a given table in the database, then switches into CDC mode. In CDC mode, +Upon starting, the source takes a snapshot of the provided tables in the database, then switches into CDC mode. In CDC mode, the plugin reads from a buffer of CDC events. ## Snapshot Capture -When the connector first starts, snapshot mode is enabled. The connector acquires a read-only lock on the table, and -then reads all rows of the table into Conduit. Once all rows in that initial snapshot are read the connector releases +When the connector first starts, snapshot mode is enabled. The connector acquires a read-only lock on the tables, and +then reads all rows of the tables into Conduit. Once all rows in that initial snapshot are read the connector releases its lock and switches into CDC mode. This behavior is enabled by default, but can be turned off by adding `"snapshotMode":"never"` to the Source @@ -37,8 +37,7 @@ Example configuration for CDC features: ```json { "url": "url", - "key": "key", - "table": "records", + "tables": "records", "cdcMode": "logrepl", "logrepl.publicationName": "meroxademo", "logrepl.slotName": "meroxademo" @@ -47,20 +46,20 @@ Example configuration for CDC features: ## Key Handling -If no `key` field is provided, then the connector will attempt to look up the primary key column of the table. If that -can't be determined it will fail. +The connector will automatically look up the primary key column for the specified tables. If that can't be determined, +the connector will return an error. ## Configuration Options -| name | description | required | default | -|---------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------| -| `url` | Connection string for the Postgres database. | true | | -| `table` | List of table names to read from, separated by comma. Using `*` will read from all public tables. | true | | -| `key` | List of Key column names per table, separated by comma. Example:`"table1:key1,table2:key2"`, if not supplied, the table(s) primary keys will be used as the `'Key'` field for the records. | false | | -| `snapshotMode` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`). | false | `initial` | -| `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`). | false | `auto` | -| `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` | -| `logrepl.slotName` | Name of the slot opened for replication events. | false | `conduitslot` | +| name | description | required | default | +|---------------------------|--------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------| +| `url` | Connection string for the Postgres database. | true | | +| `tables` | List of table names to read from, separated by comma. Example: `"employees,offices,payments"`. Using `*` will read from all public tables. | true | | +| `snapshotMode` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`). | false | `initial` | +| `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`). | false | `auto` | +| `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` | +| `logrepl.slotName` | Name of the slot opened for replication events. | false | `conduitslot` | +| ~~`table`~~ | List of table names to read from, separated by comma. **Deprecated: use `tables` instead.** | false | | # Destination @@ -77,11 +76,10 @@ If there is no key, the record will be simply appended. ## Configuration Options -| name | description | required | default | -|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|--------------------------------------------| -| `url` | Connection string for the Postgres database. | true | | +| name | description | required | default | +|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|----------------------------------------------| +| `url` | Connection string for the Postgres database. | true | | | `table` | Table name. It can contain a Go template that will be executed for each record to determine the table. By default, the table is the value of the `opencdc.collection` metadata field. | false | `{{ index .Metadata "opencdc.collection" }}` | -| `key` | Column name used to detect if the target table already contains the record. | false | | # Testing diff --git a/go.mod b/go.mod index c3db375..bfc155a 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,8 @@ go 1.22 require ( github.com/Masterminds/sprig/v3 v3.2.3 github.com/Masterminds/squirrel v1.5.4 - github.com/conduitio/conduit-commons v0.1.2-0.20240405195636-cb5e072472b0 - github.com/conduitio/conduit-connector-sdk v0.8.1-0.20240408123504-cec49fc57887 + github.com/conduitio/conduit-commons v0.2.0 + github.com/conduitio/conduit-connector-sdk v0.9.1 github.com/daixiang0/gci v0.13.4 github.com/golangci/golangci-lint v1.57.2 github.com/google/go-cmp v0.6.0 @@ -54,7 +54,7 @@ require ( github.com/charithe/durationcheck v0.0.10 // indirect github.com/chavacava/garif v0.1.0 // indirect github.com/ckaznocha/intrange v0.1.1 // indirect - github.com/conduitio/conduit-connector-protocol v0.5.1-0.20240408121719-ffe7a46af296 // indirect + github.com/conduitio/conduit-connector-protocol v0.6.0 // indirect github.com/curioswitch/go-reassign v0.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/denis-tingaikin/go-header v0.5.0 // indirect @@ -203,7 +203,7 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.22.0 // indirect - golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect + golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f // indirect golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.24.0 // indirect @@ -211,8 +211,8 @@ require ( golang.org/x/sys v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240325203815-454cdb8f5daa // indirect - google.golang.org/grpc v1.63.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect + google.golang.org/grpc v1.63.2 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index bf478ce..07e7e7d 100644 --- a/go.sum +++ b/go.sum @@ -126,12 +126,12 @@ github.com/ckaznocha/intrange v0.1.1 h1:gHe4LfqCspWkh8KpJFs20fJz3XRHFBFUV9yI7Itu github.com/ckaznocha/intrange v0.1.1/go.mod h1:RWffCw/vKBwHeOEwWdCikAtY0q4gGt8VhJZEEA5n+RE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= -github.com/conduitio/conduit-commons v0.1.2-0.20240405195636-cb5e072472b0 h1:LCnxTOn9L/vJCX6TThdJjpLfh5Fr9uGP9/5AkZzdb7w= -github.com/conduitio/conduit-commons v0.1.2-0.20240405195636-cb5e072472b0/go.mod h1:shChx2O5D22aUnw6L5biPiV2Tm0CCL+UwKh758lSEfE= -github.com/conduitio/conduit-connector-protocol v0.5.1-0.20240408121719-ffe7a46af296 h1:Qhl0Icbhh/32MuVfv+YReQMtH6qeKq+tMrxL2XC1U34= -github.com/conduitio/conduit-connector-protocol v0.5.1-0.20240408121719-ffe7a46af296/go.mod h1:C1nWLpGCmK2bTjC5CGPAmjRhgBccx6/aS7XDDHKdZYA= -github.com/conduitio/conduit-connector-sdk v0.8.1-0.20240408123504-cec49fc57887 h1:IZMXstp4NK+SRw/SNg6RsPD4FE2Lzw4nKGI0ijaMvrU= -github.com/conduitio/conduit-connector-sdk v0.8.1-0.20240408123504-cec49fc57887/go.mod h1:U48f9UoQKEaPMX7beOXR6x+uRl4XbG7A2KnH7MqZrLs= +github.com/conduitio/conduit-commons v0.2.0 h1:TMpVGXi0Wski537qLAyQWdGjuGHEhaZxOS5L90pZJSQ= +github.com/conduitio/conduit-commons v0.2.0/go.mod h1:i7Q2jm7FBSi2zj1/4MCsFD1hIKAbvamlNtSQfkhUTiY= +github.com/conduitio/conduit-connector-protocol v0.6.0 h1:2gMOCOpa+c97CHIpZv7Niu3V4o5UgRr6fzj9kzfRV7o= +github.com/conduitio/conduit-connector-protocol v0.6.0/go.mod h1:3mo59xYX9etFoR3n82R7J50La1iWK+Vm63H8z2wo4QM= +github.com/conduitio/conduit-connector-sdk v0.9.1 h1:DiMUn7udnjWvyaDsyeTZFHeYTEIdqUU6dqPunEEE3Kw= +github.com/conduitio/conduit-connector-sdk v0.9.1/go.mod h1:cNoofumgDlsaThkxkNYg7zab4AkmRZt1V711aO7guGU= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/curioswitch/go-reassign v0.2.0 h1:G9UZyOcpk/d7Gd6mqYgd8XYWFMw/znxwGDUstnC9DIo= @@ -280,8 +280,8 @@ github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= -github.com/google/pprof v0.0.0-20240327155427-868f304927ed h1:n8QtJTrwsv3P7dNxPaMeNkMcxvUpqocsHLr8iDLGlQI= -github.com/google/pprof v0.0.0-20240327155427-868f304927ed/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= +github.com/google/pprof v0.0.0-20240422182052-72c8669ad3e7 h1:3q13T5NW3mlTJZM6B5UAsf2N5NYFbYWIyI3W8DlvBDU= +github.com/google/pprof v0.0.0-20240422182052-72c8669ad3e7/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -666,8 +666,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 h1:aAcj0Da7eBAtrTp03QXWvm88pSyOt+UgdZw2BFZ+lEw= -golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ= +golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f h1:99ci1mjWVBWwJiEKYY6jWa4d2nTQVIEhZIptnrVb1XY= +golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI= golang.org/x/exp/typeparams v0.0.0-20220428152302-39d4317da171/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk= golang.org/x/exp/typeparams v0.0.0-20230203172020-98cc5a0785f9/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk= golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f h1:phY1HzDcf18Aq9A8KkmRtY9WvOFIxN8wgfvy6Zm1DV8= @@ -965,8 +965,8 @@ google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240325203815-454cdb8f5daa h1:RBgMaUMP+6soRkik4VoN8ojR2nex2TqZwjSSogic+eo= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240325203815-454cdb8f5daa/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be h1:LG9vZxsWGOmUKieR8wPAUR3u3MpnYFQZROPIMaXh7/A= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -979,8 +979,8 @@ google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKa google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= -google.golang.org/grpc v1.63.0 h1:WjKe+dnvABXyPJMD7KDNLxtoGk5tgk+YFWN6cBWjZE8= -google.golang.org/grpc v1.63.0/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= +google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= +google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/source.go b/source.go index 4385c97..71c1718 100644 --- a/source.go +++ b/source.go @@ -35,7 +35,7 @@ type Source struct { iterator source.Iterator config source.Config - connPool *pgxpool.Pool + pool *pgxpool.Pool tableKeys map[string]string } @@ -52,39 +52,34 @@ func (s *Source) Configure(_ context.Context, cfg map[string]string) error { if err != nil { return err } - s.tableKeys, err = s.config.Validate() - if err != nil { - return err - } - return nil + + s.config = s.config.Init() + + return s.config.Validate() } func (s *Source) Open(ctx context.Context, pos sdk.Position) error { - connPool, err := pgxpool.New(ctx, s.config.URL) + pool, err := pgxpool.New(ctx, s.config.URL) if err != nil { return fmt.Errorf("failed to create a connection pool to database: %w", err) } - s.connPool = connPool + s.pool = pool logger := sdk.Logger(ctx) if s.readingAllTables() { logger.Info().Msg("Detecting all tables...") - s.config.Table, err = s.getAllTables(ctx) + s.config.Tables, err = s.getAllTables(ctx) if err != nil { return fmt.Errorf("failed to connect to get all tables: %w", err) } logger.Info(). - Strs("tables", s.config.Table). - Int("count", len(s.config.Table)). + Strs("tables", s.config.Tables). + Int("count", len(s.config.Tables)). Msg("Successfully detected tables") } // ensure we have keys for all tables - for _, tableName := range s.config.Table { - // get unprovided table keys - if _, ok := s.tableKeys[tableName]; ok { - continue // key was provided manually - } + for _, tableName := range s.config.Tables { s.tableKeys[tableName], err = s.getTableKeys(ctx, tableName) if err != nil { return fmt.Errorf("failed to find key for table %s (try specifying it manually): %w", tableName, err) @@ -103,11 +98,11 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error { logger.Warn().Msg("Snapshot not supported yet in logical replication mode") } - i, err := logrepl.NewCDCIterator(ctx, s.connPool, logrepl.Config{ + i, err := logrepl.NewCDCIterator(ctx, s.pool, logrepl.Config{ Position: pos, SlotName: s.config.LogreplSlotName, PublicationName: s.config.LogreplPublicationName, - Tables: s.config.Table, + Tables: s.config.Tables, TableKeys: s.tableKeys, }) if err != nil { @@ -122,8 +117,8 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error { return sdk.ErrUnimplemented } - snap, err := snapshot.NewIterator(ctx, connPool, snapshot.Config{ - Tables: s.config.Table, + snap, err := snapshot.NewIterator(ctx, pool, snapshot.Config{ + Tables: s.config.Tables, TablesKeys: s.tableKeys, }) if err != nil { @@ -157,9 +152,9 @@ func (s *Source) Teardown(ctx context.Context) error { errs = append(errs, fmt.Errorf("failed to tear down iterator: %w", err)) } } - if s.connPool != nil { + if s.pool != nil { logger.Debug().Msg("Closing connection pool...") - err := csync.RunTimeout(ctx, s.connPool.Close, time.Minute) + err := csync.RunTimeout(ctx, s.pool.Close, time.Minute) if err != nil { errs = append(errs, fmt.Errorf("failed to close DB connection pool: %w", err)) } @@ -168,13 +163,13 @@ func (s *Source) Teardown(ctx context.Context) error { } func (s *Source) readingAllTables() bool { - return len(s.config.Table) == 1 && s.config.Table[0] == source.AllTablesWildcard + return len(s.config.Tables) == 1 && s.config.Tables[0] == source.AllTablesWildcard } func (s *Source) getAllTables(ctx context.Context) ([]string, error) { query := "SELECT tablename FROM pg_tables WHERE schemaname = 'public'" - rows, err := s.connPool.Query(ctx, query) + rows, err := s.pool.Query(ctx, query) if err != nil { return nil, err } @@ -205,7 +200,7 @@ JOIN information_schema.columns AS c ON c.table_schema = tc.constraint_schema WHERE constraint_type = 'PRIMARY KEY' AND tc.table_schema = 'public' AND tc.table_name = $1` - rows, err := s.connPool.Query(ctx, query, tableName) + rows, err := s.pool.Query(ctx, query, tableName) if err != nil { return "", fmt.Errorf("failed to query table keys: %w", err) } diff --git a/source/config.go b/source/config.go index bbfdad9..4986fa3 100644 --- a/source/config.go +++ b/source/config.go @@ -17,9 +17,10 @@ package source import ( + "errors" "fmt" - "strings" + "github.com/conduitio/conduit-commons/config" "github.com/jackc/pgx/v5" ) @@ -50,11 +51,11 @@ const ( type Config struct { // URL is the connection string for the Postgres database. URL string `json:"url" validate:"required"` - // Table is a List of table names to read from, separated by a comma, e.g.:"table1,table2". + // Tables is a List of table names to read from, separated by a comma, e.g.:"table1,table2". // Use "*" if you'd like to listen to all tables. - Table []string `json:"table" validate:"required"` - // Key is a list of Key column names per table, e.g.:"table1:key1,table2:key2", records should use the key values for their `Key` fields. - Key []string `json:"key"` + Tables []string `json:"tables"` // TODO: make it required once `Table` is removed. + // Deprecated: use `tables` instead. + Table []string `json:"table"` // SnapshotMode is whether the plugin will take a snapshot of the entire table before starting cdc mode. SnapshotMode SnapshotMode `json:"snapshotMode" validate:"inclusion=initial|never" default:"initial"` @@ -70,24 +71,35 @@ type Config struct { } // Validate validates the provided config values. -func (c Config) Validate() (map[string]string, error) { +func (c Config) Validate() error { + var errs []error + // try parsing the url _, err := pgx.ParseConfig(c.URL) if err != nil { - return nil, fmt.Errorf("invalid url: %w", err) + errs = append(errs, fmt.Errorf("invalid url: %w", err)) + } + + if len(c.Tables) > 0 && len(c.Table) > 0 { + errs = append(errs, fmt.Errorf(`error validating "tables": cannot provide both "table" and "tables", use "tables" only`)) + } + + if len(c.Tables) == 0 { + errs = append(errs, fmt.Errorf(`error validating "tables": %w`, config.ErrRequiredParameterMissing)) } + // TODO: when cdcMode "auto" is implemented, change this check - if len(c.Table) != 1 && c.CDCMode == CDCModeLongPolling { - return nil, fmt.Errorf("multi-tables are only supported for logrepl CDCMode, please provide only one table") + if len(c.Tables) != 1 && c.CDCMode == CDCModeLongPolling { + errs = append(errs, fmt.Errorf("multi-tables are only supported for logrepl CDCMode, please provide only one table")) } - tableKeys := make(map[string]string, len(c.Table)) - for _, pair := range c.Key { - // Split each pair into key and value - parts := strings.Split(pair, ":") - if len(parts) != 2 { - return nil, fmt.Errorf("wrong format for the configuration %q, use comma separated pairs of tables and keys, example: table1:key1,table2:key2", "key") - } - tableKeys[parts[0]] = parts[1] + return errors.Join(errs...) +} + +// Init sets the desired value on Tables while Table is being deprecated. +func (c Config) Init() Config { + if len(c.Table) > 0 && len(c.Tables) == 0 { + c.Tables = c.Table + c.Table = nil } - return tableKeys, nil + return c } diff --git a/source/config_test.go b/source/config_test.go index 1d371ec..76a2980 100644 --- a/source/config_test.go +++ b/source/config_test.go @@ -30,8 +30,7 @@ func TestConfig_Validate(t *testing.T) { name: "valid config", cfg: Config{ URL: "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb", - Table: []string{"table1", "table2"}, - Key: []string{"table1:key1"}, + Tables: []string{"table1", "table2"}, CDCMode: CDCModeLogrepl, }, wantErr: false, @@ -39,8 +38,7 @@ func TestConfig_Validate(t *testing.T) { name: "invalid postgres url", cfg: Config{ URL: "postgresql", - Table: []string{"table1", "table2"}, - Key: []string{"table1:key1"}, + Tables: []string{"table1", "table2"}, CDCMode: CDCModeLogrepl, }, wantErr: true, @@ -48,26 +46,16 @@ func TestConfig_Validate(t *testing.T) { name: "invalid multiple tables for long polling", cfg: Config{ URL: "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb", - Table: []string{"table1", "table2"}, - Key: []string{"table1:key1"}, + Tables: []string{"table1", "table2"}, CDCMode: CDCModeLongPolling, }, wantErr: true, - }, { - name: "invalid key list format", - cfg: Config{ - URL: "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb", - Table: []string{"table1", "table2"}, - Key: []string{"key1,key2"}, - CDCMode: CDCModeLogrepl, - }, - wantErr: true, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { is := is.New(t) - _, err := tc.cfg.Validate() + err := tc.cfg.Validate() if tc.wantErr { is.True(err != nil) return diff --git a/source/paramgen.go b/source/paramgen.go index e242752..5b6b908 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -17,12 +17,6 @@ func (Config) Parameters() map[string]sdk.Parameter { sdk.ValidationInclusion{List: []string{"auto", "logrepl", "long_polling"}}, }, }, - "key": { - Default: "", - Description: "key is a list of key column names per table, e.g.:\"table1:key1,table2:key2\", records should use the key values for their `key` fields.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, - }, "logrepl.publicationName": { Default: "conduitpub", Description: "logrepl.publicationName determines the publication name in case the connector uses logical replication to listen to changes (see CDCMode).", @@ -45,11 +39,15 @@ func (Config) Parameters() map[string]sdk.Parameter { }, "table": { Default: "", - Description: "table is a List of table names to read from, separated by a comma, e.g.:\"table1,table2\". Use \"*\" if you'd like to listen to all tables.", + Description: "Deprecated: use `tables` instead.", Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{ - sdk.ValidationRequired{}, - }, + Validations: []sdk.Validation{}, + }, + "tables": { + Default: "", + Description: "tables is a List of table names to read from, separated by a comma, e.g.:\"table1,table2\". Use \"*\" if you'd like to listen to all tables.", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{}, }, "url": { Default: "",