From 47809ad68a209e2460f2832c12162678993d6ae2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Wed, 24 Apr 2024 18:07:07 +0200 Subject: [PATCH] remove keys from configuration --- source.go | 10 +--------- source/config.go | 21 ++++----------------- source/config_test.go | 6 +----- source/paramgen.go | 6 ------ 4 files changed, 6 insertions(+), 37 deletions(-) diff --git a/source.go b/source.go index 9d7d3b0..ab95a75 100644 --- a/source.go +++ b/source.go @@ -52,11 +52,7 @@ func (s *Source) Configure(_ context.Context, cfg map[string]string) error { if err != nil { return err } - s.tableKeys, err = s.config.Validate() - if err != nil { - return err - } - return nil + return s.config.Validate() } func (s *Source) Open(ctx context.Context, pos sdk.Position) error { @@ -81,10 +77,6 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error { // ensure we have keys for all tables for _, tableName := range s.config.Tables { - // get unprovided table keys - if _, ok := s.tableKeys[tableName]; ok { - continue // key was provided manually - } s.tableKeys[tableName], err = s.getTableKeys(ctx, tableName) if err != nil { return fmt.Errorf("failed to find key for table %s (try specifying it manually): %w", tableName, err) diff --git a/source/config.go b/source/config.go index 7a1e664..6475823 100644 --- a/source/config.go +++ b/source/config.go @@ -18,8 +18,6 @@ package source import ( "fmt" - "strings" - "github.com/jackc/pgx/v5" ) @@ -53,8 +51,6 @@ type Config struct { // Tables is a List of table names to read from, separated by a comma, e.g.:"table1,table2". // Use "*" if you'd like to listen to all tables. Tables []string `json:"table" validate:"required"` - // Key is a list of Key column names per table, e.g.:"table1:key1,table2:key2", records should use the key values for their `Key` fields. - Key []string `json:"key"` // SnapshotMode is whether the plugin will take a snapshot of the entire table before starting cdc mode. SnapshotMode SnapshotMode `json:"snapshotMode" validate:"inclusion=initial|never" default:"initial"` @@ -70,24 +66,15 @@ type Config struct { } // Validate validates the provided config values. -func (c Config) Validate() (map[string]string, error) { +func (c Config) Validate() error { // try parsing the url _, err := pgx.ParseConfig(c.URL) if err != nil { - return nil, fmt.Errorf("invalid url: %w", err) + return fmt.Errorf("invalid url: %w", err) } // TODO: when cdcMode "auto" is implemented, change this check if len(c.Tables) != 1 && c.CDCMode == CDCModeLongPolling { - return nil, fmt.Errorf("multi-tables are only supported for logrepl CDCMode, please provide only one table") - } - tableKeys := make(map[string]string, len(c.Tables)) - for _, pair := range c.Key { - // Split each pair into key and value - parts := strings.Split(pair, ":") - if len(parts) != 2 { - return nil, fmt.Errorf("wrong format for the configuration %q, use comma separated pairs of tables and keys, example: table1:key1,table2:key2", "key") - } - tableKeys[parts[0]] = parts[1] + return fmt.Errorf("multi-tables are only supported for logrepl CDCMode, please provide only one table") } - return tableKeys, nil + return nil } diff --git a/source/config_test.go b/source/config_test.go index 7b2c5d8..e2053db 100644 --- a/source/config_test.go +++ b/source/config_test.go @@ -31,7 +31,6 @@ func TestConfig_Validate(t *testing.T) { cfg: Config{ URL: "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb", Tables: []string{"table1", "table2"}, - Key: []string{"table1:key1"}, CDCMode: CDCModeLogrepl, }, wantErr: false, @@ -40,7 +39,6 @@ func TestConfig_Validate(t *testing.T) { cfg: Config{ URL: "postgresql", Tables: []string{"table1", "table2"}, - Key: []string{"table1:key1"}, CDCMode: CDCModeLogrepl, }, wantErr: true, @@ -49,7 +47,6 @@ func TestConfig_Validate(t *testing.T) { cfg: Config{ URL: "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb", Tables: []string{"table1", "table2"}, - Key: []string{"table1:key1"}, CDCMode: CDCModeLongPolling, }, wantErr: true, @@ -58,7 +55,6 @@ func TestConfig_Validate(t *testing.T) { cfg: Config{ URL: "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb", Tables: []string{"table1", "table2"}, - Key: []string{"key1,key2"}, CDCMode: CDCModeLogrepl, }, wantErr: true, @@ -67,7 +63,7 @@ func TestConfig_Validate(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { is := is.New(t) - _, err := tc.cfg.Validate() + err := tc.cfg.Validate() if tc.wantErr { is.True(err != nil) return diff --git a/source/paramgen.go b/source/paramgen.go index e242752..a4e6ef6 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -17,12 +17,6 @@ func (Config) Parameters() map[string]sdk.Parameter { sdk.ValidationInclusion{List: []string{"auto", "logrepl", "long_polling"}}, }, }, - "key": { - Default: "", - Description: "key is a list of key column names per table, e.g.:\"table1:key1,table2:key2\", records should use the key values for their `key` fields.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, - }, "logrepl.publicationName": { Default: "conduitpub", Description: "logrepl.publicationName determines the publication name in case the connector uses logical replication to listen to changes (see CDCMode).",