From 946c1572d07c679ce25ad0da5568e206471b16e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Thu, 20 Jun 2024 18:10:38 +0200 Subject: [PATCH] execute migration script --- acceptance_test.go | 11 +++--- destination.go | 10 +++-- destination/config.go | 6 +-- destination/config_test.go | 6 +-- destination/franz.go | 21 +++++----- destination/franz_test.go | 4 +- destination/paramgen.go | 76 ++++++++++++++++++------------------ destination/producer.go | 4 +- destination/producer_mock.go | 4 +- destination_test.go | 4 +- go.mod | 16 ++++---- go.sum | 51 ++++++++++++------------ source.go | 22 ++++++----- source/paramgen.go | 68 ++++++++++++++++---------------- source/position.go | 6 +-- source_integration_test.go | 8 ++-- source_test.go | 23 +++++------ test/util.go | 13 +++--- tools.go | 2 +- 19 files changed, 180 insertions(+), 175 deletions(-) diff --git a/acceptance_test.go b/acceptance_test.go index 0155031..c315a8f 100644 --- a/acceptance_test.go +++ b/acceptance_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-kafka/source" "github.com/conduitio/conduit-connector-kafka/test" sdk "github.com/conduitio/conduit-connector-sdk" @@ -72,13 +73,13 @@ type AcceptanceTestDriver struct { // ReadFromDestination is overwritten because the source connector uses a consumer // group which results in slow reads. This speeds up the destination tests. -func (d AcceptanceTestDriver) ReadFromDestination(t *testing.T, records []sdk.Record) []sdk.Record { +func (d AcceptanceTestDriver) ReadFromDestination(t *testing.T, records []opencdc.Record) []opencdc.Record { cfg := test.ParseConfigMap[source.Config](t, d.SourceConfig(t)) kgoRecs := test.Consume(t, cfg.Servers, cfg.Topics[0], len(records)) - recs := make([]sdk.Record, len(kgoRecs)) + recs := make([]opencdc.Record, len(kgoRecs)) for i, rec := range kgoRecs { - metadata := sdk.Metadata{} + metadata := opencdc.Metadata{} metadata.SetCollection(rec.Topic) metadata.SetCreatedAt(rec.Timestamp) @@ -90,8 +91,8 @@ func (d AcceptanceTestDriver) ReadFromDestination(t *testing.T, records []sdk.Re Offset: rec.Offset, }.ToSDKPosition(), metadata, - sdk.RawData(rec.Key), - sdk.RawData(rec.Value), + opencdc.RawData(rec.Key), + opencdc.RawData(rec.Value), ) } return recs diff --git a/destination.go b/destination.go index d129dd5..c007094 100644 --- a/destination.go +++ b/destination.go @@ -19,6 +19,8 @@ import ( "fmt" "strings" + "github.com/conduitio/conduit-commons/config" + "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-kafka/destination" sdk "github.com/conduitio/conduit-connector-sdk" ) @@ -34,14 +36,14 @@ func NewDestination() sdk.Destination { return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...) } -func (d *Destination) Parameters() map[string]sdk.Parameter { +func (d *Destination) Parameters() config.Parameters { return destination.Config{}.Parameters() } -func (d *Destination) Configure(_ context.Context, cfg map[string]string) error { +func (d *Destination) Configure(_ context.Context, cfg config.Config) error { var config destination.Config - err := sdk.Util.ParseConfig(cfg, &config) + err := sdk.Util.ParseConfig(ctx, cfg, &config) if err != nil { return err } @@ -76,7 +78,7 @@ func (d *Destination) Open(ctx context.Context) error { return nil } -func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error) { +func (d *Destination) Write(ctx context.Context, records []opencdc.Record) (int, error) { return d.producer.Produce(ctx, records) } diff --git a/destination/config.go b/destination/config.go index 9107fc5..5004d6d 100644 --- a/destination/config.go +++ b/destination/config.go @@ -26,8 +26,8 @@ import ( "time" "github.com/Masterminds/sprig/v3" + "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-kafka/common" - sdk "github.com/conduitio/conduit-connector-sdk" "github.com/twmb/franz-go/pkg/kgo" ) @@ -61,7 +61,7 @@ type Config struct { useKafkaConnectKeyFormat bool } -type TopicFn func(sdk.Record) (string, error) +type TopicFn func(opencdc.Record) (string, error) func (c Config) WithKafkaConnectKeyFormat() Config { c.useKafkaConnectKeyFormat = true @@ -144,7 +144,7 @@ func (c Config) ParseTopic() (topic string, f TopicFn, err error) { // The topic is a valid template, return TopicFn. var buf bytes.Buffer - return "", func(r sdk.Record) (string, error) { + return "", func(r opencdc.Record) (string, error) { buf.Reset() if err := t.Execute(&buf, r); err != nil { return "", fmt.Errorf("failed to execute topic template: %w", err) diff --git a/destination/config_test.go b/destination/config_test.go index fde7e49..fe5a2bb 100644 --- a/destination/config_test.go +++ b/destination/config_test.go @@ -18,7 +18,7 @@ import ( "strings" "testing" - sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/conduitio/conduit-commons/opencdc" "github.com/matryer/is" ) @@ -97,8 +97,8 @@ func TestConfig_ParseTopic_DoesErrorOnTopicNotFound(t *testing.T) { is.Equal(topic, "") - rec := sdk.Record{ - Key: sdk.RawData("testkey"), + rec := opencdc.Record{ + Key: opencdc.RawData("testkey"), Metadata: map[string]string{ "topic": "testtopic", }, diff --git a/destination/franz.go b/destination/franz.go index 37c092a..86f8caf 100644 --- a/destination/franz.go +++ b/destination/franz.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/conduitio/conduit-commons/csync" + "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-connector-sdk" "github.com/conduitio/conduit-connector-sdk/kafkaconnect" "github.com/goccy/go-json" @@ -32,7 +33,7 @@ type FranzProducer struct { // getTopic is a function that returns the topic for a record. If nil, the // producer will use the default topic. This function is not safe for // concurrent use. - getTopic func(sdk.Record) (string, error) + getTopic func(opencdc.Record) (string, error) } var _ Producer = (*FranzProducer)(nil) @@ -76,7 +77,7 @@ func NewFranzProducer(ctx context.Context, cfg Config) (*FranzProducer, error) { }, nil } -func (p *FranzProducer) Produce(ctx context.Context, records []sdk.Record) (int, error) { +func (p *FranzProducer) Produce(ctx context.Context, records []opencdc.Record) (int, error) { if len(records) == 1 { // Fast path for a single record. rec, err := p.prepareRecord(records[0]) @@ -137,7 +138,7 @@ func (p *FranzProducer) Produce(ctx context.Context, records []sdk.Record) (int, return len(results), nil } -func (p *FranzProducer) prepareRecord(r sdk.Record) (*kgo.Record, error) { +func (p *FranzProducer) prepareRecord(r opencdc.Record) (*kgo.Record, error) { encodedKey, err := p.keyEncoder.Encode(r.Key) if err != nil { return nil, fmt.Errorf("could not encode key: %w", err) @@ -168,13 +169,13 @@ func (p *FranzProducer) Close(_ context.Context) error { // a certain format. The producer uses this to encode the key of the kafka // message. type dataEncoder interface { - Encode(sdk.Data) ([]byte, error) + Encode(opencdc.Data) ([]byte, error) } // bytesEncoder is a dataEncoder that simply calls data.Bytes(). type bytesEncoder struct{} -func (bytesEncoder) Encode(data sdk.Data) ([]byte, error) { +func (bytesEncoder) Encode(data opencdc.Data) ([]byte, error) { return data.Bytes(), nil } @@ -182,7 +183,7 @@ func (bytesEncoder) Encode(data sdk.Data) ([]byte, error) { // (NB: this is not the same as JSONSchema). type kafkaConnectEncoder struct{} -func (e kafkaConnectEncoder) Encode(data sdk.Data) ([]byte, error) { +func (e kafkaConnectEncoder) Encode(data opencdc.Data) ([]byte, error) { sd := e.toStructuredData(data) schema := kafkaconnect.Reflect(sd) if schema == nil { @@ -202,15 +203,15 @@ func (e kafkaConnectEncoder) Encode(data sdk.Data) ([]byte, error) { } // toStructuredData tries its best to return StructuredData. -func (kafkaConnectEncoder) toStructuredData(d sdk.Data) sdk.Data { +func (kafkaConnectEncoder) toStructuredData(d opencdc.Data) opencdc.Data { switch d := d.(type) { case nil: return nil - case sdk.StructuredData: + case opencdc.StructuredData: return d - case sdk.RawData: + case opencdc.RawData: // try parsing the raw data as json - var sd sdk.StructuredData + var sd opencdc.StructuredData err := json.Unmarshal(d, &sd) if err != nil { // it's not JSON, nothing more we can do diff --git a/destination/franz_test.go b/destination/franz_test.go index afbc321..1cd54ba 100644 --- a/destination/franz_test.go +++ b/destination/franz_test.go @@ -20,9 +20,9 @@ import ( "testing" "time" + "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-kafka/common" "github.com/conduitio/conduit-connector-kafka/test" - sdk "github.com/conduitio/conduit-connector-sdk" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/matryer/is" @@ -138,7 +138,7 @@ func TestFranzProducer_Opts_Topic(t *testing.T) { is.Equal(p.client.OptValue(kgo.DefaultProduceTopic), "") is.True(p.getTopic != nil) - topic, err := p.getTopic(sdk.Record{Metadata: map[string]string{"foo": "bar"}}) + topic, err := p.getTopic(opencdc.Record{Metadata: map[string]string{"foo": "bar"}}) is.NoErr(err) is.Equal(topic, "bar") }) diff --git a/destination/paramgen.go b/destination/paramgen.go index 2507102..8f3a3af 100644 --- a/destination/paramgen.go +++ b/destination/paramgen.go @@ -3,109 +3,107 @@ package destination -import ( - sdk "github.com/conduitio/conduit-connector-sdk" -) +import "github.com/conduitio/conduit-commons/config" -func (Config) Parameters() map[string]sdk.Parameter { - return map[string]sdk.Parameter{ +func (Config) Parameters() config.Parameters { + return map[string]config.Parameter{ "acks": { Default: "all", Description: "acks defines the number of acknowledges from partition replicas required before receiving a response to a produce request. None = fire and forget, one = wait for the leader to acknowledge the writes, all = wait for the full ISR to acknowledge the writes.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{ - sdk.ValidationInclusion{List: []string{"none", "one", "all"}}, + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationInclusion{List: []string{"none", "one", "all"}}, }, }, "batchBytes": { Default: "1000012", Description: "batchBytes limits the maximum size of a request in bytes before being sent to a partition. This mirrors Kafka's max.message.bytes.", - Type: sdk.ParameterTypeInt, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeInt, + Validations: []config.Validation{}, }, "caCert": { Default: "", Description: "caCert is the Kafka broker's certificate.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, "clientCert": { Default: "", Description: "clientCert is the Kafka client's certificate.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, "clientID": { Default: "conduit-connector-kafka", Description: "clientID is a unique identifier for client connections established by this connector.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, "clientKey": { Default: "", Description: "clientKey is the Kafka client's private key.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, "compression": { Default: "snappy", Description: "compression set the compression codec to be used to compress messages.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{ - sdk.ValidationInclusion{List: []string{"none", "gzip", "snappy", "lz4", "zstd"}}, + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationInclusion{List: []string{"none", "gzip", "snappy", "lz4", "zstd"}}, }, }, "deliveryTimeout": { Default: "", Description: "deliveryTimeout for write operation performed by the Writer.", - Type: sdk.ParameterTypeDuration, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeDuration, + Validations: []config.Validation{}, }, "insecureSkipVerify": { Default: "", Description: "insecureSkipVerify defines whether to validate the broker's certificate chain and host name. If 'true', accepts any certificate presented by the server and any host name in that certificate.", - Type: sdk.ParameterTypeBool, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeBool, + Validations: []config.Validation{}, }, "saslMechanism": { Default: "", Description: "saslMechanism configures the connector to use SASL authentication. If empty, no authentication will be performed.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{ - sdk.ValidationInclusion{List: []string{"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}}, + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationInclusion{List: []string{"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}}, }, }, "saslPassword": { Default: "", Description: "saslPassword sets up the password used with SASL authentication.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, "saslUsername": { Default: "", Description: "saslUsername sets up the username used with SASL authentication.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, "servers": { Default: "", Description: "servers is a list of Kafka bootstrap servers, which will be used to discover all the servers in a cluster.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{ - sdk.ValidationRequired{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, }, }, "tls.enabled": { Default: "", Description: "tls.enabled defines whether TLS is needed to communicate with the Kafka cluster.", - Type: sdk.ParameterTypeBool, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeBool, + Validations: []config.Validation{}, }, "topic": { Default: "{{ index .Metadata \"opencdc.collection\" }}", Description: "topic is the Kafka topic. It can contain a [Go template](https://pkg.go.dev/text/template) that will be executed for each record to determine the topic. By default, the topic is the value of the `opencdc.collection` metadata field.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, } } diff --git a/destination/producer.go b/destination/producer.go index 8742562..0ad9d20 100644 --- a/destination/producer.go +++ b/destination/producer.go @@ -19,13 +19,13 @@ package destination import ( "context" - sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/conduitio/conduit-commons/opencdc" ) // Producer is a kafka producer. type Producer interface { // Produce sends all records to Kafka synchronously. - Produce(context.Context, []sdk.Record) (int, error) + Produce(context.Context, []opencdc.Record) (int, error) // Close this producer and the associated resources. Close(context.Context) error } diff --git a/destination/producer_mock.go b/destination/producer_mock.go index d794d36..ce4232f 100644 --- a/destination/producer_mock.go +++ b/destination/producer_mock.go @@ -13,7 +13,7 @@ import ( context "context" reflect "reflect" - sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/conduitio/conduit-commons/opencdc" gomock "go.uber.org/mock/gomock" ) @@ -55,7 +55,7 @@ func (mr *MockProducerMockRecorder) Close(arg0 any) *gomock.Call { } // Produce mocks base method. -func (m *MockProducer) Produce(arg0 context.Context, arg1 []sdk.Record) (int, error) { +func (m *MockProducer) Produce(arg0 context.Context, arg1 []opencdc.Record) (int, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Produce", arg0, arg1) ret0, _ := ret[0].(int) diff --git a/destination_test.go b/destination_test.go index 5e95040..bafe4a9 100644 --- a/destination_test.go +++ b/destination_test.go @@ -18,9 +18,9 @@ import ( "context" "testing" + "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-kafka/destination" "github.com/conduitio/conduit-connector-kafka/test" - sdk "github.com/conduitio/conduit-connector-sdk" "github.com/matryer/is" "go.uber.org/mock/gomock" ) @@ -54,7 +54,7 @@ func TestDestination_Write_Produce(t *testing.T) { ctrl := gomock.NewController(t) ctx := context.Background() - recs := []sdk.Record{{}} + recs := []opencdc.Record{{}} producerMock := destination.NewMockProducer(ctrl) producerMock.EXPECT().Produce(ctx, recs).Return(1, nil) diff --git a/go.mod b/go.mod index f68e4ed..7463699 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ toolchain go1.22.2 require ( github.com/Masterminds/sprig/v3 v3.2.3 github.com/conduitio/conduit-commons v0.2.0 - github.com/conduitio/conduit-connector-sdk v0.9.1 + github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240619124114-6a89e0d7add8 github.com/goccy/go-json v0.10.3 github.com/golangci/golangci-lint v1.59.1 github.com/google/go-cmp v0.6.0 @@ -54,7 +54,7 @@ require ( github.com/charithe/durationcheck v0.0.10 // indirect github.com/chavacava/garif v0.1.0 // indirect github.com/ckaznocha/intrange v0.1.2 // indirect - github.com/conduitio/conduit-connector-protocol v0.6.0 // indirect + github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240619121958-1df466646d01 // indirect github.com/curioswitch/go-reassign v0.2.0 // indirect github.com/daixiang0/gci v0.13.4 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -91,8 +91,8 @@ require ( github.com/gostaticanalysis/comment v1.4.2 // indirect github.com/gostaticanalysis/forcetypeassert v0.1.0 // indirect github.com/gostaticanalysis/nilerr v0.1.1 // indirect - github.com/hashicorp/go-hclog v1.5.0 // indirect - github.com/hashicorp/go-plugin v1.6.0 // indirect + github.com/hashicorp/go-hclog v1.6.3 // indirect + github.com/hashicorp/go-plugin v1.6.1 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/yamux v0.1.1 // indirect @@ -202,7 +202,7 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.24.0 // indirect - golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect + golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8 // indirect golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f // indirect golang.org/x/mod v0.18.0 // indirect golang.org/x/net v0.26.0 // indirect @@ -211,9 +211,9 @@ require ( golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.22.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect - google.golang.org/grpc v1.63.2 // indirect - google.golang.org/protobuf v1.33.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect + google.golang.org/grpc v1.64.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 2f3b300..d728a16 100644 --- a/go.sum +++ b/go.sum @@ -99,8 +99,8 @@ github.com/breml/bidichk v0.2.7 h1:dAkKQPLl/Qrk7hnP6P+E0xOodrq8Us7+U0o4UBOAlQY= github.com/breml/bidichk v0.2.7/go.mod h1:YodjipAGI9fGcYM7II6wFvGhdMYsC5pHDlGzqvEW3tQ= github.com/breml/errchkjson v0.3.6 h1:VLhVkqSBH96AvXEyclMR37rZslRrY2kcyq+31HCsVrA= github.com/breml/errchkjson v0.3.6/go.mod h1:jhSDoFheAF2RSDOlCfhHO9KqhZgAYLyvHe7bRCX8f/U= -github.com/bufbuild/protocompile v0.9.0 h1:DI8qLG5PEO0Mu1Oj51YFPqtx6I3qYXUAhJVJ/IzAVl0= -github.com/bufbuild/protocompile v0.9.0/go.mod h1:s89m1O8CqSYpyE/YaSGtg1r1YFMF5nLTwh4vlj6O444= +github.com/bufbuild/protocompile v0.14.0 h1:z3DW4IvXE5G/uTOnSQn+qwQQxvhckkTWLS/0No/o7KU= +github.com/bufbuild/protocompile v0.14.0/go.mod h1:N6J1NYzkspJo3ZwyL4Xjvli86XOj1xq4qAasUFxGups= github.com/butuzov/ireturn v0.3.0 h1:hTjMqWw3y5JC3kpnC5vXmFJAWI/m31jaCYQqzkS6PL0= github.com/butuzov/ireturn v0.3.0/go.mod h1:A09nIiwiqzN/IoVo9ogpa0Hzi9fex1kd9PSD6edP5ZA= github.com/butuzov/mirror v1.2.0 h1:9YVK1qIjNspaqWutSv8gsge2e/Xpq1eqEkslEUHy5cs= @@ -127,10 +127,10 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/conduitio/conduit-commons v0.2.0 h1:TMpVGXi0Wski537qLAyQWdGjuGHEhaZxOS5L90pZJSQ= github.com/conduitio/conduit-commons v0.2.0/go.mod h1:i7Q2jm7FBSi2zj1/4MCsFD1hIKAbvamlNtSQfkhUTiY= -github.com/conduitio/conduit-connector-protocol v0.6.0 h1:2gMOCOpa+c97CHIpZv7Niu3V4o5UgRr6fzj9kzfRV7o= -github.com/conduitio/conduit-connector-protocol v0.6.0/go.mod h1:3mo59xYX9etFoR3n82R7J50La1iWK+Vm63H8z2wo4QM= -github.com/conduitio/conduit-connector-sdk v0.9.1 h1:DiMUn7udnjWvyaDsyeTZFHeYTEIdqUU6dqPunEEE3Kw= -github.com/conduitio/conduit-connector-sdk v0.9.1/go.mod h1:cNoofumgDlsaThkxkNYg7zab4AkmRZt1V711aO7guGU= +github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240619121958-1df466646d01 h1:sZA0aZpZlleULAu+KQYL+WAapXdJNzV3XnSJmwAF0Mg= +github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240619121958-1df466646d01/go.mod h1:3R3eUxN/Z3O3jR1TcfFb9zeGWpiDLvpSOlSWUVa8KsI= +github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240619124114-6a89e0d7add8 h1:2v3Wc74t6T1t1WFxOlm67LToEvkXoKKq6fPxgzCrGG4= +github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240619124114-6a89e0d7add8/go.mod h1:hCmuIMKtYqFnLZWNK343dtQEZJIp+wv/0Qck9N+q+oY= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/curioswitch/go-reassign v0.2.0 h1:G9UZyOcpk/d7Gd6mqYgd8XYWFMw/znxwGDUstnC9DIo= @@ -174,8 +174,8 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= -github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= -github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= @@ -281,8 +281,8 @@ github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= -github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg= -github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= +github.com/google/pprof v0.0.0-20240528025155-186aa0362fba h1:ql1qNgCyOB7iAEk8JTNM+zJrgIbnyCKX/wdlyPufP5g= +github.com/google/pprof v0.0.0-20240528025155-186aa0362fba/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -303,10 +303,10 @@ github.com/gostaticanalysis/nilerr v0.1.1/go.mod h1:wZYb6YI5YAxxq0i1+VJbY0s2YONW github.com/gostaticanalysis/testutil v0.3.1-0.20210208050101-bfb5c8eec0e4/go.mod h1:D+FIZ+7OahH3ePw/izIEeH5I06eKs1IKI4Xr64/Am3M= github.com/gostaticanalysis/testutil v0.4.0 h1:nhdCmubdmDF6VEatUNjgUZBJKWRqugoISdUv3PPQgHY= github.com/gostaticanalysis/testutil v0.4.0/go.mod h1:bLIoPefWXrRi/ssLFWX1dx7Repi5x3CuviD3dgAZaBU= -github.com/hashicorp/go-hclog v1.5.0 h1:bI2ocEMgcVlz55Oj1xZNBsVi900c7II+fWDyV9o+13c= -github.com/hashicorp/go-hclog v1.5.0/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= -github.com/hashicorp/go-plugin v1.6.0 h1:wgd4KxHJTVGGqWBq4QPB1i5BZNEx9BR8+OFmHDmTk8A= -github.com/hashicorp/go-plugin v1.6.0/go.mod h1:lBS5MtSSBZk0SHc66KACcjjlU6WzEVP/8pwz68aMkCI= +github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= +github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= +github.com/hashicorp/go-plugin v1.6.1 h1:P7MR2UP6gNKGPp+y7EZw2kOiq4IR9WiqLvp0XOsVdwI= +github.com/hashicorp/go-plugin v1.6.1/go.mod h1:XPHFku2tFo3o3QKFgSYo+cghcUhw1NA1hZyMK0PWAw0= github.com/hashicorp/go-version v1.2.1/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= @@ -329,8 +329,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jgautheron/goconst v1.7.1 h1:VpdAG7Ca7yvvJk5n8dMwQhfEZJh95kl/Hl9S1OI5Jkk= github.com/jgautheron/goconst v1.7.1/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4= -github.com/jhump/protoreflect v1.15.6 h1:WMYJbw2Wo+KOWwZFvgY0jMoVHM6i4XIvRs2RcBj5VmI= -github.com/jhump/protoreflect v1.15.6/go.mod h1:jCHoyYQIJnaabEYnbGwyo9hUqfyUMTbJw/tAut5t97E= +github.com/jhump/protoreflect v1.16.0 h1:54fZg+49widqXYQ0b+usAFHbMkBGR4PpXrsHc8+TBDg= +github.com/jhump/protoreflect v1.16.0/go.mod h1:oYPd7nPvcBw/5wlDfm/AVmU9zH9BgqGCI469pGxfj/8= github.com/jingyugao/rowserrcheck v1.1.1 h1:zibz55j/MJtLsjP1OF4bSdgXxwL1b+Vn7Tjzq7gFzUs= github.com/jingyugao/rowserrcheck v1.1.1/go.mod h1:4yvlZSDb3IyDTUZJUmpZfm2Hwok+Dtp+nu2qOq+er9c= github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af h1:KA9BjwUk7KlCh6S9EAGWBt1oExIUv9WyNCiRz5amv48= @@ -670,8 +670,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f h1:99ci1mjWVBWwJiEKYY6jWa4d2nTQVIEhZIptnrVb1XY= -golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI= +golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8 h1:LoYXNGAShUG3m/ehNk4iFctuhGX/+R1ZpfJ4/ia80JM= +golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= golang.org/x/exp/typeparams v0.0.0-20220428152302-39d4317da171/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk= golang.org/x/exp/typeparams v0.0.0-20230203172020-98cc5a0785f9/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk= golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f h1:phY1HzDcf18Aq9A8KkmRtY9WvOFIxN8wgfvy6Zm1DV8= @@ -969,8 +969,8 @@ google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be h1:LG9vZxsWGOmUKieR8wPAUR3u3MpnYFQZROPIMaXh7/A= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 h1:1GBuWVLM/KMVUv1t1En5Gs+gFZCNd360GGb4sSxtrhU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -983,8 +983,8 @@ google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKa google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= -google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= -google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= +google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= +google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -997,13 +997,14 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/source.go b/source.go index a3928b1..1752cf4 100644 --- a/source.go +++ b/source.go @@ -18,6 +18,8 @@ import ( "context" "fmt" + "github.com/conduitio/conduit-commons/config" + "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-kafka/source" sdk "github.com/conduitio/conduit-connector-sdk" "github.com/google/uuid" @@ -38,14 +40,14 @@ func NewSource() sdk.Source { return sdk.SourceWithMiddleware(&Source{}, sdk.DefaultSourceMiddleware()...) } -func (s *Source) Parameters() map[string]sdk.Parameter { +func (s *Source) Parameters() config.Parameters { return source.Config{}.Parameters() } -func (s *Source) Configure(ctx context.Context, cfg map[string]string) error { +func (s *Source) Configure(ctx context.Context, cfg config.Config) error { var config source.Config - err := sdk.Util.ParseConfig(cfg, &config) + err := sdk.Util.ParseConfig(ctx, cfg, &config) if err != nil { return err } @@ -58,7 +60,7 @@ func (s *Source) Configure(ctx context.Context, cfg map[string]string) error { return nil } -func (s *Source) Open(ctx context.Context, sdkPos sdk.Position) error { +func (s *Source) Open(ctx context.Context, sdkPos opencdc.Position) error { err := s.config.TryDial(ctx) if err != nil { return fmt.Errorf("failed to dial broker: %w", err) @@ -89,13 +91,13 @@ func (s *Source) Open(ctx context.Context, sdkPos sdk.Position) error { return nil } -func (s *Source) Read(ctx context.Context) (sdk.Record, error) { +func (s *Source) Read(ctx context.Context) (opencdc.Record, error) { rec, err := s.consumer.Consume(ctx) if err != nil { - return sdk.Record{}, fmt.Errorf("failed getting a record: %w", err) + return opencdc.Record{}, fmt.Errorf("failed getting a record: %w", err) } - metadata := sdk.Metadata{} + metadata := opencdc.Metadata{} metadata.SetCollection(rec.Topic) metadata.SetCreatedAt(rec.Timestamp) for _, h := range rec.Headers { @@ -110,12 +112,12 @@ func (s *Source) Read(ctx context.Context) (sdk.Record, error) { Offset: rec.Offset, }.ToSDKPosition(), metadata, - sdk.RawData(rec.Key), - sdk.RawData(rec.Value), + opencdc.RawData(rec.Key), + opencdc.RawData(rec.Value), ), nil } -func (s *Source) Ack(ctx context.Context, _ sdk.Position) error { +func (s *Source) Ack(ctx context.Context, _ opencdc.Position) error { return s.consumer.Ack(ctx) } diff --git a/source/paramgen.go b/source/paramgen.go index b9398f9..b4d5f5e 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -3,99 +3,97 @@ package source -import ( - sdk "github.com/conduitio/conduit-connector-sdk" -) +import "github.com/conduitio/conduit-commons/config" -func (Config) Parameters() map[string]sdk.Parameter { - return map[string]sdk.Parameter{ +func (Config) Parameters() config.Parameters { + return map[string]config.Parameter{ "caCert": { Default: "", Description: "caCert is the Kafka broker's certificate.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, "clientCert": { Default: "", Description: "clientCert is the Kafka client's certificate.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, "clientID": { Default: "conduit-connector-kafka", Description: "clientID is a unique identifier for client connections established by this connector.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, "clientKey": { Default: "", Description: "clientKey is the Kafka client's private key.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, "groupID": { Default: "", Description: "groupID defines the consumer group id.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, "insecureSkipVerify": { Default: "", Description: "insecureSkipVerify defines whether to validate the broker's certificate chain and host name. If 'true', accepts any certificate presented by the server and any host name in that certificate.", - Type: sdk.ParameterTypeBool, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeBool, + Validations: []config.Validation{}, }, "readFromBeginning": { Default: "", Description: "readFromBeginning determines from whence the consumer group should begin consuming when it finds a partition without a committed offset. If this options is set to true it will start with the first message in that partition.", - Type: sdk.ParameterTypeBool, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeBool, + Validations: []config.Validation{}, }, "saslMechanism": { Default: "", Description: "saslMechanism configures the connector to use SASL authentication. If empty, no authentication will be performed.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{ - sdk.ValidationInclusion{List: []string{"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}}, + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationInclusion{List: []string{"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}}, }, }, "saslPassword": { Default: "", Description: "saslPassword sets up the password used with SASL authentication.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, "saslUsername": { Default: "", Description: "saslUsername sets up the username used with SASL authentication.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, "servers": { Default: "", Description: "servers is a list of Kafka bootstrap servers, which will be used to discover all the servers in a cluster.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{ - sdk.ValidationRequired{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, }, }, "tls.enabled": { Default: "", Description: "tls.enabled defines whether TLS is needed to communicate with the Kafka cluster.", - Type: sdk.ParameterTypeBool, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeBool, + Validations: []config.Validation{}, }, "topic": { Default: "", Description: "topic {WARN will be deprecated soon} the kafka topic to read from.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, "topics": { Default: "", Description: "topics is a comma separated list of Kafka topics to read from.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, } } diff --git a/source/position.go b/source/position.go index 8c6ce37..2692f75 100644 --- a/source/position.go +++ b/source/position.go @@ -17,7 +17,7 @@ package source import ( "fmt" - sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/conduitio/conduit-commons/opencdc" "github.com/goccy/go-json" ) @@ -28,7 +28,7 @@ type Position struct { Offset int64 } -func ParseSDKPosition(sdkPos sdk.Position) (Position, error) { +func ParseSDKPosition(sdkPos opencdc.Position) (Position, error) { var p Position err := json.Unmarshal(sdkPos, &p) if err != nil { @@ -37,7 +37,7 @@ func ParseSDKPosition(sdkPos sdk.Position) (Position, error) { return p, nil } -func (p Position) ToSDKPosition() sdk.Position { +func (p Position) ToSDKPosition() opencdc.Position { b, err := json.Marshal(p) if err != nil { // this error should not be possible diff --git a/source_integration_test.go b/source_integration_test.go index fb5bd86..b4c0a93 100644 --- a/source_integration_test.go +++ b/source_integration_test.go @@ -18,9 +18,9 @@ import ( "context" "testing" + "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-kafka/source" "github.com/conduitio/conduit-connector-kafka/test" - sdk "github.com/conduitio/conduit-connector-sdk" "github.com/matryer/is" "github.com/twmb/franz-go/pkg/kgo" ) @@ -68,10 +68,10 @@ func TestSource_Integration_RestartPartial(t *testing.T) { func testSourceIntegrationRead( t *testing.T, cfgMap map[string]string, - startFrom sdk.Position, + startFrom opencdc.Position, wantRecords []*kgo.Record, ackFirstOnly bool, -) sdk.Position { +) opencdc.Position { is := is.New(t) ctx := context.Background() @@ -86,7 +86,7 @@ func testSourceIntegrationRead( err = underTest.Open(ctx, startFrom) is.NoErr(err) - var positions []sdk.Position + var positions []opencdc.Position for _, wantRecord := range wantRecords { rec, err := underTest.Read(ctx) is.NoErr(err) diff --git a/source_test.go b/source_test.go index 27a5795..3c7fc6b 100644 --- a/source_test.go +++ b/source_test.go @@ -19,6 +19,7 @@ import ( "strconv" "testing" + "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-kafka/source" "github.com/conduitio/conduit-connector-kafka/test" sdk "github.com/conduitio/conduit-connector-sdk" @@ -60,23 +61,23 @@ func TestSource_Read(t *testing.T) { {Key: "header-a", Value: []byte("value-a")}, {Key: "header-b", Value: []byte{0, 1, 2}}, } - want := sdk.Record{ + want := opencdc.Record{ Position: source.Position{ GroupID: "", Topic: rec.Topic, Partition: rec.Partition, Offset: rec.Offset, }.ToSDKPosition(), - Operation: sdk.OperationCreate, + Operation: opencdc.OperationCreate, Metadata: map[string]string{ - sdk.MetadataCollection: rec.Topic, - sdk.MetadataCreatedAt: strconv.FormatInt(rec.Timestamp.UnixNano(), 10), - "kafka.header.header-a": "value-a", - "kafka.header.header-b": string([]byte{0, 1, 2}), + opencdc.MetadataCollection: rec.Topic, + opencdc.MetadataCreatedAt: strconv.FormatInt(rec.Timestamp.UnixNano(), 10), + "kafka.header.header-a": "value-a", + "kafka.header.header-b": string([]byte{0, 1, 2}), }, - Key: sdk.RawData(rec.Key), + Key: opencdc.RawData(rec.Key), Payload: sdk.Change{ - After: sdk.RawData(rec.Value), + After: opencdc.RawData(rec.Value), }, } @@ -90,7 +91,7 @@ func TestSource_Read(t *testing.T) { underTest := Source{consumer: consumerMock, config: cfg} got, err := underTest.Read(context.Background()) is.NoErr(err) - is.True(got.Metadata[sdk.MetadataReadAt] != "") - want.Metadata[sdk.MetadataReadAt] = got.Metadata[sdk.MetadataReadAt] - is.Equal(cmp.Diff(want, got, cmpopts.IgnoreUnexported(sdk.Record{})), "") + is.True(got.Metadata[opencdc.MetadataReadAt] != "") + want.Metadata[opencdc.MetadataReadAt] = got.Metadata[opencdc.MetadataReadAt] + is.Equal(cmp.Diff(want, got, cmpopts.IgnoreUnexported(opencdc.Record{})), "") } diff --git a/test/util.go b/test/util.go index 7d18d4e..15f895a 100644 --- a/test/util.go +++ b/test/util.go @@ -24,6 +24,7 @@ import ( "strings" "time" + "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-kafka/common" sdk "github.com/conduitio/conduit-connector-sdk" "github.com/google/uuid" @@ -94,7 +95,7 @@ func ParseConfigMap[C any](t T, cfg map[string]string) C { is.Helper() var out C - err := sdk.Util.ParseConfig(cfg, &out) + err := sdk.Util.ParseConfig(ctx, cfg, &out) is.NoErr(err) return out @@ -164,19 +165,19 @@ func GenerateFranzRecords(from, to int, topicOpt ...string) []*kgo.Record { return recs } -func GenerateSDKRecords(from, to int, topicOpt ...string) []sdk.Record { +func GenerateSDKRecords(from, to int, topicOpt ...string) []opencdc.Record { recs := GenerateFranzRecords(from, to, topicOpt...) - sdkRecs := make([]sdk.Record, len(recs)) + sdkRecs := make([]opencdc.Record, len(recs)) for i, rec := range recs { - metadata := sdk.Metadata{} + metadata := opencdc.Metadata{} metadata.SetCollection(rec.Topic) metadata.SetCreatedAt(rec.Timestamp) sdkRecs[i] = sdk.Util.Source.NewRecordCreate( []byte(uuid.NewString()), metadata, - sdk.RawData(rec.Key), - sdk.RawData(rec.Value), + opencdc.RawData(rec.Key), + opencdc.RawData(rec.Value), ) } return sdkRecs diff --git a/tools.go b/tools.go index b245373..d2ee24b 100644 --- a/tools.go +++ b/tools.go @@ -17,7 +17,7 @@ package main import ( - _ "github.com/conduitio/conduit-connector-sdk/cmd/paramgen" + _ "github.com/conduitio/conduit-commons/paramgen" _ "github.com/golangci/golangci-lint/cmd/golangci-lint" _ "go.uber.org/mock/mockgen" )