Skip to content

Commit

Permalink
add the ability to read from multiple Kafka topics (#131)
Browse files Browse the repository at this point in the history
* add the ability to read from multiple kafka topics

* fix test

* add multiple topic test

* rename "topic" to "topics"

* add "topic" param as (soon to be deprecated), with validations

* generate prams

* address reviews

* Update README.md

Co-authored-by: Lovro Mažgon <[email protected]>

* address reviews

* fix test

---------

Co-authored-by: Lovro Mažgon <[email protected]>
  • Loading branch information
maha-hajja and lovromazgon authored Apr 17, 2024
1 parent fb070a6 commit a545a62
Show file tree
Hide file tree
Showing 13 changed files with 221 additions and 53 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ A source is getting associated with a consumer group ID the first time the `Read
| name | description | required | default value |
|----------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------------------|
| `servers` | Servers is a list of Kafka bootstrap servers, which will be used to discover all the servers in a cluster. | true | |
| `topic` | Topic is the Kafka topic from which records will be read. | true | |
| `topics` | Topics is a comma separated list of Kafka topics from which records will be read, ex: "topic1,topic2". | true | |
| ~~`topic`~~ | Topic is the Kafka topic to read from. **Deprecated: use `topics` instead.** | false | |
| `clientID` | A Kafka client ID. | false | `conduit-connector-kafka` |
| `readFromBeginning` | Determines from whence the consumer group should begin consuming when it finds a partition without a committed offset. If this option is set to true it will start with the first message in that partition. | false | `false` |
| `groupID` | Defines the consumer group ID. | false | |
Expand Down
15 changes: 10 additions & 5 deletions acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ import (
)

func TestAcceptance(t *testing.T) {
cfg := map[string]string{
srcCfg := map[string]string{
"servers": "localhost:9092",
// source params
"readFromBeginning": "true",
}
destCfg := map[string]string{
"servers": "localhost:9092",
// destination params
"batchBytes": "1000012",
"acks": "all",
Expand All @@ -40,12 +43,14 @@ func TestAcceptance(t *testing.T) {
ConfigurableAcceptanceTestDriver: sdk.ConfigurableAcceptanceTestDriver{
Config: sdk.ConfigurableAcceptanceTestDriverConfig{
Connector: Connector,
SourceConfig: cfg,
DestinationConfig: cfg,
SourceConfig: srcCfg,
DestinationConfig: destCfg,

BeforeTest: func(t *testing.T) {
lastSlash := strings.LastIndex(t.Name(), "/")
cfg["topic"] = t.Name()[lastSlash+1:] + uuid.NewString()
randomName := t.Name()[lastSlash+1:] + uuid.NewString()
srcCfg["topics"] = randomName
destCfg["topic"] = randomName
},

Skip: []string{
Expand All @@ -69,7 +74,7 @@ type AcceptanceTestDriver struct {
// group which results in slow reads. This speeds up the destination tests.
func (d AcceptanceTestDriver) ReadFromDestination(t *testing.T, records []sdk.Record) []sdk.Record {
cfg := test.ParseConfigMap[source.Config](t, d.SourceConfig(t))
kgoRecs := test.Consume(t, cfg.Servers, cfg.Topic, len(records))
kgoRecs := test.Consume(t, cfg.Servers, cfg.Topics[0], len(records))

recs := make([]sdk.Record, len(kgoRecs))
for i, rec := range kgoRecs {
Expand Down
11 changes: 8 additions & 3 deletions destination_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@ import (
"context"
"testing"

"github.com/conduitio/conduit-connector-kafka/destination"
"github.com/conduitio/conduit-connector-kafka/source"
"github.com/conduitio/conduit-connector-kafka/test"
"github.com/matryer/is"
)

func TestDestination_Integration_WriteExistingTopic(t *testing.T) {
cfgMap := test.DestinationConfigMap(t)
cfg := test.ParseConfigMap[source.Config](t, cfgMap)
cfg := test.ParseConfigMap[destination.Config](t, cfgMap)

test.CreateTopic(t, cfg.Servers, cfg.Topic)
test.CreateTopics(t, cfg.Servers, []string{cfg.Topic})
testDestinationIntegrationWrite(t, cfgMap)
}

Expand Down Expand Up @@ -58,8 +59,12 @@ func testDestinationIntegrationWrite(t *testing.T, cfg map[string]string) {
is.NoErr(err)
is.Equal(count, len(wantRecords))

// source config needs "topics" param
cfg["topics"] = cfg["topic"]
cfg["topic"] = ""

srcCfg := test.ParseConfigMap[source.Config](t, cfg)
gotRecords := test.Consume(t, srcCfg.Servers, srcCfg.Topic, len(wantRecords))
gotRecords := test.Consume(t, srcCfg.Servers, srcCfg.Topics[0], len(wantRecords))
is.Equal(len(wantRecords), len(gotRecords))
for i, got := range gotRecords {
is.Equal(got.Value, wantRecords[i].Bytes())
Expand Down
4 changes: 2 additions & 2 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ func (s *Source) Parameters() map[string]sdk.Parameter {
return source.Config{}.Parameters()
}

func (s *Source) Configure(_ context.Context, cfg map[string]string) error {
func (s *Source) Configure(ctx context.Context, cfg map[string]string) error {
var config source.Config

err := sdk.Util.ParseConfig(cfg, &config)
if err != nil {
return err
}
err = config.Validate()
err = config.Validate(ctx)
if err != nil {
return err
}
Expand Down
33 changes: 29 additions & 4 deletions source/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@
package source

import (
"context"
"errors"
"fmt"

"github.com/conduitio/conduit-connector-kafka/common"
sdk "github.com/conduitio/conduit-connector-sdk"
)

type Config struct {
common.Config

// Topic is the Kafka topic.
Topic string `json:"topic" validate:"required"`
// Topics is a comma separated list of Kafka topics to read from.
Topics []string `json:"topics"`
// Topic {WARN will be deprecated soon} the kafka topic to read from.
Topic string `json:"topic"`
// ReadFromBeginning determines from whence the consumer group should begin
// consuming when it finds a partition without a committed offset. If this
// options is set to true it will start with the first message in that
Expand All @@ -35,6 +42,24 @@ type Config struct {
}

// Validate executes manual validations beyond what is defined in struct tags.
func (c Config) Validate() error {
return c.Config.Validate()
func (c *Config) Validate(ctx context.Context) error {
var multierr []error
err := c.Config.Validate()
if err != nil {
multierr = append(multierr, err)
}
// validate and set the topics.
if len(c.Topic) == 0 && len(c.Topics) == 0 {
multierr = append(multierr, fmt.Errorf("required parameter missing: %q", "topics"))
}
if len(c.Topic) > 0 && len(c.Topics) > 0 {
multierr = append(multierr, fmt.Errorf(`can't provide both "topic" and "topics" parameters, "topic" is deprecated and will be removed, use the "topics" parameter instead`))
}
if len(c.Topic) > 0 && len(c.Topics) == 0 {
sdk.Logger(ctx).Warn().Msg(`"topic" parameter is deprecated and will be removed, please use "topics" instead.`)
// add the topic value to the topics slice.
c.Topics = make([]string, 1)
c.Topics[0] = c.Topic
}
return errors.Join(multierr...)
}
77 changes: 77 additions & 0 deletions source/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package source

