Skip to content

Commit

Permalink
retry on franz consumer group session errors
Browse files Browse the repository at this point in the history
  • Loading branch information
samirketema committed Jul 11, 2024
1 parent 87864b6 commit b3055a7
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 39 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ test: test-kafka test-redpanda
generate:
go generate ./...

.PHONY: fmt
fmt: ## Format Go files using gofumpt and gci.
gofumpt -l -w .
gci write --skip-generated .

.PHONY: lint
lint:
golangci-lint run
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ A source is getting associated with a consumer group ID the first time the `Read
| `saslMechanism` | SASL mechanism to be used. Possible values: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. If empty, authentication won't be performed. | false | |
| `saslUsername` | SASL username. If provided, a password needs to be provided too. | false | |
| `saslPassword` | SASL password. If provided, a username needs to be provided too. | false | |
| `RetryGroupJoinErrors` | determines whether the connector will continually retry on group join errors | false | true |
| `RetryLeaderErrors` | determines whether the connector will continually retry on leader errors | false | true |

## Destination

Expand Down
7 changes: 3 additions & 4 deletions common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
)

var (
// TODO make the timeout configurable
connectionTimeout = time.Second * 10
)
// TODO make the timeout configurable
var connectionTimeout = time.Second * 10

// Config contains common configuration parameters.
type Config struct {
Expand Down Expand Up @@ -78,6 +76,7 @@ func (c Config) TryDial(ctx context.Context) error {
case <-ctx.Done():
return err
case <-time.After(time.Second):
sdk.Logger(ctx).Info().Msg("failed to dial broker, trying again...")
// ping again
}
}
Expand Down
20 changes: 20 additions & 0 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ package kafka

import (
"context"
"errors"
"fmt"
"strings"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-kafka/source"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/google/uuid"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
)

const (
Expand Down Expand Up @@ -90,6 +94,22 @@ func (s *Source) Open(ctx context.Context, sdkPos opencdc.Position) error {
func (s *Source) Read(ctx context.Context) (opencdc.Record, error) {
rec, err := s.consumer.Consume(ctx)
if err != nil {
var errGroupSession *kgo.ErrGroupSession
if s.config.RetryGroupJoinErrors &&
(errors.As(err, &errGroupSession) || strings.Contains(err.Error(), "unable to join group session")) {
sdk.Logger(ctx).Error().Msgf("group session error, retrying: %s", err.Error())
return opencdc.Record{}, sdk.ErrBackoffRetry
}
if s.config.RetryLeaderErrors {
switch err {
case kerr.LeaderNotAvailable,
kerr.EligibleLeadersNotAvailable,
kerr.PreferredLeaderNotAvailable,
kerr.BrokerNotAvailable:
sdk.Logger(ctx).Error().Msgf("ephemeral error connecting to broker or leader, retrying: %s", err.Error())
return opencdc.Record{}, sdk.ErrBackoffRetry
}
}
return opencdc.Record{}, fmt.Errorf("failed getting a record: %w", err)
}

Expand Down
4 changes: 4 additions & 0 deletions source/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type Config struct {
ReadFromBeginning bool `json:"readFromBeginning"`
// GroupID defines the consumer group id.
GroupID string `json:"groupID"`
// RetryGroupJoinErrors determines whether the connector will continually retry on group join errors.
RetryGroupJoinErrors bool `json:"retryGroupJoinErrors" default:"true"`
// RetryLeaderErrors determines whether the connector will continually retry on leader errors.
RetryLeaderErrors bool `json:"retryLeaderErrors" default:"true"`
}

// Validate executes manual validations beyond what is defined in struct tags.
Expand Down
55 changes: 28 additions & 27 deletions source/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,35 @@ func TestConfig_ValidateTopics(t *testing.T) {
name string
cfg Config
wantErr string
}{{
name: `one of "topic" and "topics" should be provided.`,
cfg: Config{
Topics: []string{},
Topic: "",
}{
{
name: `one of "topic" and "topics" should be provided.`,
cfg: Config{
Topics: []string{},
Topic: "",
},
wantErr: `required parameter missing: "topics"`,
}, {
name: "invalid, only provide one.",
cfg: Config{
Topics: []string{"topic2"},
Topic: "topic1",
},
wantErr: `can't provide both "topic" and "topics" parameters, "topic" is deprecated and will be removed, use the "topics" parameter instead`,
}, {
name: "valid with warning, will be deprecated soon",
cfg: Config{
Topics: []string{},
Topic: "topic1",
},
wantErr: "",
}, {
name: "valid",
cfg: Config{
Topics: []string{"topic1"},
},
wantErr: "",
},
wantErr: `required parameter missing: "topics"`,
}, {
name: "invalid, only provide one.",
cfg: Config{
Topics: []string{"topic2"},
Topic: "topic1",
},
wantErr: `can't provide both "topic" and "topics" parameters, "topic" is deprecated and will be removed, use the "topics" parameter instead`,
}, {
name: "valid with warning, will be deprecated soon",
cfg: Config{
Topics: []string{},
Topic: "topic1",
},
wantErr: "",
}, {
name: "valid",
cfg: Config{
Topics: []string{"topic1"},
},
wantErr: "",
},
}

