Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update connector SDK #152

Merged
merged 4 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 6 additions & 11 deletions acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"
"time"

"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-kafka/source"
"github.com/conduitio/conduit-connector-kafka/test"
sdk "github.com/conduitio/conduit-connector-sdk"
Expand Down Expand Up @@ -53,12 +54,6 @@ func TestAcceptance(t *testing.T) {
destCfg["topic"] = randomName
},

Skip: []string{
// Configure tests are faulty since we rely on paramgen to validate required parameters.
"TestSource_Configure_RequiredParams",
"TestDestination_Configure_RequiredParams",
},

WriteTimeout: time.Second * 10,
ReadTimeout: time.Second * 10,
},
Expand All @@ -72,13 +67,13 @@ type AcceptanceTestDriver struct {

// ReadFromDestination is overwritten because the source connector uses a consumer
// group which results in slow reads. This speeds up the destination tests.
func (d AcceptanceTestDriver) ReadFromDestination(t *testing.T, records []sdk.Record) []sdk.Record {
func (d AcceptanceTestDriver) ReadFromDestination(t *testing.T, records []opencdc.Record) []opencdc.Record {
cfg := test.ParseConfigMap[source.Config](t, d.SourceConfig(t))
kgoRecs := test.Consume(t, cfg.Servers, cfg.Topics[0], len(records))

recs := make([]sdk.Record, len(kgoRecs))
recs := make([]opencdc.Record, len(kgoRecs))
for i, rec := range kgoRecs {
metadata := sdk.Metadata{}
metadata := opencdc.Metadata{}
metadata.SetCollection(rec.Topic)
metadata.SetCreatedAt(rec.Timestamp)

Expand All @@ -90,8 +85,8 @@ func (d AcceptanceTestDriver) ReadFromDestination(t *testing.T, records []sdk.Re
Offset: rec.Offset,
}.ToSDKPosition(),
metadata,
sdk.RawData(rec.Key),
sdk.RawData(rec.Value),
opencdc.RawData(rec.Key),
opencdc.RawData(rec.Value),
)
}
return recs
Expand Down
17 changes: 8 additions & 9 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"fmt"
"strings"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-kafka/destination"
sdk "github.com/conduitio/conduit-connector-sdk"
)
Expand All @@ -34,18 +36,16 @@ func NewDestination() sdk.Destination {
return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...)
}