import (
"context"
"fmt"
"strings"
"testing"

"github.com/matryer/is"
)

func TestConfig_ValidateTopics(t *testing.T) {
// Note that we are testing custom validations. Required fields and simple
// validations are already executed by the SDK via parameter specifications.
testCases := []struct {
name string
cfg Config
wantErr string
}{{
name: `one of "topic" and "topics" should be provided.`,
cfg: Config{
Topics: []string{},
Topic: "",
},
wantErr: `required parameter missing: "topics"`,
}, {
name: "invalid, only provide one.",
cfg: Config{
Topics: []string{"topic2"},
Topic: "topic1",
},
wantErr: `can't provide both "topic" and "topics" parameters, "topic" is deprecated and will be removed, use the "topics" parameter instead`,
}, {
name: "valid with warning, will be deprecated soon",
cfg: Config{
Topics: []string{},
Topic: "topic1",
},
wantErr: "",
}, {
name: "valid",
cfg: Config{
Topics: []string{"topic1"},
},
wantErr: "",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
err := tc.cfg.Validate(context.Background())
fmt.Println(err)
if tc.wantErr != "" {
is.True(err != nil)
is.True(strings.Contains(err.Error(), tc.wantErr))
} else {
is.NoErr(err)
is.Equal(tc.cfg.Topics, []string{"topic1"})
}
})
}
}
2 changes: 1 addition & 1 deletion source/franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewFranzConsumer(ctx context.Context, cfg Config) (*FranzConsumer, error) {
opts := cfg.FranzClientOpts(sdk.Logger(ctx))
opts = append(opts, []kgo.Opt{
kgo.ConsumerGroup(cfg.GroupID),
kgo.ConsumeTopics(cfg.Topic),
kgo.ConsumeTopics(cfg.Topics...),
}...)

if !cfg.ReadFromBeginning {
Expand Down
51 changes: 44 additions & 7 deletions source/franz_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ func TestFranzConsumer_Consume_FromBeginning(t *testing.T) {
is := is.New(t)
ctx := context.Background()

cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t))
cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false))
cfg.ReadFromBeginning = true