for _, tc := range testCases {
Expand Down
6 changes: 3 additions & 3 deletions source/franz_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestFranzConsumer_Consume_FromBeginning(t *testing.T) {
is := is.New(t)
ctx := context.Background()

cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false))
cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false, false, false))
cfg.ReadFromBeginning = true

records := test.GenerateFranzRecords(1, 6)
Expand Down Expand Up @@ -56,7 +56,7 @@ func TestFranzConsumer_Consume_LastOffset(t *testing.T) {
is := is.New(t)
ctx := context.Background()

cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false))
cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false, false, false))
cfg.ReadFromBeginning = false

records := test.GenerateFranzRecords(1, 6)
Expand Down Expand Up @@ -93,7 +93,7 @@ func TestFranzConsumer_Consume_MultipleTopics(t *testing.T) {
is := is.New(t)
ctx := context.Background()

cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, true))
cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, true, false, false))
cfg.ReadFromBeginning = true

records := test.GenerateFranzRecords(1, 6)
Expand Down
12 changes: 12 additions & 0 deletions source/paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions source_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
func TestSource_Integration_RestartFull(t *testing.T) {
t.Parallel()

cfgMap := test.SourceConfigMap(t, true)
cfgMap := test.SourceConfigMap(t, true, false, false)
cfg := test.ParseConfigMap[source.Config](t, cfgMap)

recs1 := test.GenerateFranzRecords(1, 3)
Expand All @@ -44,7 +44,7 @@ func TestSource_Integration_RestartFull(t *testing.T) {
func TestSource_Integration_RestartPartial(t *testing.T) {
t.Parallel()

cfgMap := test.SourceConfigMap(t, true)
cfgMap := test.SourceConfigMap(t, true, false, false)
cfg := test.ParseConfigMap[source.Config](t, cfgMap)

recs1 := test.GenerateFranzRecords(1, 3)
Expand Down
29 changes: 27 additions & 2 deletions source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package kafka

import (
"context"
"errors"
"strconv"
"testing"

"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-kafka/source"
"github.com/conduitio/conduit-connector-kafka/test"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/matryer/is"
Expand All @@ -39,7 +41,7 @@ func TestSource_Teardown_Success(t *testing.T) {
Close(context.Background()).
Return(nil)

cfg := test.ParseConfigMap[source.Config](t, test.SourceConfigMap(t, true))
cfg := test.ParseConfigMap[source.Config](t, test.SourceConfigMap(t, true, false, false))

underTest := Source{consumer: consumerMock, config: cfg}
is.NoErr(underTest.Teardown(context.Background()))
Expand Down Expand Up @@ -86,11 +88,34 @@ func TestSource_Read(t *testing.T) {
Consume(gomock.Any()).
Return((*source.Record)(rec), nil)

cfg := test.ParseConfigMap[source.Config](t, test.SourceConfigMap(t, false))
cfg := test.ParseConfigMap[source.Config](t, test.SourceConfigMap(t, false, false, false))
underTest := Source{consumer: consumerMock, config: cfg}
got, err := underTest.Read(context.Background())
is.NoErr(err)
is.True(got.Metadata[opencdc.MetadataReadAt] != "")
want.Metadata[opencdc.MetadataReadAt] = got.Metadata[opencdc.MetadataReadAt]
is.Equal(cmp.Diff(want, got, cmpopts.IgnoreUnexported(opencdc.Record{})), "")
}

func TestSource_Read_RetryJoinError(t *testing.T) {
is := is.New(t)
ctrl := gomock.NewController(t)

rec := test.GenerateFranzRecords(0, 0, "foo")[0]
rec.Headers = []kgo.RecordHeader{
{Key: "header-a", Value: []byte("value-a")},
{Key: "header-b", Value: []byte{0, 1, 2}},
}

consumerMock := source.NewMockConsumer(ctrl)
consumerMock.
EXPECT().
Consume(gomock.Any()).
Return(nil, errors.New("unable to join group session: unable to dial: dial tcp 127.0.0.1:9092: connect: connection refused"))

cfg := test.ParseConfigMap[source.Config](t, test.SourceConfigMap(t, false, true, true))
underTest := Source{consumer: consumerMock, config: cfg}
_, err := underTest.Read(context.Background())
is.True(err != nil)
is.True(errors.Is(err, sdk.ErrBackoffRetry))
}
4 changes: 3 additions & 1 deletion test/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ func ConfigMap() map[string]string {
}
}

func SourceConfigMap(t T, multipleTopics bool) map[string]string {
func SourceConfigMap(t T, multipleTopics, retryGroupJoinErrors, retryLeaderErrors bool) map[string]string {
m := ConfigMap()
m["readFromBeginning"] = "true"
m["retryGroupJoinErrors"] = fmt.Sprint(retryGroupJoinErrors)
m["retryLeaderErrors"] = fmt.Sprint(retryLeaderErrors)
m["topics"] = getRandomTopicName(t)
if multipleTopics {
m["topics"] = m["topics"] + "," + getRandomTopicName(t)
Expand Down

0 comments on commit b3055a7

Please sign in to comment.