Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination - route records to topic based on metadata #130

Merged
merged 6 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,19 @@ jobs:
- uses: actions/setup-go@v5
with:
go-version-file: 'go.mod'

# This step sets up the variable steps.golangci-lint-version.outputs.v
# to contain the version of golangci-lint (e.g. v1.54.2).
# The version is taken from go.mod.
- name: Golangci-lint version
id: golangci-lint-version
run: |
GOLANGCI_LINT_VERSION=$( go list -m -f '{{.Version}}' github.com/golangci/golangci-lint )
echo "v=$GOLANGCI_LINT_VERSION" >> "$GITHUB_OUTPUT"

- name: golangci-lint
uses: golangci/golangci-lint-action@v4.0.0
uses: golangci/golangci-lint-action@v4
with:
version: v1.55.2
version: ${{ steps.golangci-lint-version.outputs.v }}
skip-pkg-cache: true
args: --timeout=2m
23 changes: 23 additions & 0 deletions .github/workflows/validate-generated-files.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
name: validate-generated-files

on:
push:
branches: [ main ]
pull_request:

jobs:
validate-generated-files:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version-file: 'go.mod'

- name: Check generated files
run: |
export PATH=$PATH:$(go env GOPATH)/bin
make install-tools generate
git diff --exit-code --numstat
14 changes: 12 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,28 +1,38 @@
.PHONY: build test

VERSION=$(shell git describe --tags --dirty --always)

.PHONY: build
build:
go build -ldflags "-X 'github.com/conduitio/conduit-connector-kafka.version=${VERSION}'" -o conduit-connector-kafka cmd/connector/main.go

.PHONY: test-kafka
test-kafka:
# run required docker containers, execute integration tests, stop containers after tests
docker compose -f test/docker-compose-kafka.yml up --quiet-pull -d --wait
go test $(GOTEST_FLAGS) -race ./...; ret=$$?; \
docker compose -f test/docker-compose-kafka.yml down; \
exit $$ret

.PHONY: test-redpanda
test-redpanda:
# run required docker containers, execute integration tests, stop containers after tests
docker compose -f test/docker-compose-redpanda.yml up --quiet-pull -d --wait
go test $(GOTEST_FLAGS) -race ./...; ret=$$?; \
docker compose -f test/docker-compose-redpanda.yml down; \
exit $$ret

.PHONY: test
test: test-kafka test-redpanda

.PHONY: generate
generate:
go generate ./...

.PHONY: lint
lint:
golangci-lint run

.PHONY: install-tools
install-tools:
@echo Installing tools from tools.go
@go list -e -f '{{ join .Imports "\n" }}' tools.go | xargs -I % go list -f "%@{{.Module.Version}}" % | xargs -tI % go install %
@go mod tidy
32 changes: 16 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,22 @@ The destination connector sends records to Kafka.

There's no global, connector configuration. Each connector instance is configured separately.