records := test.GenerateFranzRecords(1, 6)
test.CreateTopic(t, cfg.Servers, cfg.Topic)
test.Produce(t, cfg.Servers, cfg.Topic, records)
test.CreateTopics(t, cfg.Servers, cfg.Topics)
test.Produce(t, cfg.Servers, cfg.Topics[0], records)

c, err := NewFranzConsumer(ctx, cfg)
is.NoErr(err)
Expand All @@ -56,12 +56,12 @@ func TestFranzConsumer_Consume_LastOffset(t *testing.T) {
is := is.New(t)
ctx := context.Background()

cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t))
cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false))
cfg.ReadFromBeginning = false

records := test.GenerateFranzRecords(1, 6)
test.CreateTopic(t, cfg.Servers, cfg.Topic)
test.Produce(t, cfg.Servers, cfg.Topic, records)
test.CreateTopics(t, cfg.Servers, cfg.Topics)
test.Produce(t, cfg.Servers, cfg.Topics[0], records)

c, err := NewFranzConsumer(ctx, cfg)
is.NoErr(err)
Expand All @@ -77,7 +77,7 @@ func TestFranzConsumer_Consume_LastOffset(t *testing.T) {
is.Equal(got, nil)

records = test.GenerateFranzRecords(7, 9)
test.Produce(t, cfg.Servers, cfg.Topic, records)
test.Produce(t, cfg.Servers, cfg.Topics[0], records)

for i := 0; i < len(records); i++ {
ctx, cancel := context.WithTimeout(ctx, time.Second)
Expand All @@ -87,3 +87,40 @@ func TestFranzConsumer_Consume_LastOffset(t *testing.T) {
is.Equal(got.Key, records[i].Key)
}
}

func TestFranzConsumer_Consume_MultipleTopics(t *testing.T) {
t.Parallel()
is := is.New(t)
ctx := context.Background()

cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, true))
cfg.ReadFromBeginning = true

records := test.GenerateFranzRecords(1, 6)
test.CreateTopics(t, cfg.Servers, cfg.Topics)
test.Produce(t, cfg.Servers, cfg.Topics[0], records[0:3])
test.Produce(t, cfg.Servers, cfg.Topics[1], records[3:])

c, err := NewFranzConsumer(ctx, cfg)
is.NoErr(err)
defer func() {
err := c.Close(ctx)
is.NoErr(err)
}()

topic1 := 0
topic2 := 0
for i := 0; i < len(records); i++ {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
got, err := c.Consume(ctx)
is.NoErr(err)
if got.Topic == cfg.Topics[0] {
topic1++
} else if got.Topic == cfg.Topics[1] {
topic2++
}
}
is.Equal(topic1, 3)
is.Equal(topic2, 3)
}
4 changes: 2 additions & 2 deletions source/franz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ func TestFranzConsumer_Opts(t *testing.T) {
CACert: caCert,
},
},
Topic: "test-topic",
Topics: []string{"test-topic"},
GroupID: "test-group-id",
}

c, err := NewFranzConsumer(context.Background(), cfg)
is.NoErr(err)

is.Equal(c.client.OptValue(kgo.ConsumeTopics), map[string]*regexp.Regexp{cfg.Topic: nil})
is.Equal(c.client.OptValue(kgo.ConsumeTopics), map[string]*regexp.Regexp{cfg.Topics[0]: nil})
is.Equal(c.client.OptValue(kgo.ConsumerGroup), cfg.GroupID)

is.Equal(c.client.OptValue(kgo.ClientID), cfg.ClientID)
Expand Down
12 changes: 8 additions & 4 deletions source/paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a545a62

Please sign in to comment.