func (d *Destination) Parameters() map[string]sdk.Parameter {
func (d *Destination) Parameters() config.Parameters {
return destination.Config{}.Parameters()
}

func (d *Destination) Configure(_ context.Context, cfg map[string]string) error {
var config destination.Config

err := sdk.Util.ParseConfig(cfg, &config)
func (d *Destination) Configure(ctx context.Context, cfg config.Config) error {
err := sdk.Util.ParseConfig(ctx, cfg, &d.config, NewDestination().Parameters())
if err != nil {
return err
}
err = config.Validate()
err = d.config.Validate()
if err != nil {
return err
}
Expand All @@ -54,11 +54,10 @@ func (d *Destination) Configure(_ context.Context, cfg map[string]string) error
if recordFormat != "" {
recordFormatType, _, _ := strings.Cut(recordFormat, "/")
if recordFormatType == (sdk.DebeziumConverter{}.Name()) {
config = config.WithKafkaConnectKeyFormat()
d.config = d.config.WithKafkaConnectKeyFormat()
}
}

d.config = config
return nil
}

Expand All @@ -76,7 +75,7 @@ func (d *Destination) Open(ctx context.Context) error {
return nil
}

func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error) {
func (d *Destination) Write(ctx context.Context, records []opencdc.Record) (int, error) {
return d.producer.Produce(ctx, records)
}

Expand Down
8 changes: 4 additions & 4 deletions destination/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (
"time"

"github.com/Masterminds/sprig/v3"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-kafka/common"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/twmb/franz-go/pkg/kgo"
)

var (
topicRegex = regexp.MustCompile(`^[a-zA-Z0-9\._\-]+$`)
topicRegex = regexp.MustCompile(`^[a-zA-Z0-9._\-]+$`)
maxTopicLength = 249
)

Expand Down Expand Up @@ -61,7 +61,7 @@ type Config struct {
useKafkaConnectKeyFormat bool
}

type TopicFn func(sdk.Record) (string, error)
type TopicFn func(opencdc.Record) (string, error)

func (c Config) WithKafkaConnectKeyFormat() Config {
c.useKafkaConnectKeyFormat = true
Expand Down Expand Up @@ -144,7 +144,7 @@ func (c Config) ParseTopic() (topic string, f TopicFn, err error) {

// The topic is a valid template, return TopicFn.
var buf bytes.Buffer
return "", func(r sdk.Record) (string, error) {
return "", func(r opencdc.Record) (string, error) {
buf.Reset()
if err := t.Execute(&buf, r); err != nil {
return "", fmt.Errorf("failed to execute topic template: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions destination/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"strings"
"testing"

sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/matryer/is"
)

Expand Down Expand Up @@ -97,8 +97,8 @@ func TestConfig_ParseTopic_DoesErrorOnTopicNotFound(t *testing.T) {

is.Equal(topic, "")

rec := sdk.Record{
Key: sdk.RawData("testkey"),
rec := opencdc.Record{
Key: opencdc.RawData("testkey"),
Metadata: map[string]string{
"topic": "testtopic",
},
Expand Down
21 changes: 11 additions & 10 deletions destination/franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"github.com/conduitio/conduit-commons/csync"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/conduitio/conduit-connector-sdk/kafkaconnect"
"github.com/goccy/go-json"
Expand All @@ -32,7 +33,7 @@ type FranzProducer struct {
// getTopic is a function that returns the topic for a record. If nil, the
// producer will use the default topic. This function is not safe for
// concurrent use.
getTopic func(sdk.Record) (string, error)
getTopic func(opencdc.Record) (string, error)
}

var _ Producer = (*FranzProducer)(nil)
Expand Down Expand Up @@ -76,7 +77,7 @@ func NewFranzProducer(ctx context.Context, cfg Config) (*FranzProducer, error) {
}, nil
}

func (p *FranzProducer) Produce(ctx context.Context, records []sdk.Record) (int, error) {
func (p *FranzProducer) Produce(ctx context.Context, records []opencdc.Record) (int, error) {
if len(records) == 1 {
// Fast path for a single record.
rec, err := p.prepareRecord(records[0])
Expand Down Expand Up @@ -137,7 +138,7 @@ func (p *FranzProducer) Produce(ctx context.Context, records []sdk.Record) (int,
return len(results), nil
}

func (p *FranzProducer) prepareRecord(r sdk.Record) (*kgo.Record, error) {
func (p *FranzProducer) prepareRecord(r opencdc.Record) (*kgo.Record, error) {
encodedKey, err := p.keyEncoder.Encode(r.Key)
if err != nil {
return nil, fmt.Errorf("could not encode key: %w", err)
Expand Down Expand Up @@ -168,21 +169,21 @@ func (p *FranzProducer) Close(_ context.Context) error {
// a certain format. The producer uses this to encode the key of the kafka
// message.
type dataEncoder interface {
Encode(sdk.Data) ([]byte, error)
Encode(opencdc.Data) ([]byte, error)
}

// bytesEncoder is a dataEncoder that simply calls data.Bytes().
type bytesEncoder struct{}

func (bytesEncoder) Encode(data sdk.Data) ([]byte, error) {
func (bytesEncoder) Encode(data opencdc.Data) ([]byte, error) {
return data.Bytes(), nil
}

// kafkaConnectEncoder encodes the data into a kafka connect JSON with schema
// (NB: this is not the same as JSONSchema).
type kafkaConnectEncoder struct{}

func (e kafkaConnectEncoder) Encode(data sdk.Data) ([]byte, error) {
func (e kafkaConnectEncoder) Encode(data opencdc.Data) ([]byte, error) {
sd := e.toStructuredData(data)
schema := kafkaconnect.Reflect(sd)
if schema == nil {
Expand All @@ -202,15 +203,15 @@ func (e kafkaConnectEncoder) Encode(data sdk.Data) ([]byte, error) {
}

// toStructuredData tries its best to return StructuredData.
func (kafkaConnectEncoder) toStructuredData(d sdk.Data) sdk.Data {
func (kafkaConnectEncoder) toStructuredData(d opencdc.Data) opencdc.Data {
switch d := d.(type) {
case nil:
return nil
case sdk.StructuredData:
case opencdc.StructuredData:
return d
case sdk.RawData:
case opencdc.RawData:
// try parsing the raw data as json
var sd sdk.StructuredData
var sd opencdc.StructuredData
err := json.Unmarshal(d, &sd)
if err != nil {
// it's not JSON, nothing more we can do
Expand Down
4 changes: 2 additions & 2 deletions destination/franz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"testing"
"time"

"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-kafka/common"
"github.com/conduitio/conduit-connector-kafka/test"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/matryer/is"
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestFranzProducer_Opts_Topic(t *testing.T) {
is.Equal(p.client.OptValue(kgo.DefaultProduceTopic), "")
is.True(p.getTopic != nil)

topic, err := p.getTopic(sdk.Record{Metadata: map[string]string{"foo": "bar"}})
topic, err := p.getTopic(opencdc.Record{Metadata: map[string]string{"foo": "bar"}})
is.NoErr(err)
is.Equal(topic, "bar")
})
Expand Down
Loading
Loading