Skip to content

Commit

Permalink
increase rate of metadata fetching in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon committed Nov 2, 2023
1 parent 7761c1a commit 21c1ede
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 0 deletions.
4 changes: 4 additions & 0 deletions acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kafka
import (
"strings"
"testing"
"time"

"github.com/conduitio/conduit-connector-kafka/common"
"github.com/conduitio/conduit-connector-kafka/source"
Expand Down Expand Up @@ -53,6 +54,9 @@ func TestAcceptance(t *testing.T) {
"TestSource_Configure_RequiredParams",
"TestDestination_Configure_RequiredParams",
},

WriteTimeout: time.Second * 10,
ReadTimeout: time.Second * 10,
},
},
})
Expand Down
2 changes: 2 additions & 0 deletions common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Config struct {

ConfigSASL
ConfigTLS

franzClientOpts []kgo.Opt
}

// Validate executes manual validations beyond what is defined in struct tags.
Expand Down
8 changes: 8 additions & 0 deletions common/franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,20 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
)

// WithFranzClientOpts lets you specify custom kafka client options (meant for
// test purposes).
func (c Config) WithFranzClientOpts(opts ...kgo.Opt) Config {
c.franzClientOpts = append(c.franzClientOpts, opts...)
return c
}

// FranzClientOpts returns the kafka client options derived from the common config.
func (c Config) FranzClientOpts(logger *zerolog.Logger) []kgo.Opt {
opts := []kgo.Opt{
kgo.SeedBrokers(c.Servers...),
kgo.ClientID(c.ClientID),
}
opts = append(opts, c.franzClientOpts...)
if logger.GetLevel() != zerolog.Disabled {
opts = append(opts, kgo.WithLogger(franzLogger{logger: logger}))
}
Expand Down
1 change: 1 addition & 0 deletions destination/franz_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestFranzProducer_Produce(t *testing.T) {
ctx := context.Background()

cfg := test.ParseConfigMap[Config](t, test.DestinationConfigMap(t))
cfg.Config = test.ConfigWithIntegrationTestOptions(cfg.Config)

p, err := NewFranzProducer(ctx, cfg)
is.NoErr(err)
Expand Down
11 changes: 11 additions & 0 deletions test/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func Consume(t T, cfg common.Config, limit int) []*kgo.Record {
cl, err := kgo.NewClient(
kgo.SeedBrokers(cfg.Servers...),
kgo.ConsumeTopics(cfg.Topic),
kgo.MetadataMinAge(time.Millisecond*100),
)
is.NoErr(err)
defer cl.Close()
Expand Down Expand Up @@ -129,6 +130,7 @@ func Produce(t T, cfg common.Config, records []*kgo.Record, timeoutOpt ...time.D
cl, err := kgo.NewClient(
kgo.SeedBrokers(cfg.Servers...),
kgo.DefaultProduceTopic(cfg.Topic),
kgo.MetadataMinAge(time.Millisecond*100),
)
is.NoErr(err)
defer cl.Close()
Expand Down Expand Up @@ -173,6 +175,7 @@ func CreateTopic(t T, cfg common.Config) {

cl, err := kgo.NewClient(
kgo.SeedBrokers(cfg.Servers...),
kgo.MetadataMinAge(time.Millisecond*100),
)
is.NoErr(err)

Expand Down Expand Up @@ -220,3 +223,11 @@ func Certificates(t T) (clientCert, clientKey, caCert string) {
caCert = readFile("server.cer.pem")
return
}

func ConfigWithIntegrationTestOptions(cfg common.Config) common.Config {
return cfg.WithFranzClientOpts(
// by default metadata is fetched every 5 seconds, for integration tests
// we set this to a lower value so the tests finish faster
kgo.MetadataMinAge(time.Millisecond * 100),
)
}

0 comments on commit 21c1ede

Please sign in to comment.