Skip to content

Commit

Permalink
Update connector SDK (#152)
Browse files Browse the repository at this point in the history
* execute migration script

* manual fixes

* make generate

* don't skip acceptance tests
  • Loading branch information
lovromazgon authored Jun 21, 2024
1 parent 032784e commit 87c01cf
Show file tree
Hide file tree
Showing 19 changed files with 218 additions and 222 deletions.
17 changes: 6 additions & 11 deletions acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"
"time"

"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-kafka/source"
"github.com/conduitio/conduit-connector-kafka/test"
sdk "github.com/conduitio/conduit-connector-sdk"
Expand Down Expand Up @@ -53,12 +54,6 @@ func TestAcceptance(t *testing.T) {
destCfg["topic"] = randomName
},

Skip: []string{
// Configure tests are faulty since we rely on paramgen to validate required parameters.
"TestSource_Configure_RequiredParams",
"TestDestination_Configure_RequiredParams",
},

WriteTimeout: time.Second * 10,
ReadTimeout: time.Second * 10,
},
Expand All @@ -72,13 +67,13 @@ type AcceptanceTestDriver struct {

// ReadFromDestination is overwritten because the source connector uses a consumer
// group which results in slow reads. This speeds up the destination tests.
func (d AcceptanceTestDriver) ReadFromDestination(t *testing.T, records []sdk.Record) []sdk.Record {
func (d AcceptanceTestDriver) ReadFromDestination(t *testing.T, records []opencdc.Record) []opencdc.Record {
cfg := test.ParseConfigMap[source.Config](t, d.SourceConfig(t))
kgoRecs := test.Consume(t, cfg.Servers, cfg.Topics[0], len(records))

recs := make([]sdk.Record, len(kgoRecs))
recs := make([]opencdc.Record, len(kgoRecs))
for i, rec := range kgoRecs {
metadata := sdk.Metadata{}
metadata := opencdc.Metadata{}
metadata.SetCollection(rec.Topic)
metadata.SetCreatedAt(rec.Timestamp)

Expand All @@ -90,8 +85,8 @@ func (d AcceptanceTestDriver) ReadFromDestination(t *testing.T, records []sdk.Re
Offset: rec.Offset,
}.ToSDKPosition(),
metadata,
sdk.RawData(rec.Key),
sdk.RawData(rec.Value),
opencdc.RawData(rec.Key),
opencdc.RawData(rec.Value),
)
}
return recs
Expand Down
17 changes: 8 additions & 9 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"fmt"
"strings"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-kafka/destination"
sdk "github.com/conduitio/conduit-connector-sdk"
)
Expand All @@ -34,18 +36,16 @@ func NewDestination() sdk.Destination {
return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...)
}

