Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support TLS without authentication #118

Merged
merged 4 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ A source is getting associated with a consumer group ID the first time the `Read
| `clientID` | A Kafka client ID. | false | `conduit-connector-kafka` |
| `readFromBeginning` | Determines from whence the consumer group should begin consuming when it finds a partition without a committed offset. If this option is set to true it will start with the first message in that partition. | false | `false` |
| `groupID` | Defines the consumer group ID. | false | |
| `tls` | Defines whether TLS is enabled. | false | `false` |
| `clientCert` | A certificate for the Kafka client, in PEM format. If provided, the private key needs to be provided too. | false | |
| `clientKey` | A private key for the Kafka client, in PEM format. If provided, the certificate needs to be provided too. | false | |
| `caCert` | The Kafka broker's certificate, in PEM format. | false | |
Expand Down
4 changes: 3 additions & 1 deletion common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"time"

sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/twmb/franz-go/pkg/kgo"
)

Expand Down Expand Up @@ -62,7 +63,8 @@ func (c Config) Validate() error {
// TryDial tries to establish a connection to brokers and returns nil if it
// succeeds to connect to at least one broker.
func (c Config) TryDial(ctx context.Context) error {
cl, err := kgo.NewClient(kgo.SeedBrokers(c.Servers...))
opts := c.FranzClientOpts(sdk.Logger(ctx))
cl, err := kgo.NewClient(opts...)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion common/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func TestConfig_Validate(t *testing.T) {
name: "invalid Client cert",
cfg: Config{
ConfigTLS: ConfigTLS{
TLSEnabled: true,
ClientCert: "foo",
},
},
Expand All @@ -65,7 +66,8 @@ func TestConfig_Validate(t *testing.T) {
name: "invalid Client key",
cfg: Config{
ConfigTLS: ConfigTLS{
ClientKey: "foo",
TLSEnabled: true,
ClientKey: "foo",
},
},
wantErr: "tls: failed to find any PEM data in certificate input",
Expand Down
4 changes: 3 additions & 1 deletion common/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
)

type ConfigTLS struct {
// TLSEnabled defines whether TLS is needed to communicate with the Kafka cluster.
TLSEnabled bool `json:"tls"`
samirketema marked this conversation as resolved.
Show resolved Hide resolved
// ClientCert is the Kafka client's certificate.
ClientCert string `json:"clientCert"`
// ClientKey is the Kafka client's private key.
Expand All @@ -46,7 +48,7 @@ func (c ConfigTLS) TLS() *tls.Config {
}

func (c ConfigTLS) tls() (*tls.Config, error) {
if c.ClientCert == "" && c.CACert == "" && c.ClientKey == "" {
if !c.TLSEnabled {
return nil, nil
}

Expand Down
6 changes: 6 additions & 0 deletions destination/paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions source/paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading