Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination - route records to topic based on metadata #130

Merged
merged 6 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,19 @@ jobs:
- uses: actions/setup-go@v5
with:
go-version-file: 'go.mod'

# This step sets up the variable steps.golangci-lint-version.outputs.v
# to contain the version of golangci-lint (e.g. v1.54.2).
# The version is taken from go.mod.
- name: Golangci-lint version
id: golangci-lint-version
run: |
GOLANGCI_LINT_VERSION=$( go list -m -f '{{.Version}}' github.com/golangci/golangci-lint )
echo "v=$GOLANGCI_LINT_VERSION" >> "$GITHUB_OUTPUT"

- name: golangci-lint
uses: golangci/golangci-lint-action@v4.0.0
uses: golangci/golangci-lint-action@v4
with:
version: v1.55.2
version: ${{ steps.golangci-lint-version.outputs.v }}
skip-pkg-cache: true
args: --timeout=2m
23 changes: 23 additions & 0 deletions .github/workflows/validate-generated-files.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
name: validate-generated-files

on:
push:
branches: [ main ]
pull_request:

jobs:
validate-generated-files:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version-file: 'go.mod'

- name: Check generated files
run: |
export PATH=$PATH:$(go env GOPATH)/bin
make install-tools generate
git diff --exit-code --numstat
14 changes: 12 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,28 +1,38 @@
.PHONY: build test

VERSION=$(shell git describe --tags --dirty --always)

.PHONY: build
build:
go build -ldflags "-X 'github.com/conduitio/conduit-connector-kafka.version=${VERSION}'" -o conduit-connector-kafka cmd/connector/main.go

.PHONY: test-kafka
test-kafka:
# run required docker containers, execute integration tests, stop containers after tests
docker compose -f test/docker-compose-kafka.yml up --quiet-pull -d --wait
go test $(GOTEST_FLAGS) -race ./...; ret=$$?; \
docker compose -f test/docker-compose-kafka.yml down; \
exit $$ret

.PHONY: test-redpanda
test-redpanda:
# run required docker containers, execute integration tests, stop containers after tests
docker compose -f test/docker-compose-redpanda.yml up --quiet-pull -d --wait
go test $(GOTEST_FLAGS) -race ./...; ret=$$?; \
docker compose -f test/docker-compose-redpanda.yml down; \
exit $$ret

.PHONY: test
test: test-kafka test-redpanda

.PHONY: generate
generate:
go generate ./...

.PHONY: lint
lint:
golangci-lint run

.PHONY: install-tools
install-tools:
@echo Installing tools from tools.go
@go list -e -f '{{ join .Imports "\n" }}' tools.go | xargs -I % go list -f "%@{{.Module.Version}}" % | xargs -tI % go install %
@go mod tidy
32 changes: 16 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,22 @@ The destination connector sends records to Kafka.

There's no global, connector configuration. Each connector instance is configured separately.