| name | description | required | default value |
|----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------------------|
| `servers` | Servers is a list of Kafka bootstrap servers, which will be used to discover all the servers in a cluster. | true | |
| `topic` | Topic is the Kafka topic into which records will be written. | true | |
| `clientID` | A Kafka client ID. | false | `conduit-connector-kafka` |
| `acks` | Acks defines the number of acknowledges from partition replicas required before receiving a response to a produce request. `none` = fire and forget, `one` = wait for the leader to acknowledge the writes, `all` = wait for the full ISR to acknowledge the writes. | false | `all` |
| `deliveryTimeout` | Message delivery timeout. | false | |
| `batchBytes` | Limits the maximum size of a request in bytes before being sent to a partition. This mirrors Kafka's `max.message.bytes`. | false | 1000012 |
| `compression` | Compression applied to messages. Possible values: `none`, `gzip`, `snappy`, `lz4`, `zstd`. | false | `snappy` |
| `clientCert` | A certificate for the Kafka client, in PEM format. If provided, the private key needs to be provided too. | false | |
| `clientKey` | A private key for the Kafka client, in PEM format. If provided, the certificate needs to be provided too. | false | |
| `caCert` | The Kafka broker's certificate, in PEM format. | false | |
| `insecureSkipVerify` | Controls whether a client verifies the server's certificate chain and host name. If `true`, accepts any certificate presented by the server and any host name in that certificate. | false | `false` |
| `saslMechanism` | SASL mechanism to be used. Possible values: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. If empty, authentication won't be performed. | false | |
| `saslUsername` | SASL username. If provided, a password needs to be provided too. | false | |
| `saslPassword` | SASL password. If provided, a username needs to be provided too. | false | |
| name | description | required | default value |
|----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|----------------------------------------------|
| `servers` | Servers is a list of Kafka bootstrap servers, which will be used to discover all the servers in a cluster. | true | |
| `topic` | Topic is the Kafka topic. It can contain a [Go template](https://pkg.go.dev/text/template) that will be executed for each record to determine the topic. By default, the topic is the value of the `opencdc.collection` metadata field. | false | `{{ index .Metadata "opencdc.collection" }}` |
| `clientID` | A Kafka client ID. | false | `conduit-connector-kafka` |
| `acks` | Acks defines the number of acknowledges from partition replicas required before receiving a response to a produce request. `none` = fire and forget, `one` = wait for the leader to acknowledge the writes, `all` = wait for the full ISR to acknowledge the writes. | false | `all` |
| `deliveryTimeout` | Message delivery timeout. | false | |
| `batchBytes` | Limits the maximum size of a request in bytes before being sent to a partition. This mirrors Kafka's `max.message.bytes`. | false | 1000012 |
| `compression` | Compression applied to messages. Possible values: `none`, `gzip`, `snappy`, `lz4`, `zstd`. | false | `snappy` |
| `clientCert` | A certificate for the Kafka client, in PEM format. If provided, the private key needs to be provided too. | false | |
| `clientKey` | A private key for the Kafka client, in PEM format. If provided, the certificate needs to be provided too. | false | |
| `caCert` | The Kafka broker's certificate, in PEM format. | false | |
| `insecureSkipVerify` | Controls whether a client verifies the server's certificate chain and host name. If `true`, accepts any certificate presented by the server and any host name in that certificate. | false | `false` |
| `saslMechanism` | SASL mechanism to be used. Possible values: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. If empty, authentication won't be performed. | false | |
| `saslUsername` | SASL username. If provided, a password needs to be provided too. | false | |
| `saslPassword` | SASL password. If provided, a username needs to be provided too. | false | |

### Output format

Expand Down
5 changes: 2 additions & 3 deletions acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"testing"
"time"

"github.com/conduitio/conduit-connector-kafka/common"
"github.com/conduitio/conduit-connector-kafka/source"
"github.com/conduitio/conduit-connector-kafka/test"
sdk "github.com/conduitio/conduit-connector-sdk"
Expand Down Expand Up @@ -69,8 +68,8 @@ type AcceptanceTestDriver struct {
// ReadFromDestination is overwritten because the source connector uses a consumer
// group which results in slow reads. This speeds up the destination tests.
func (d AcceptanceTestDriver) ReadFromDestination(t *testing.T, records []sdk.Record) []sdk.Record {
cfg := test.ParseConfigMap[common.Config](t, d.SourceConfig(t))
kgoRecs := test.Consume(t, cfg, len(records))
cfg := test.ParseConfigMap[source.Config](t, d.SourceConfig(t))
kgoRecs := test.Consume(t, cfg.Servers, cfg.Topic, len(records))

recs := make([]sdk.Record, len(kgoRecs))
for i, rec := range kgoRecs {
Expand Down
2 changes: 0 additions & 2 deletions common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ type Config struct {
// Servers is a list of Kafka bootstrap servers, which will be used to
// discover all the servers in a cluster.
Servers []string `json:"servers" validate:"required"`
// Topic is the Kafka topic.
Topic string `json:"topic" validate:"required"`

// ClientID is a unique identifier for client connections established by
// this connector.
Expand Down
4 changes: 4 additions & 0 deletions destination/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
type Config struct {
common.Config

// Topic is the Kafka topic. It can contain a [Go template](https://pkg.go.dev/text/template)
// that will be executed for each record to determine the topic. By default,
// the topic is the value of the `opencdc.collection` metadata field.
Topic string `json:"topic" default:"{{ index .Metadata \"opencdc.collection\" }}"`
// Acks defines the number of acknowledges from partition replicas required
// before receiving a response to a produce request.
// None = fire and forget, one = wait for the leader to acknowledge the
Expand Down
Loading
Loading