From 1d74a9f938b9963b4075457ad1f3f144dba0cd37 Mon Sep 17 00:00:00 2001 From: Samir Ketema Date: Thu, 18 Jan 2024 17:09:01 -0800 Subject: [PATCH 1/4] add TLS without authentication --- README.md | 1 + common/config.go | 4 +++- common/config_test.go | 2 ++ common/tls.go | 4 +++- destination/paramgen.go | 6 ++++++ source/paramgen.go | 6 ++++++ 6 files changed, 21 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 6ff2580..8d6d459 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ A source is getting associated with a consumer group ID the first time the `Read | `clientID` | A Kafka client ID. | false | `conduit-connector-kafka` | | `readFromBeginning` | Determines from whence the consumer group should begin consuming when it finds a partition without a committed offset. If this option is set to true it will start with the first message in that partition. | false | `false` | | `groupID` | Defines the consumer group ID. | false | | +| `tls` | Defines whether TLS is enabled. | false | `false` | | `clientCert` | A certificate for the Kafka client, in PEM format. If provided, the private key needs to be provided too. | false | | | `clientKey` | A private key for the Kafka client, in PEM format. If provided, the certificate needs to be provided too. | false | | | `caCert` | The Kafka broker's certificate, in PEM format. | false | | diff --git a/common/config.go b/common/config.go index f6f4957..d7d4637 100644 --- a/common/config.go +++ b/common/config.go @@ -19,6 +19,7 @@ import ( "errors" "time" + sdk "github.com/conduitio/conduit-connector-sdk" "github.com/twmb/franz-go/pkg/kgo" ) @@ -62,7 +63,8 @@ func (c Config) Validate() error { // TryDial tries to establish a connection to brokers and returns nil if it // succeeds to connect to at least one broker. func (c Config) TryDial(ctx context.Context) error { - cl, err := kgo.NewClient(kgo.SeedBrokers(c.Servers...)) + opts := c.FranzClientOpts(sdk.Logger(ctx)) + cl, err := kgo.NewClient(opts...) if err != nil { return err } diff --git a/common/config_test.go b/common/config_test.go index 78b8d18..4536740 100644 --- a/common/config_test.go +++ b/common/config_test.go @@ -57,6 +57,7 @@ func TestConfig_Validate(t *testing.T) { name: "invalid Client cert", cfg: Config{ ConfigTLS: ConfigTLS{ + TLSEnabled: true, ClientCert: "foo", }, }, @@ -65,6 +66,7 @@ func TestConfig_Validate(t *testing.T) { name: "invalid Client key", cfg: Config{ ConfigTLS: ConfigTLS{ + TLSEnabled: true, ClientKey: "foo", }, }, diff --git a/common/tls.go b/common/tls.go index 55a7666..035a9b8 100644 --- a/common/tls.go +++ b/common/tls.go @@ -21,6 +21,8 @@ import ( ) type ConfigTLS struct { + // TLSEnabled defines whether TLS is needed to communicate with the Kafka cluster. + TLSEnabled bool `json:"tls"` // ClientCert is the Kafka client's certificate. ClientCert string `json:"clientCert"` // ClientKey is the Kafka client's private key. @@ -46,7 +48,7 @@ func (c ConfigTLS) TLS() *tls.Config { } func (c ConfigTLS) tls() (*tls.Config, error) { - if c.ClientCert == "" && c.CACert == "" && c.ClientKey == "" { + if !c.TLSEnabled { return nil, nil } diff --git a/destination/paramgen.go b/destination/paramgen.go index 7ea3750..35a457f 100644 --- a/destination/paramgen.go +++ b/destination/paramgen.go @@ -95,6 +95,12 @@ func (Config) Parameters() map[string]sdk.Parameter { sdk.ValidationRequired{}, }, }, + "tls": { + Default: "", + Description: "tls defines whether TLS is needed to communicate with the Kafka cluster.", + Type: sdk.ParameterTypeBool, + Validations: []sdk.Validation{}, + }, "topic": { Default: "", Description: "topic is the Kafka topic.", diff --git a/source/paramgen.go b/source/paramgen.go index 127bec9..13972a5 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -79,6 +79,12 @@ func (Config) Parameters() map[string]sdk.Parameter { sdk.ValidationRequired{}, }, }, + "tls": { + Default: "", + Description: "tls defines whether TLS is needed to communicate with the Kafka cluster.", + Type: sdk.ParameterTypeBool, + Validations: []sdk.Validation{}, + }, "topic": { Default: "", Description: "topic is the Kafka topic.", From fe02e617eaca88f2b976b26c70e7c36667db3ae2 Mon Sep 17 00:00:00 2001 From: Samir Ketema Date: Thu, 18 Jan 2024 17:14:13 -0800 Subject: [PATCH 2/4] fmt --- common/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/config_test.go b/common/config_test.go index 4536740..e0501a3 100644 --- a/common/config_test.go +++ b/common/config_test.go @@ -67,7 +67,7 @@ func TestConfig_Validate(t *testing.T) { cfg: Config{ ConfigTLS: ConfigTLS{ TLSEnabled: true, - ClientKey: "foo", + ClientKey: "foo", }, }, wantErr: "tls: failed to find any PEM data in certificate input", From 5c5aa14d3013538a64d67c989912e927827e9fab Mon Sep 17 00:00:00 2001 From: Samir Ketema Date: Fri, 19 Jan 2024 08:01:42 -0800 Subject: [PATCH 3/4] change tls to tls.enabled --- common/tls.go | 2 +- destination/paramgen.go | 4 ++-- source/paramgen.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/common/tls.go b/common/tls.go index 035a9b8..c0f68ff 100644 --- a/common/tls.go +++ b/common/tls.go @@ -22,7 +22,7 @@ import ( type ConfigTLS struct { // TLSEnabled defines whether TLS is needed to communicate with the Kafka cluster. - TLSEnabled bool `json:"tls"` + TLSEnabled bool `json:"tls.enabled"` // ClientCert is the Kafka client's certificate. ClientCert string `json:"clientCert"` // ClientKey is the Kafka client's private key. diff --git a/destination/paramgen.go b/destination/paramgen.go index 35a457f..a279d22 100644 --- a/destination/paramgen.go +++ b/destination/paramgen.go @@ -95,9 +95,9 @@ func (Config) Parameters() map[string]sdk.Parameter { sdk.ValidationRequired{}, }, }, - "tls": { + "tls.enabled": { Default: "", - Description: "tls defines whether TLS is needed to communicate with the Kafka cluster.", + Description: "tls.enabled defines whether TLS is needed to communicate with the Kafka cluster.", Type: sdk.ParameterTypeBool, Validations: []sdk.Validation{}, }, diff --git a/source/paramgen.go b/source/paramgen.go index 13972a5..8be2133 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -79,9 +79,9 @@ func (Config) Parameters() map[string]sdk.Parameter { sdk.ValidationRequired{}, }, }, - "tls": { + "tls.enabled": { Default: "", - Description: "tls defines whether TLS is needed to communicate with the Kafka cluster.", + Description: "tls.enabled defines whether TLS is needed to communicate with the Kafka cluster.", Type: sdk.ParameterTypeBool, Validations: []sdk.Validation{}, }, From 1ee63c1ebd55f23eeb3d5413c5931ce1425b6ff4 Mon Sep 17 00:00:00 2001 From: Samir Ketema Date: Fri, 19 Jan 2024 08:13:08 -0800 Subject: [PATCH 4/4] update readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8d6d459..3baf1e6 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ A source is getting associated with a consumer group ID the first time the `Read | `clientID` | A Kafka client ID. | false | `conduit-connector-kafka` | | `readFromBeginning` | Determines from whence the consumer group should begin consuming when it finds a partition without a committed offset. If this option is set to true it will start with the first message in that partition. | false | `false` | | `groupID` | Defines the consumer group ID. | false | | -| `tls` | Defines whether TLS is enabled. | false | `false` | +| `tls.enabled` | Defines whether TLS is enabled. | false | `false` | | `clientCert` | A certificate for the Kafka client, in PEM format. If provided, the private key needs to be provided too. | false | | | `clientKey` | A private key for the Kafka client, in PEM format. If provided, the certificate needs to be provided too. | false | | | `caCert` | The Kafka broker's certificate, in PEM format. | false | |