Skip to content

Commit

Permalink
feat: throw panic when produce message to retry topic is unsuccessful…
Browse files Browse the repository at this point in the history
… for dns related reasons (#150)
  • Loading branch information
pinarrkok authored Dec 29, 2024
1 parent d234e31 commit 6bda451
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 29 deletions.
23 changes: 21 additions & 2 deletions batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"errors"
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -239,8 +240,11 @@ func (b *batchConsumer) process(chunkMessages []*Message) {
}
}

if produceErr := b.base.cronsumer.ProduceBatch(cronsumerMessages); produceErr != nil {
b.logger.Errorf("Error producing messages to exception/retry topic %s", produceErr.Error())
if err := b.retryBatchWithBackoff(cronsumerMessages); err != nil {
errorMsg := fmt.Sprintf(
"Error producing messages to exception/retry topic: %s. Error: %s", b.retryTopic, err.Error())
b.logger.Error(errorMsg)
panic(errorMsg)
}
}
}
Expand All @@ -249,3 +253,18 @@ func (b *batchConsumer) process(chunkMessages []*Message) {
b.metric.TotalProcessedMessagesCounter += int64(len(chunkMessages))
}
}

func (b *batchConsumer) retryBatchWithBackoff(retryableMessages []kcronsumer.Message) error {
var produceErr error

for attempt := 1; attempt <= 5; attempt++ {
produceErr = b.base.cronsumer.ProduceBatch(retryableMessages)
if produceErr == nil {
return nil
}
b.logger.Warnf("Error producing message (attempt %d/%d): %v", attempt, 5, produceErr)
time.Sleep((50 * time.Millisecond) * time.Duration(1<<attempt))
}

return produceErr
}
74 changes: 57 additions & 17 deletions batch_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,17 +256,42 @@ func Test_batchConsumer_process(t *testing.T) {
},
}

// When
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()

// When && Then
bc.process([]*Message{{}, {}, {}})
})

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if bc.metric.TotalUnprocessedMessagesCounter != 3 {
t.Fatalf("Total Unprocessed Message Counter must equal to 3")
t.Run("When_Re-processing_Is_Failed_And_Retry_Failed_5_times", func(t *testing.T) {
// Given
mc := mockCronsumer{wantErr: true, retryBehaviorOpen: true, maxRetry: 5}
bc := batchConsumer{
base: &base{
metric: &ConsumerMetric{}, transactionalRetry: true,
logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc,
},
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
}

defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
if mc.times != mc.maxRetry {
t.Errorf("Expected produce to be called %d times, but got %d", mc.maxRetry, mc.times)
}
}()

// When && Then
bc.process([]*Message{{}, {}, {}})
})

t.Run("When_Transactional_Retry_Disabled", func(t *testing.T) {
// Given
mc := &mockCronsumer{wantErr: true}
Expand All @@ -286,16 +311,14 @@ func Test_batchConsumer_process(t *testing.T) {
},
}

// When
bc.process([]*Message{{}, {}, {}})
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if bc.metric.TotalUnprocessedMessagesCounter != 3 {
t.Fatalf("Total Unprocessed Message Counter must equal to 3")
}
// When && Then
bc.process([]*Message{{}, {}, {}})
})
}

Expand Down Expand Up @@ -468,7 +491,10 @@ func createMessages(partitionStart int, partitionEnd int) []*Message {
}

type mockCronsumer struct {
wantErr bool
wantErr bool
retryBehaviorOpen bool
times int
maxRetry int
}

func (m *mockCronsumer) Start() {
Expand All @@ -488,6 +514,13 @@ func (m *mockCronsumer) WithLogger(_ lcronsumer.Interface) {
}

func (m *mockCronsumer) Produce(_ kcronsumer.Message) error {
if m.retryBehaviorOpen {
if m.wantErr && m.times <= m.maxRetry {
m.times++
return errors.New("error")
}
return nil
}
if m.wantErr {
return errors.New("error")
}
Expand All @@ -499,6 +532,13 @@ func (m *mockCronsumer) GetMetricCollectors() []prometheus.Collector {
}

func (m *mockCronsumer) ProduceBatch([]kcronsumer.Message) error {
if m.retryBehaviorOpen {
if m.wantErr && m.times <= m.maxRetry {
m.times++
return errors.New("error")
}
return nil
}
if m.wantErr {
return errors.New("error")
}
Expand Down
25 changes: 22 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -151,13 +152,31 @@ func (c *consumer) process(message *Message) {

if consumeErr != nil && c.retryEnabled {
retryableMsg := message.toRetryableMessage(c.retryTopic, consumeErr.Error())
if produceErr := c.cronsumer.Produce(retryableMsg); produceErr != nil {
c.logger.Errorf("Error producing message %s to exception/retry topic %s",
string(retryableMsg.Value), produceErr.Error())
if err := c.retryWithBackoff(retryableMsg); err != nil {
errorMessage := fmt.Sprintf(
"Error producing message %s to exception/retry topic %s. Error: %s",
string(message.Value), c.retryTopic, err.Error())
c.logger.Error(errorMessage)
panic(err.Error())
}
}

if consumeErr == nil {
c.metric.TotalProcessedMessagesCounter++
}
}

func (c *consumer) retryWithBackoff(retryableMsg kcronsumer.Message) error {
var produceErr error

for attempt := 1; attempt <= 5; attempt++ {
produceErr = c.cronsumer.Produce(retryableMsg)
if produceErr == nil {
return nil
}
c.logger.Warnf("Error producing message (attempt %d/%d): %v", attempt, 5, produceErr)
time.Sleep((50 * time.Millisecond) * time.Duration(1<<attempt))
}

return produceErr
}
37 changes: 30 additions & 7 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func Test_consumer_process(t *testing.T) {
t.Fatalf("Total Unprocessed Message Counter must equal to 1")
}
})

t.Run("When_Re-processing_Is_Failed_And_Retry_Failed", func(t *testing.T) {
// Given
mc := mockCronsumer{wantErr: true}
Expand All @@ -104,16 +105,38 @@ func Test_consumer_process(t *testing.T) {
},
}

// When
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()

// When && Then
c.process(&Message{})
})

// Then
if c.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if c.metric.TotalUnprocessedMessagesCounter != 1 {
t.Fatalf("Total Unprocessed Message Counter must equal to 1")
t.Run("When_Re-processing_Is_Failed_And_Retry_Failed_5_times", func(t *testing.T) {
// Given
mc := mockCronsumer{wantErr: true, retryBehaviorOpen: true, maxRetry: 5}

c := consumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
consumeFn: func(*Message) error {
return errors.New("error case")
},
}

defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
if mc.times != mc.maxRetry {
t.Errorf("Expected produce to be called %d times, but got %d", mc.maxRetry, mc.times)
}
}()

// When && Then
c.process(&Message{})
})
}

Expand Down

0 comments on commit 6bda451

Please sign in to comment.