From b3055a76772443b8e4a5d9e3d6b4229c954116d7 Mon Sep 17 00:00:00 2001 From: Samir Ketema Date: Wed, 10 Jul 2024 19:37:06 -0700 Subject: [PATCH] retry on franz consumer group session errors --- Makefile | 5 +++ README.md | 2 ++ common/config.go | 7 ++-- source.go | 20 ++++++++++++ source/config.go | 4 +++ source/config_test.go | 55 ++++++++++++++++---------------- source/franz_integration_test.go | 6 ++-- source/paramgen.go | 12 +++++++ source_integration_test.go | 4 +-- source_test.go | 29 +++++++++++++++-- test/util.go | 4 ++- 11 files changed, 109 insertions(+), 39 deletions(-) diff --git a/Makefile b/Makefile index b609324..fdaa3e4 100644 --- a/Makefile +++ b/Makefile @@ -27,6 +27,11 @@ test: test-kafka test-redpanda generate: go generate ./... +.PHONY: fmt +fmt: ## Format Go files using gofumpt and gci. + gofumpt -l -w . + gci write --skip-generated . + .PHONY: lint lint: golangci-lint run diff --git a/README.md b/README.md index 4364bf1..109f08a 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,8 @@ A source is getting associated with a consumer group ID the first time the `Read | `saslMechanism` | SASL mechanism to be used. Possible values: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. If empty, authentication won't be performed. | false | | | `saslUsername` | SASL username. If provided, a password needs to be provided too. | false | | | `saslPassword` | SASL password. If provided, a username needs to be provided too. 
| false | | +| `retryGroupJoinErrors` | Determines whether the connector will continually retry on group join errors. | false | true | +| `retryLeaderErrors` | Determines whether the connector will continually retry on leader errors. | false | true | ## Destination diff --git a/common/config.go b/common/config.go index e3348fa..da729dc 100644 --- a/common/config.go +++ b/common/config.go @@ -23,10 +23,8 @@ import ( "github.com/twmb/franz-go/pkg/kgo" ) -var ( - // TODO make the timeout configurable - connectionTimeout = time.Second * 10 -) +// TODO make the timeout configurable +var connectionTimeout = time.Second * 10 // Config contains common configuration parameters. type Config struct { @@ -78,6 +76,7 @@ func (c Config) TryDial(ctx context.Context) error { case <-ctx.Done(): return err case <-time.After(time.Second): + sdk.Logger(ctx).Info().Msg("failed to dial broker, trying again...") // ping again } } diff --git a/source.go b/source.go index b364fda..9f4867c 100644 --- a/source.go +++ b/source.go @@ -16,13 +16,17 @@ package kafka import ( "context" + "errors" "fmt" + "strings" "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-kafka/source" sdk "github.com/conduitio/conduit-connector-sdk" "github.com/google/uuid" + "github.com/twmb/franz-go/pkg/kerr" + "github.com/twmb/franz-go/pkg/kgo" ) const ( @@ -90,6 +94,22 @@ func (s *Source) Open(ctx context.Context, sdkPos opencdc.Position) error { func (s *Source) Read(ctx context.Context) (opencdc.Record, error) { rec, err := s.consumer.Consume(ctx) if err != nil { + var errGroupSession *kgo.ErrGroupSession + if s.config.RetryGroupJoinErrors && + (errors.As(err, &errGroupSession) || strings.Contains(err.Error(), "unable to join group session")) { + sdk.Logger(ctx).Error().Msgf("group session error, retrying: %s", err.Error()) + return opencdc.Record{}, sdk.ErrBackoffRetry + } + if s.config.RetryLeaderErrors { + switch err { + case
kerr.LeaderNotAvailable, + kerr.EligibleLeadersNotAvailable, + kerr.PreferredLeaderNotAvailable, + kerr.BrokerNotAvailable: + sdk.Logger(ctx).Error().Msgf("ephemeral error connecting to broker or leader, retrying: %s", err.Error()) + return opencdc.Record{}, sdk.ErrBackoffRetry + } + } return opencdc.Record{}, fmt.Errorf("failed getting a record: %w", err) } diff --git a/source/config.go b/source/config.go index 2973e18..2579c1e 100644 --- a/source/config.go +++ b/source/config.go @@ -39,6 +39,10 @@ type Config struct { ReadFromBeginning bool `json:"readFromBeginning"` // GroupID defines the consumer group id. GroupID string `json:"groupID"` + // RetryGroupJoinErrors determines whether the connector will continually retry on group join errors. + RetryGroupJoinErrors bool `json:"retryGroupJoinErrors" default:"true"` + // RetryLeaderErrors determines whether the connector will continually retry on leader errors. + RetryLeaderErrors bool `json:"retryLeaderErrors" default:"true"` } // Validate executes manual validations beyond what is defined in struct tags. 
diff --git a/source/config_test.go b/source/config_test.go index 45b8635..983c578 100644 --- a/source/config_test.go +++ b/source/config_test.go @@ -30,34 +30,35 @@ func TestConfig_ValidateTopics(t *testing.T) { name string cfg Config wantErr string - }{{ - name: `one of "topic" and "topics" should be provided.`, - cfg: Config{ - Topics: []string{}, - Topic: "", + }{ + { + name: `one of "topic" and "topics" should be provided.`, + cfg: Config{ + Topics: []string{}, + Topic: "", + }, + wantErr: `required parameter missing: "topics"`, + }, { + name: "invalid, only provide one.", + cfg: Config{ + Topics: []string{"topic2"}, + Topic: "topic1", + }, + wantErr: `can't provide both "topic" and "topics" parameters, "topic" is deprecated and will be removed, use the "topics" parameter instead`, + }, { + name: "valid with warning, will be deprecated soon", + cfg: Config{ + Topics: []string{}, + Topic: "topic1", + }, + wantErr: "", + }, { + name: "valid", + cfg: Config{ + Topics: []string{"topic1"}, + }, + wantErr: "", }, - wantErr: `required parameter missing: "topics"`, - }, { - name: "invalid, only provide one.", - cfg: Config{ - Topics: []string{"topic2"}, - Topic: "topic1", - }, - wantErr: `can't provide both "topic" and "topics" parameters, "topic" is deprecated and will be removed, use the "topics" parameter instead`, - }, { - name: "valid with warning, will be deprecated soon", - cfg: Config{ - Topics: []string{}, - Topic: "topic1", - }, - wantErr: "", - }, { - name: "valid", - cfg: Config{ - Topics: []string{"topic1"}, - }, - wantErr: "", - }, } for _, tc := range testCases { diff --git a/source/franz_integration_test.go b/source/franz_integration_test.go index e055331..a5bcb51 100644 --- a/source/franz_integration_test.go +++ b/source/franz_integration_test.go @@ -28,7 +28,7 @@ func TestFranzConsumer_Consume_FromBeginning(t *testing.T) { is := is.New(t) ctx := context.Background() - cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false)) + cfg := 
test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false, false, false)) cfg.ReadFromBeginning = true records := test.GenerateFranzRecords(1, 6) @@ -56,7 +56,7 @@ func TestFranzConsumer_Consume_LastOffset(t *testing.T) { is := is.New(t) ctx := context.Background() - cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false)) + cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false, false, false)) cfg.ReadFromBeginning = false records := test.GenerateFranzRecords(1, 6) @@ -93,7 +93,7 @@ func TestFranzConsumer_Consume_MultipleTopics(t *testing.T) { is := is.New(t) ctx := context.Background() - cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, true)) + cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, true, false, false)) cfg.ReadFromBeginning = true records := test.GenerateFranzRecords(1, 6) diff --git a/source/paramgen.go b/source/paramgen.go index 44766a8..c0f922a 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -51,6 +51,18 @@ func (Config) Parameters() map[string]config.Parameter { Type: config.ParameterTypeBool, Validations: []config.Validation{}, }, + "retryGroupJoinErrors": { + Default: "true", + Description: "RetryGroupJoinErrors determines whether the connector will continually retry on group join errors.", + Type: config.ParameterTypeBool, + Validations: []config.Validation{}, + }, + "retryLeaderErrors": { + Default: "true", + Description: "RetryLeaderErrors determines whether the connector will continually retry on leader errors.", + Type: config.ParameterTypeBool, + Validations: []config.Validation{}, + }, "saslMechanism": { Default: "", Description: "Mechanism configures the connector to use SASL authentication. 
If\nempty, no authentication will be performed.", diff --git a/source_integration_test.go b/source_integration_test.go index b4c0a93..4aceadb 100644 --- a/source_integration_test.go +++ b/source_integration_test.go @@ -28,7 +28,7 @@ import ( func TestSource_Integration_RestartFull(t *testing.T) { t.Parallel() - cfgMap := test.SourceConfigMap(t, true) + cfgMap := test.SourceConfigMap(t, true, false, false) cfg := test.ParseConfigMap[source.Config](t, cfgMap) recs1 := test.GenerateFranzRecords(1, 3) @@ -44,7 +44,7 @@ func TestSource_Integration_RestartFull(t *testing.T) { func TestSource_Integration_RestartPartial(t *testing.T) { t.Parallel() - cfgMap := test.SourceConfigMap(t, true) + cfgMap := test.SourceConfigMap(t, true, false, false) cfg := test.ParseConfigMap[source.Config](t, cfgMap) recs1 := test.GenerateFranzRecords(1, 3) diff --git a/source_test.go b/source_test.go index 114516e..d3409b1 100644 --- a/source_test.go +++ b/source_test.go @@ -16,12 +16,14 @@ package kafka import ( "context" + "errors" "strconv" "testing" "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-kafka/source" "github.com/conduitio/conduit-connector-kafka/test" + sdk "github.com/conduitio/conduit-connector-sdk" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/matryer/is" @@ -39,7 +41,7 @@ func TestSource_Teardown_Success(t *testing.T) { Close(context.Background()). Return(nil) - cfg := test.ParseConfigMap[source.Config](t, test.SourceConfigMap(t, true)) + cfg := test.ParseConfigMap[source.Config](t, test.SourceConfigMap(t, true, false, false)) underTest := Source{consumer: consumerMock, config: cfg} is.NoErr(underTest.Teardown(context.Background())) @@ -86,7 +88,7 @@ func TestSource_Read(t *testing.T) { Consume(gomock.Any()). 
Return((*source.Record)(rec), nil) - cfg := test.ParseConfigMap[source.Config](t, test.SourceConfigMap(t, false)) + cfg := test.ParseConfigMap[source.Config](t, test.SourceConfigMap(t, false, false, false)) underTest := Source{consumer: consumerMock, config: cfg} got, err := underTest.Read(context.Background()) is.NoErr(err) @@ -94,3 +96,26 @@ func TestSource_Read(t *testing.T) { want.Metadata[opencdc.MetadataReadAt] = got.Metadata[opencdc.MetadataReadAt] is.Equal(cmp.Diff(want, got, cmpopts.IgnoreUnexported(opencdc.Record{})), "") } + +func TestSource_Read_RetryJoinError(t *testing.T) { + is := is.New(t) + ctrl := gomock.NewController(t) + + rec := test.GenerateFranzRecords(0, 0, "foo")[0] + rec.Headers = []kgo.RecordHeader{ + {Key: "header-a", Value: []byte("value-a")}, + {Key: "header-b", Value: []byte{0, 1, 2}}, + } + + consumerMock := source.NewMockConsumer(ctrl) + consumerMock. + EXPECT(). + Consume(gomock.Any()). + Return(nil, errors.New("unable to join group session: unable to dial: dial tcp 127.0.0.1:9092: connect: connection refused")) + + cfg := test.ParseConfigMap[source.Config](t, test.SourceConfigMap(t, false, true, true)) + underTest := Source{consumer: consumerMock, config: cfg} + _, err := underTest.Read(context.Background()) + is.True(err != nil) + is.True(errors.Is(err, sdk.ErrBackoffRetry)) +} diff --git a/test/util.go b/test/util.go index 0de6b6d..fde8d9d 100644 --- a/test/util.go +++ b/test/util.go @@ -69,9 +69,11 @@ func ConfigMap() map[string]string { } } -func SourceConfigMap(t T, multipleTopics bool) map[string]string { +func SourceConfigMap(t T, multipleTopics, retryGroupJoinErrors, retryLeaderErrors bool) map[string]string { m := ConfigMap() m["readFromBeginning"] = "true" + m["retryGroupJoinErrors"] = fmt.Sprint(retryGroupJoinErrors) + m["retryLeaderErrors"] = fmt.Sprint(retryLeaderErrors) m["topics"] = getRandomTopicName(t) if multipleTopics { m["topics"] = m["topics"] + "," + getRandomTopicName(t)