func (d *Destination) Parameters() map[string]sdk.Parameter {
func (d *Destination) Parameters() config.Parameters {
return destination.Config{}.Parameters()
}

func (d *Destination) Configure(_ context.Context, cfg map[string]string) error {
var config destination.Config

err := sdk.Util.ParseConfig(cfg, &config)
func (d *Destination) Configure(ctx context.Context, cfg config.Config) error {
err := sdk.Util.ParseConfig(ctx, cfg, &d.config, NewDestination().Parameters())
if err != nil {
return err
}
err = config.Validate()
err = d.config.Validate()
if err != nil {
return err
}
Expand All @@ -54,11 +54,10 @@ func (d *Destination) Configure(_ context.Context, cfg map[string]string) error
if recordFormat != "" {
recordFormatType, _, _ := strings.Cut(recordFormat, "/")
if recordFormatType == (sdk.DebeziumConverter{}.Name()) {
config = config.WithKafkaConnectKeyFormat()
d.config = d.config.WithKafkaConnectKeyFormat()
}
}

d.config = config
return nil
}

Expand All @@ -76,7 +75,7 @@ func (d *Destination) Open(ctx context.Context) error {
return nil
}

func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error) {
func (d *Destination) Write(ctx context.Context, records []opencdc.Record) (int, error) {
return d.producer.Produce(ctx, records)
}

Expand Down
8 changes: 4 additions & 4 deletions destination/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (
"time"

"github.com/Masterminds/sprig/v3"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-kafka/common"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/twmb/franz-go/pkg/kgo"
)

var (
topicRegex = regexp.MustCompile(`^[a-zA-Z0-9\._\-]+$`)
topicRegex = regexp.MustCompile(`^[a-zA-Z0-9._\-]+$`)
maxTopicLength = 249
)

Expand Down Expand Up @@ -61,7 +61,7 @@ type Config struct {
useKafkaConnectKeyFormat bool
}

type TopicFn func(sdk.Record) (string, error)
type TopicFn func(opencdc.Record) (string, error)

func (c Config) WithKafkaConnectKeyFormat() Config {
c.useKafkaConnectKeyFormat = true
Expand Down Expand Up @@ -144,7 +144,7 @@ func (c Config) ParseTopic() (topic string, f TopicFn, err error) {

// The topic is a valid template, return TopicFn.
var buf bytes.Buffer
return "", func(r sdk.Record) (string, error) {
return "", func(r opencdc.Record) (string, error) {
buf.Reset()
if err := t.Execute(&buf, r); err != nil {
return "", fmt.Errorf("failed to execute topic template: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions destination/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"strings"
"testing"

sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/matryer/is"
)

Expand Down Expand Up @@ -97,8 +97,8 @@ func TestConfig_ParseTopic_DoesErrorOnTopicNotFound(t *testing.T) {

is.Equal(topic, "")

rec := sdk.Record{
Key: sdk.RawData("testkey"),
rec := opencdc.Record{
Key: opencdc.RawData("testkey"),
Metadata: map[string]string{
"topic": "testtopic",
},
Expand Down
21 changes: 11 additions & 10 deletions destination/franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"github.com/conduitio/conduit-commons/csync"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/conduitio/conduit-connector-sdk/kafkaconnect"
"github.com/goccy/go-json"
Expand All @@ -32,7 +33,7 @@ type FranzProducer struct {
// getTopic is a function that returns the topic for a record. If nil, the
// producer will use the default topic. This function is not safe for
// concurrent use.
getTopic func(sdk.Record) (string, error)
getTopic func(opencdc.Record) (string, error)
}

var _ Producer = (*FranzProducer)(nil)
Expand Down Expand Up @@ -76,7 +77,7 @@ func NewFranzProducer(ctx context.Context, cfg Config) (*FranzProducer, error) {
}, nil
}

func (p *FranzProducer) Produce(ctx context.Context, records []sdk.Record) (int, error) {
func (p *FranzProducer) Produce(ctx context.Context, records []opencdc.Record) (int, error) {
if len(records) == 1 {
// Fast path for a single record.
rec, err := p.prepareRecord(records[0])
Expand Down Expand Up @@ -137,7 +138,7 @@ func (p *FranzProducer) Produce(ctx context.Context, records []sdk.Record) (int,
return len(results), nil
}

func (p *FranzProducer) prepareRecord(r sdk.Record) (*kgo.Record, error) {
func (p *FranzProducer) prepareRecord(r opencdc.Record) (*kgo.Record, error) {
encodedKey, err := p.keyEncoder.Encode(r.Key)
if err != nil {
return nil, fmt.Errorf("could not encode key: %w", err)
Expand Down Expand Up @@ -168,21 +169,21 @@ func (p *FranzProducer) Close(_ context.Context) error {
// a certain format. The producer uses this to encode the key of the kafka
// message.
type dataEncoder interface {
Encode(sdk.Data) ([]byte, error)
Encode(opencdc.Data) ([]byte, error)
}

// bytesEncoder is a dataEncoder that simply calls data.Bytes().
type bytesEncoder struct{}

func (bytesEncoder) Encode(data sdk.Data) ([]byte, error) {
func (bytesEncoder) Encode(data opencdc.Data) ([]byte, error) {
return data.Bytes(), nil
}

// kafkaConnectEncoder encodes the data into a kafka connect JSON with schema
// (NB: this is not the same as JSONSchema).
type kafkaConnectEncoder struct{}

func (e kafkaConnectEncoder) Encode(data sdk.Data) ([]byte, error) {
func (e kafkaConnectEncoder) Encode(data opencdc.Data) ([]byte, error) {
sd := e.toStructuredData(data)
schema := kafkaconnect.Reflect(sd)
if schema == nil {
Expand All @@ -202,15 +203,15 @@ func (e kafkaConnectEncoder) Encode(data sdk.Data) ([]byte, error) {
}

// toStructuredData tries its best to return StructuredData.
func (kafkaConnectEncoder) toStructuredData(d sdk.Data) sdk.Data {
func (kafkaConnectEncoder) toStructuredData(d opencdc.Data) opencdc.Data {
switch d := d.(type) {
case nil:
return nil
case sdk.StructuredData:
case opencdc.StructuredData:
return d
case sdk.RawData:
case opencdc.RawData:
// try parsing the raw data as json
var sd sdk.StructuredData
var sd opencdc.StructuredData
err := json.Unmarshal(d, &sd)
if err != nil {
// it's not JSON, nothing more we can do
Expand Down
4 changes: 2 additions & 2 deletions destination/franz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"testing"
"time"

"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-kafka/common"
"github.com/conduitio/conduit-connector-kafka/test"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/matryer/is"
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestFranzProducer_Opts_Topic(t *testing.T) {
is.Equal(p.client.OptValue(kgo.DefaultProduceTopic), "")
is.True(p.getTopic != nil)

topic, err := p.getTopic(sdk.Record{Metadata: map[string]string{"foo": "bar"}})
topic, err := p.getTopic(opencdc.Record{Metadata: map[string]string{"foo": "bar"}})
is.NoErr(err)
is.Equal(topic, "bar")
})
Expand Down
Loading

0 comments on commit 87c01cf

Please sign in to comment.