| name | description | required | default value |
|----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------------------|
| `servers` | Servers is a list of Kafka bootstrap servers, which will be used to discover all the servers in a cluster. | true | |
| `topic` | Topic is the Kafka topic into which records will be written. | true | |
| `clientID` | A Kafka client ID. | false | `conduit-connector-kafka` |
| `acks` | Acks defines the number of acknowledges from partition replicas required before receiving a response to a produce request. `none` = fire and forget, `one` = wait for the leader to acknowledge the writes, `all` = wait for the full ISR to acknowledge the writes. | false | `all` |
| `deliveryTimeout` | Message delivery timeout. | false | |
| `batchBytes` | Limits the maximum size of a request in bytes before being sent to a partition. This mirrors Kafka's `max.message.bytes`. | false | 1000012 |
| `compression` | Compression applied to messages. Possible values: `none`, `gzip`, `snappy`, `lz4`, `zstd`. | false | `snappy` |
| `clientCert` | A certificate for the Kafka client, in PEM format. If provided, the private key needs to be provided too. | false | |
| `clientKey` | A private key for the Kafka client, in PEM format. If provided, the certificate needs to be provided too. | false | |
| `caCert` | The Kafka broker's certificate, in PEM format. | false | |
| `insecureSkipVerify` | Controls whether a client verifies the server's certificate chain and host name. If `true`, accepts any certificate presented by the server and any host name in that certificate. | false | `false` |
| `saslMechanism` | SASL mechanism to be used. Possible values: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. If empty, authentication won't be performed. | false | |
| `saslUsername` | SASL username. If provided, a password needs to be provided too. | false | |
| `saslPassword` | SASL password. If provided, a username needs to be provided too. | false | |
| name | description | required | default value |
|----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|----------------------------------------------|
| `servers` | Servers is a list of Kafka bootstrap servers, which will be used to discover all the servers in a cluster. | true | |
| `topic` | Topic is the Kafka topic. It can contain a [Go template](https://pkg.go.dev/text/template) that will be executed for each record to determine the topic. By default, the topic is the value of the `opencdc.collection` metadata field. | false | `{{ index .Metadata "opencdc.collection" }}` |
| `clientID` | A Kafka client ID. | false | `conduit-connector-kafka` |
| `acks` | Acks defines the number of acknowledges from partition replicas required before receiving a response to a produce request. `none` = fire and forget, `one` = wait for the leader to acknowledge the writes, `all` = wait for the full ISR to acknowledge the writes. | false | `all` |
| `deliveryTimeout` | Message delivery timeout. | false | |
| `batchBytes` | Limits the maximum size of a request in bytes before being sent to a partition. This mirrors Kafka's `max.message.bytes`. | false | 1000012 |
| `compression` | Compression applied to messages. Possible values: `none`, `gzip`, `snappy`, `lz4`, `zstd`. | false | `snappy` |
| `clientCert` | A certificate for the Kafka client, in PEM format. If provided, the private key needs to be provided too. | false | |
| `clientKey` | A private key for the Kafka client, in PEM format. If provided, the certificate needs to be provided too. | false | |
| `caCert` | The Kafka broker's certificate, in PEM format. | false | |
| `insecureSkipVerify` | Controls whether a client verifies the server's certificate chain and host name. If `true`, accepts any certificate presented by the server and any host name in that certificate. | false | `false` |
| `saslMechanism` | SASL mechanism to be used. Possible values: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. If empty, authentication won't be performed. | false | |
| `saslUsername` | SASL username. If provided, a password needs to be provided too. | false | |
| `saslPassword` | SASL password. If provided, a username needs to be provided too. | false | |

### Output format

Expand Down
5 changes: 2 additions & 3 deletions acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"testing"
"time"

"github.com/conduitio/conduit-connector-kafka/common"
"github.com/conduitio/conduit-connector-kafka/source"
"github.com/conduitio/conduit-connector-kafka/test"
sdk "github.com/conduitio/conduit-connector-sdk"
Expand Down Expand Up @@ -69,8 +68,8 @@ type AcceptanceTestDriver struct {
// ReadFromDestination is overwritten because the source connector uses a consumer
// group which results in slow reads. This speeds up the destination tests.
func (d AcceptanceTestDriver) ReadFromDestination(t *testing.T, records []sdk.Record) []sdk.Record {
cfg := test.ParseConfigMap[common.Config](t, d.SourceConfig(t))
kgoRecs := test.Consume(t, cfg, len(records))
cfg := test.ParseConfigMap[source.Config](t, d.SourceConfig(t))
kgoRecs := test.Consume(t, cfg.Servers, cfg.Topic, len(records))

recs := make([]sdk.Record, len(kgoRecs))
for i, rec := range kgoRecs {
Expand Down
2 changes: 0 additions & 2 deletions common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ type Config struct {
// Servers is a list of Kafka bootstrap servers, which will be used to
// discover all the servers in a cluster.
Servers []string `json:"servers" validate:"required"`
// Topic is the Kafka topic.
Topic string `json:"topic" validate:"required"`

// ClientID is a unique identifier for client connections established by
// this connector.
Expand Down
69 changes: 68 additions & 1 deletion destination/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,32 @@
package destination

import (
"bytes"
"errors"
"fmt"
"regexp"
"strings"
"text/template"
"time"

"github.com/Masterminds/sprig/v3"
"github.com/conduitio/conduit-connector-kafka/common"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/twmb/franz-go/pkg/kgo"
)

var (
topicRegex = regexp.MustCompile(`^[a-zA-Z0-9\._\-]+$`)
maxTopicLength = 249
)

type Config struct {
common.Config

// Topic is the Kafka topic. It can contain a [Go template](https://pkg.go.dev/text/template)
// that will be executed for each record to determine the topic. By default,
// the topic is the value of the `opencdc.collection` metadata field.
Topic string `json:"topic" default:"{{ index .Metadata \"opencdc.collection\" }}"`
// Acks defines the number of acknowledges from partition replicas required
// before receiving a response to a produce request.
// None = fire and forget, one = wait for the leader to acknowledge the
Expand All @@ -44,6 +61,8 @@ type Config struct {
useKafkaConnectKeyFormat bool
}

type TopicFn func(sdk.Record) (string, error)

func (c Config) WithKafkaConnectKeyFormat() Config {
c.useKafkaConnectKeyFormat = true
return c
Expand Down Expand Up @@ -83,5 +102,53 @@ func (c Config) CompressionCodecs() []kgo.CompressionCodec {

// Validate executes manual validations beyond what is defined in struct tags.
func (c Config) Validate() error {
return c.Config.Validate()
var multierr []error

err := c.Config.Validate()
if err != nil {
multierr = append(multierr, err)
}

_, _, err = c.ParseTopic()
if err != nil {
multierr = append(multierr, err)
}

return errors.Join(multierr...)
}

// ParseTopic returns either a static topic or a function that determines the
// topic for each record individually. If the topic is neither static nor a
// template, an error is returned.
func (c Config) ParseTopic() (topic string, f TopicFn, err error) {
if topicRegex.MatchString(c.Topic) {
// The topic is static, check length.
if len(c.Topic) > maxTopicLength {
return "", nil, fmt.Errorf("topic is too long, maximum length is %d", maxTopicLength)
}
return c.Topic, nil, nil
}

// The topic must be a template, check if it contains at least one action {{ }},
// to prevent allowing invalid static topic names.
if !strings.Contains(c.Topic, "{{") || !strings.Contains(c.Topic, "}}") {
return "", nil, fmt.Errorf("topic is neither a valid static Kafka topic nor a valid Go template")
}

// Try to parse the topic
t, err := template.New("topic").Funcs(sprig.FuncMap()).Parse(c.Topic)
if err != nil {
// The topic is not a valid Go template.
return "", nil, fmt.Errorf("topic is neither a valid static Kafka topic nor a valid Go template: %w", err)
}

// The topic is a valid template, return TopicFn.
var buf bytes.Buffer
return "", func(r sdk.Record) (string, error) {
buf.Reset()
if err := t.Execute(&buf, r); err != nil {
return "", fmt.Errorf("failed to execute topic template: %w", err)
}
return buf.String(), nil
}, nil
}
87 changes: 87 additions & 0 deletions destination/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package destination

import (
"strings"
"testing"

"github.com/matryer/is"
)

func TestConfig_Validate(t *testing.T) {
testCases := []struct {
name string
config Config
wantErr string
}{{
name: "valid",
config: Config{
Topic: strings.Repeat("a", 249),
},
}, {
name: "topic too long",
config: Config{
Topic: strings.Repeat("a", 250),
},
wantErr: "topic is too long, maximum length is 249",
}, {
name: "invalid topic characters",
config: Config{
Topic: "foo?",
},
wantErr: "topic is neither a valid static Kafka topic nor a valid Go template",
}, {
name: "invalid Go template 1",
config: Config{
Topic: "}} foo {{",
},
wantErr: "topic is neither a valid static Kafka topic nor a valid Go template",
}, {
name: "invalid Go template 2",
config: Config{
Topic: "}}foo",
},
wantErr: "topic is neither a valid static Kafka topic nor a valid Go template",
}, {
name: "invalid Go template 3",
config: Config{
Topic: "{{ foo }}",
},
wantErr: "topic is neither a valid static Kafka topic nor a valid Go template",
}, {
name: "valid Go template",
config: Config{
Topic: "{{ .Metadata.foo }}",
},
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
err := tc.config.Validate()

if tc.wantErr != "" && err == nil {
t.Errorf("expected error, got nil")
}
if tc.wantErr == "" && err != nil {
t.Errorf("unexpected error: %v", err)
}
if err != nil {
is.True(strings.Contains(err.Error(), tc.wantErr))
}
})
}
}
Loading
Loading