Skip to content

Commit

Permalink
retry on franz consumer group session errors (#154)
Browse files Browse the repository at this point in the history
* retry on franz consumer group session errors

* address feedback

* Update README.md

Co-authored-by: Lovro Mažgon <[email protected]>

* remove unused retry leader error config

* remove retryLeaderErrors from tests

* Update source/franz.go

Co-authored-by: Lovro Mažgon <[email protected]>

* used -typed flag in franz client mock

---------

Co-authored-by: Lovro Mažgon <[email protected]>
  • Loading branch information
samirketema and lovromazgon authored Jul 12, 2024
1 parent 87864b6 commit 3eb3b3a
Show file tree
Hide file tree
Showing 13 changed files with 351 additions and 45 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ test: test-kafka test-redpanda
generate:
go generate ./...

.PHONY: fmt
fmt: ## Format Go files using gofumpt and gci.
gofumpt -l -w .
gci write --skip-generated .

.PHONY: lint
lint:
golangci-lint run
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ A source is getting associated with a consumer group ID the first time the `Read
| `saslMechanism` | SASL mechanism to be used. Possible values: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. If empty, authentication won't be performed. | false | |
| `saslUsername` | SASL username. If provided, a password needs to be provided too. | false | |
| `saslPassword` | SASL password. If provided, a username needs to be provided too. | false | |
| `retryGroupJoinErrors` | determines whether the connector will continually retry on group join errors | false | `true` |

## Destination

Expand Down
7 changes: 3 additions & 4 deletions common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
)

var (
// TODO make the timeout configurable
connectionTimeout = time.Second * 10
)
// TODO make the timeout configurable
var connectionTimeout = time.Second * 10

// Config contains common configuration parameters.
type Config struct {
Expand Down Expand Up @@ -78,6 +76,7 @@ func (c Config) TryDial(ctx context.Context) error {
case <-ctx.Done():
return err
case <-time.After(time.Second):
sdk.Logger(ctx).Warn().Msg("failed to dial broker, trying again...")
// ping again
}
}
Expand Down
2 changes: 2 additions & 0 deletions source/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type Config struct {
ReadFromBeginning bool `json:"readFromBeginning"`
// GroupID defines the consumer group id.
GroupID string `json:"groupID"`
// RetryGroupJoinErrors determines whether the connector will continually retry on group join errors.
RetryGroupJoinErrors bool `json:"retryGroupJoinErrors" default:"true"`
}

// Validate executes manual validations beyond what is defined in struct tags.
Expand Down
55 changes: 28 additions & 27 deletions source/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,35 @@ func TestConfig_ValidateTopics(t *testing.T) {
name string
cfg Config
wantErr string
}{{
name: `one of "topic" and "topics" should be provided.`,
cfg: Config{
Topics: []string{},
Topic: "",
}{
{
name: `one of "topic" and "topics" should be provided.`,
cfg: Config{
Topics: []string{},
Topic: "",
},
wantErr: `required parameter missing: "topics"`,
}, {
name: "invalid, only provide one.",
cfg: Config{
Topics: []string{"topic2"},
Topic: "topic1",
},
wantErr: `can't provide both "topic" and "topics" parameters, "topic" is deprecated and will be removed, use the "topics" parameter instead`,
}, {
name: "valid with warning, will be deprecated soon",
cfg: Config{
Topics: []string{},
Topic: "topic1",
},
wantErr: "",
}, {
name: "valid",
cfg: Config{
Topics: []string{"topic1"},
},
wantErr: "",
},
wantErr: `required parameter missing: "topics"`,
}, {
name: "invalid, only provide one.",
cfg: Config{
Topics: []string{"topic2"},
Topic: "topic1",
},
wantErr: `can't provide both "topic" and "topics" parameters, "topic" is deprecated and will be removed, use the "topics" parameter instead`,
}, {
name: "valid with warning, will be deprecated soon",
cfg: Config{
Topics: []string{},
Topic: "topic1",
},
wantErr: "",
}, {
name: "valid",
cfg: Config{
Topics: []string{"topic1"},
},
wantErr: "",
},
}

for _, tc := range testCases {
Expand Down
33 changes: 27 additions & 6 deletions source/franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,36 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate mockgen -typed -destination franz_mock.go -package source -mock_names=Client=MockClient . Client

package source

import (
"context"
"errors"
"fmt"
"strings"
"sync"

sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/twmb/franz-go/pkg/kgo"
)

type FranzConsumer struct {
client *kgo.Client
client Client
acker *batchAcker

iter *kgo.FetchesRecordIter

retryGroupJoinErrors bool
}

// Client is a franz-go kafka client.
type Client interface {
Close()
CommitRecords(ctx context.Context, rs ...*kgo.Record) error
OptValue(opt any) any
PollFetches(ctx context.Context) kgo.Fetches
}

var _ Consumer = (*FranzConsumer)(nil)
Expand All @@ -53,16 +66,24 @@ func NewFranzConsumer(ctx context.Context, cfg Config) (*FranzConsumer, error) {
}

return &FranzConsumer{
client: cl,
acker: newBatchAcker(cl, 1000),
iter: &kgo.FetchesRecordIter{}, // empty iterator is done
client: cl,
acker: newBatchAcker(cl, 1000),
iter: &kgo.FetchesRecordIter{}, // empty iterator is done
retryGroupJoinErrors: cfg.RetryGroupJoinErrors,
}, nil
}

func (c *FranzConsumer) Consume(ctx context.Context) (*Record, error) {
for c.iter.Done() {
fetches := c.client.PollFetches(ctx)
if err := fetches.Err(); err != nil {
var errGroupSession *kgo.ErrGroupSession
if c.retryGroupJoinErrors &&
(errors.As(err, &errGroupSession) || strings.Contains(err.Error(), "unable to join group session")) {
sdk.Logger(ctx).Warn().Err(err).Msgf("group session error, retrying")
return nil, sdk.ErrBackoffRetry
}

return nil, err
}
if fetches.Empty() {
Expand Down Expand Up @@ -92,7 +113,7 @@ func (c *FranzConsumer) Close(ctx context.Context) error {

// batchAcker commits acks in batches.
type batchAcker struct {
client *kgo.Client
client Client

batchSize int
curBatchIndex int
Expand All @@ -101,7 +122,7 @@ type batchAcker struct {
m sync.Mutex
}

func newBatchAcker(client *kgo.Client, batchSize int) *batchAcker {
func newBatchAcker(client Client, batchSize int) *batchAcker {
return &batchAcker{
client: client,
batchSize: batchSize,
Expand Down
6 changes: 3 additions & 3 deletions source/franz_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestFranzConsumer_Consume_FromBeginning(t *testing.T) {
is := is.New(t)
ctx := context.Background()

cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false))
cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false, false))
cfg.ReadFromBeginning = true

records := test.GenerateFranzRecords(1, 6)
Expand Down Expand Up @@ -56,7 +56,7 @@ func TestFranzConsumer_Consume_LastOffset(t *testing.T) {
is := is.New(t)
ctx := context.Background()

cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false))
cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, false, false))
cfg.ReadFromBeginning = false

records := test.GenerateFranzRecords(1, 6)
Expand Down Expand Up @@ -93,7 +93,7 @@ func TestFranzConsumer_Consume_MultipleTopics(t *testing.T) {
is := is.New(t)
ctx := context.Background()

cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, true))
cfg := test.ParseConfigMap[Config](t, test.SourceConfigMap(t, true, false))
cfg.ReadFromBeginning = true

records := test.GenerateFranzRecords(1, 6)
Expand Down
Loading

0 comments on commit 3eb3b3a

Please sign in to comment.