Skip to content

Commit

Permalink
validate kafka topic
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon committed Apr 10, 2024
1 parent d992e09 commit 105e5ea
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 26 deletions.
65 changes: 64 additions & 1 deletion destination/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,25 @@
package destination

import (
"bytes"
"errors"
"fmt"
"regexp"
"strings"
"text/template"
"time"

"github.com/Masterminds/sprig/v3"
"github.com/conduitio/conduit-connector-kafka/common"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/twmb/franz-go/pkg/kgo"
)

var (
topicRegex = regexp.MustCompile(`^[a-zA-Z0-9\._\-]+$`)
maxTopicLength = 249
)

type Config struct {
common.Config

Expand All @@ -48,6 +61,8 @@ type Config struct {
useKafkaConnectKeyFormat bool
}

type TopicFn func(sdk.Record) (string, error)

func (c Config) WithKafkaConnectKeyFormat() Config {
c.useKafkaConnectKeyFormat = true
return c
Expand Down Expand Up @@ -87,5 +102,53 @@ func (c Config) CompressionCodecs() []kgo.CompressionCodec {

// Validate executes manual validations beyond what is defined in struct tags.
func (c Config) Validate() error {
return c.Config.Validate()
var multierr []error

err := c.Config.Validate()
if err != nil {
multierr = append(multierr, err)
}

_, _, err = c.ParseTopic()
if err != nil {
multierr = append(multierr, err)
}

return errors.Join(multierr...)
}

// ParseTopic returns either a static topic or a function that determines the
// topic for each record individually. If the topic is neither static nor a
// template, an error is returned.
func (c Config) ParseTopic() (topic string, f TopicFn, err error) {
if topicRegex.MatchString(c.Topic) {
// The topic is static, check length.
if len(c.Topic) > maxTopicLength {
return "", nil, fmt.Errorf("topic is too long, maximum length is %d", maxTopicLength)
}
return c.Topic, nil, nil
}

// The topic must be a template, check if it contains at least one action {{ }},
// to prevent allowing invalid static topic names.
if !strings.Contains(c.Topic, "{{") || !strings.Contains(c.Topic, "}}") {
return "", nil, fmt.Errorf("topic is neither a valid static Kafka topic nor a valid Go template")
}

// Try to parse the topic
t, err := template.New("topic").Funcs(sprig.FuncMap()).Parse(c.Topic)
if err != nil {
// The topic is not a valid Go template.
return "", nil, fmt.Errorf("topic is neither a valid static Kafka topic nor a valid Go template: %w", err)
}

// The topic is a valid template, return TopicFn.
var buf bytes.Buffer
return "", func(r sdk.Record) (string, error) {
buf.Reset()
if err := t.Execute(&buf, r); err != nil {
return "", fmt.Errorf("failed to execute topic template: %w", err)
}
return buf.String(), nil
}, nil
}
87 changes: 87 additions & 0 deletions destination/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package destination

import (
"strings"
"testing"

"github.com/matryer/is"
)

func TestConfig_Validate(t *testing.T) {
testCases := []struct {
name string
config Config
wantErr string
}{{
name: "valid",
config: Config{
Topic: strings.Repeat("a", 249),
},
}, {
name: "topic too long",
config: Config{
Topic: strings.Repeat("a", 250),
},
wantErr: "topic is too long, maximum length is 249",
}, {
name: "invalid topic characters",
config: Config{
Topic: "foo?",
},
wantErr: "topic is neither a valid static Kafka topic nor a valid Go template",
}, {
name: "invalid Go template 1",
config: Config{
Topic: "}} foo {{",
},
wantErr: "topic is neither a valid static Kafka topic nor a valid Go template",
}, {
name: "invalid Go template 2",
config: Config{
Topic: "}}foo",
},
wantErr: "topic is neither a valid static Kafka topic nor a valid Go template",
}, {
name: "invalid Go template 3",
config: Config{
Topic: "{{ foo }}",
},
wantErr: "topic is neither a valid static Kafka topic nor a valid Go template",
}, {
name: "valid Go template",
config: Config{
Topic: "{{ .Metadata.foo }}",
},
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
err := tc.config.Validate()

if tc.wantErr != "" && err == nil {
t.Errorf("expected error, got nil")
}
if tc.wantErr == "" && err != nil {
t.Errorf("unexpected error: %v", err)
}
if err != nil {
is.True(strings.Contains(err.Error(), tc.wantErr))
}
})
}
}
31 changes: 7 additions & 24 deletions destination/franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@
package destination

import (
"bytes"
"context"
"fmt"
"strings"
"text/template"

"github.com/Masterminds/sprig/v3"
"github.com/conduitio/conduit-commons/csync"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/conduitio/conduit-connector-sdk/kafkaconnect"
Expand All @@ -42,35 +38,22 @@ type FranzProducer struct {
var _ Producer = (*FranzProducer)(nil)

func NewFranzProducer(ctx context.Context, cfg Config) (*FranzProducer, error) {
topic, topicFn, err := cfg.ParseTopic()
if err != nil {
// Unlikely to happen, as the topic is validated in the config.
return nil, fmt.Errorf("failed to parse topic: %w", err)
}

opts := cfg.FranzClientOpts(sdk.Logger(ctx))
opts = append(opts, []kgo.Opt{
kgo.AllowAutoTopicCreation(),
kgo.RecordDeliveryTimeout(cfg.DeliveryTimeout),
kgo.RequiredAcks(cfg.RequiredAcks()),
kgo.ProducerBatchCompression(cfg.CompressionCodecs()...),
kgo.ProducerBatchMaxBytes(cfg.BatchBytes),
kgo.DefaultProduceTopic(topic),
}...)

var topicFn func(sdk.Record) (string, error)
if strings.Contains(cfg.Topic, "{{") && strings.Contains(cfg.Topic, "}}") {
// If the topic contains a template, the topic will be determined for
// each record individually, so we can't set the default topic here.
t, err := template.New("topic").Funcs(sprig.FuncMap()).Parse(cfg.Topic)
if err != nil {
return nil, fmt.Errorf("failed to parse topic template: %w", err)
}
var buf bytes.Buffer
topicFn = func(r sdk.Record) (string, error) {
buf.Reset()
if err := t.Execute(&buf, r); err != nil {
return "", fmt.Errorf("failed to execute topic template: %w", err)
}
return buf.String(), nil
}
} else {
opts = append(opts, kgo.DefaultProduceTopic(cfg.Topic))
}

if cfg.RequiredAcks() != kgo.AllISRAcks() {
sdk.Logger(ctx).Warn().Msgf("disabling idempotent writes because \"acks\" is set to %v", cfg.Acks)
opts = append(opts, kgo.DisableIdempotentWrite())
Expand Down
1 change: 1 addition & 0 deletions destination/franz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func TestFranzProducer_Opts_AcksDisableIdempotentWrite(t *testing.T) {
// minimal valid config
cfg := Config{
Config: common.Config{Servers: []string{"test-host:9092"}},
Topic: "foo",
BatchBytes: 512,
}

Expand Down
2 changes: 1 addition & 1 deletion tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

//go:build tools

package postgres
package main

import (
_ "github.com/conduitio/conduit-connector-sdk/cmd/paramgen"
Expand Down

0 comments on commit 105e5ea

Please sign in to comment.