Skip to content

Commit

Permalink
manual fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon committed Jun 20, 2024
1 parent 946c157 commit 3afaf5f
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 18 deletions.
11 changes: 4 additions & 7 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,12 @@ func (d *Destination) Parameters() config.Parameters {
return destination.Config{}.Parameters()
}

func (d *Destination) Configure(_ context.Context, cfg config.Config) error {
var config destination.Config

err := sdk.Util.ParseConfig(ctx, cfg, &config)
func (d *Destination) Configure(ctx context.Context, cfg config.Config) error {
err := sdk.Util.ParseConfig(ctx, cfg, &d.config, NewDestination().Parameters())
if err != nil {
return err
}
err = config.Validate()
err = d.config.Validate()
if err != nil {
return err
}
Expand All @@ -56,11 +54,10 @@ func (d *Destination) Configure(_ context.Context, cfg config.Config) error {
if recordFormat != "" {
recordFormatType, _, _ := strings.Cut(recordFormat, "/")
if recordFormatType == (sdk.DebeziumConverter{}.Name()) {
config = config.WithKafkaConnectKeyFormat()
d.config = d.config.WithKafkaConnectKeyFormat()
}
}

d.config = config
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion destination/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
)

var (
topicRegex = regexp.MustCompile(`^[a-zA-Z0-9\._\-]+$`)
topicRegex = regexp.MustCompile(`^[a-zA-Z0-9._\-]+$`)
maxTopicLength = 249
)

Expand Down
8 changes: 2 additions & 6 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,14 @@ func (s *Source) Parameters() config.Parameters {
}

func (s *Source) Configure(ctx context.Context, cfg config.Config) error {
var config source.Config

err := sdk.Util.ParseConfig(ctx, cfg, &config)
err := sdk.Util.ParseConfig(ctx, cfg, &s.config, NewSource().Parameters())
if err != nil {
return err
}
err = config.Validate(ctx)
err = s.config.Validate(ctx)
if err != nil {
return err
}

s.config = config
return nil
}

Expand Down
3 changes: 1 addition & 2 deletions source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-kafka/source"
"github.com/conduitio/conduit-connector-kafka/test"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/matryer/is"
Expand Down Expand Up @@ -76,7 +75,7 @@ func TestSource_Read(t *testing.T) {
"kafka.header.header-b": string([]byte{0, 1, 2}),
},
Key: opencdc.RawData(rec.Key),
Payload: sdk.Change{
Payload: opencdc.Change{
After: opencdc.RawData(rec.Value),
},
}
Expand Down
5 changes: 3 additions & 2 deletions test/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"
"time"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-kafka/common"
sdk "github.com/conduitio/conduit-connector-sdk"
Expand Down Expand Up @@ -90,12 +91,12 @@ func DestinationConfigMap(t T) map[string]string {
return m
}

func ParseConfigMap[C any](t T, cfg map[string]string) C {
func ParseConfigMap[C any](t T, cfg config.Config) C {
is := is.New(t)
is.Helper()

var out C
err := sdk.Util.ParseConfig(ctx, cfg, &out)
err := cfg.DecodeInto(&out)
is.NoErr(err)

return out
Expand Down

0 comments on commit 3afaf5f

Please sign in to comment.