Skip to content

Commit

Permalink
add support to table
Browse files Browse the repository at this point in the history
  • Loading branch information
raulb committed Apr 24, 2024
1 parent a02c977 commit ba8bd45
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 8 deletions.
4 changes: 4 additions & 0 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (s *Source) Configure(_ context.Context, cfg map[string]string) error {
if err != nil {
return err
}

// TODO: Remove once `Table` is removed.
s.config = s.config.Init()

return s.config.Validate()
}

Expand Down
32 changes: 28 additions & 4 deletions source/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package source

import (
"errors"
"fmt"

"github.com/conduitio/conduit-commons/config"
"github.com/jackc/pgx/v5"
)

Expand Down Expand Up @@ -51,7 +53,9 @@ type Config struct {
URL string `json:"url" validate:"required"`
// Tables is a List of table names to read from, separated by a comma, e.g.:"table1,table2".
// Use "*" if you'd like to listen to all tables.
Tables []string `json:"table" validate:"required"`
Tables []string `json:"tables"` // TODO: make it required once `Table` is removed.
// Deprecated: use `tables` instead.
Table []string `json:"table"`

// SnapshotMode is whether the plugin will take a snapshot of the entire table before starting cdc mode.
SnapshotMode SnapshotMode `json:"snapshotMode" validate:"inclusion=initial|never" default:"initial"`
Expand All @@ -68,14 +72,34 @@ type Config struct {

// Validate validates the provided config values.
func (c Config) Validate() error {
var errs []error

// try parsing the url
_, err := pgx.ParseConfig(c.URL)
if err != nil {
return fmt.Errorf("invalid url: %w", err)
errs = append(errs, fmt.Errorf("invalid url: %w", err))
}

if len(c.Tables) > 0 && len(c.Table) > 0 {
errs = append(errs, fmt.Errorf(`error validating "tables": cannot provide both "table" and "tables", use "tables" only`))
}

if len(c.Tables) == 0 {
errs = append(errs, fmt.Errorf(`error validating "tables": %w`, config.ErrRequiredParameterMissing))
}

// TODO: when cdcMode "auto" is implemented, change this check
if len(c.Tables) != 1 && c.CDCMode == CDCModeLongPolling {
return fmt.Errorf("multi-tables are only supported for logrepl CDCMode, please provide only one table")
errs = append(errs, fmt.Errorf("multi-tables are only supported for logrepl CDCMode, please provide only one table"))
}
return errors.Join(errs...)
}

// Init sets the desired value on Tables while Table is being deprecated.
func (c Config) Init() Config {
if len(c.Table) > 0 && len(c.Tables) == 0 {
c.Tables = c.Table
c.Table = nil
}
return nil
return c
}
12 changes: 8 additions & 4 deletions source/paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ba8bd45

Please sign in to comment.