diff --git a/source.go b/source.go index ab95a75..679b524 100644 --- a/source.go +++ b/source.go @@ -52,6 +52,10 @@ func (s *Source) Configure(_ context.Context, cfg map[string]string) error { if err != nil { return err } + + // TODO: Remove once `Table` is removed. + s.config = s.config.Init() + return s.config.Validate() } diff --git a/source/config.go b/source/config.go index e8d5d5a..4986fa3 100644 --- a/source/config.go +++ b/source/config.go @@ -17,8 +17,10 @@ package source import ( + "errors" "fmt" + "github.com/conduitio/conduit-commons/config" "github.com/jackc/pgx/v5" ) @@ -51,7 +53,9 @@ type Config struct { URL string `json:"url" validate:"required"` // Tables is a List of table names to read from, separated by a comma, e.g.:"table1,table2". // Use "*" if you'd like to listen to all tables. - Tables []string `json:"table" validate:"required"` + Tables []string `json:"tables"` // TODO: make it required once `Table` is removed. + // Deprecated: use `tables` instead. + Table []string `json:"table"` // SnapshotMode is whether the plugin will take a snapshot of the entire table before starting cdc mode. SnapshotMode SnapshotMode `json:"snapshotMode" validate:"inclusion=initial|never" default:"initial"` @@ -68,14 +72,34 @@ type Config struct { // Validate validates the provided config values. func (c Config) Validate() error { + var errs []error + // try parsing the url _, err := pgx.ParseConfig(c.URL) if err != nil { - return fmt.Errorf("invalid url: %w", err) + errs = append(errs, fmt.Errorf("invalid url: %w", err)) + } + + if len(c.Tables) > 0 && len(c.Table) > 0 { + errs = append(errs, fmt.Errorf(`error validating "tables": cannot provide both "table" and "tables", use "tables" only`)) + } + + if len(c.Tables) == 0 { + errs = append(errs, fmt.Errorf(`error validating "tables": %w`, config.ErrRequiredParameterMissing)) } + // TODO: when cdcMode "auto" is implemented, change this check if len(c.Tables) != 1 && c.CDCMode == CDCModeLongPolling { - return fmt.Errorf("multi-tables are only supported for logrepl CDCMode, please provide only one table") + errs = append(errs, fmt.Errorf("multi-tables are only supported for logrepl CDCMode, please provide only one table")) + } + return errors.Join(errs...) +} + +// Init sets the desired value on Tables while Table is being deprecated. +func (c Config) Init() Config { + if len(c.Table) > 0 && len(c.Tables) == 0 { + c.Tables = c.Table + c.Table = nil } - return nil + return c } diff --git a/source/paramgen.go b/source/paramgen.go index a4e6ef6..5b6b908 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -39,11 +39,15 @@ func (Config) Parameters() map[string]sdk.Parameter { }, "table": { Default: "", - Description: "table is a List of table names to read from, separated by a comma, e.g.:\"table1,table2\". Use \"*\" if you'd like to listen to all tables.", + Description: "Deprecated: use `tables` instead.", Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{ - sdk.ValidationRequired{}, - }, + Validations: []sdk.Validation{}, + }, + "tables": { + Default: "", + Description: "tables is a List of table names to read from, separated by a comma, e.g.:\"table1,table2\". Use \"*\" if you'd like to listen to all tables.", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{}, }, "url": { Default: "",