Skip to content

Commit

Permalink
feat: cronsumer iteration imp related x-error-message
Browse files Browse the repository at this point in the history
  • Loading branch information
dilaragorum committed Mar 28, 2024
1 parent b86c0af commit 9235292
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 8 deletions.
13 changes: 12 additions & 1 deletion batch_consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"errors"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -42,7 +43,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {

if cfg.RetryEnabled {
c.base.setupCronsumer(cfg, func(message kcronsumer.Message) error {
return c.consumeFn([]*Message{toMessage(message)})
return c.runKonsumerFn(message)
})
}

Expand All @@ -53,6 +54,16 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
return &c, nil
}

func (b *batchConsumer) runKonsumerFn(message kcronsumer.Message) error {
msgList := []*Message{toMessage(message)}

err := b.consumeFn(msgList)
if msgList[0].ErrDescription != "" {
err = errors.New(msgList[0].ErrDescription)
}
return err
}

func (b *batchConsumer) GetMetricCollectors() []prometheus.Collector {
return b.base.GetMetricCollectors()
}
Expand Down
35 changes: 35 additions & 0 deletions batch_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,41 @@ func Test_batchConsumer_Resume(t *testing.T) {
}
}

func Test_batchConsumer_runKonsumerFn(t *testing.T) {
t.Run("Should_Return_Default_Error_When_Error_Description_Does_Not_Exist", func(t *testing.T) {
// Given
expectedError := errors.New("default error")
bc := batchConsumer{consumeFn: func(messages []*Message) error {
return expectedError
}}

// When
actualError := bc.runKonsumerFn(kcronsumer.Message{})

// Then
if actualError.Error() != expectedError.Error() {
t.Fatalf("actual error = %s should be equal to expected error = %s", actualError.Error(), expectedError.Error())
}
})

t.Run("Should_Return_Message_Error_Description_When_Error_Description_Exist", func(t *testing.T) {
// Given
expectedError := errors.New("message error description")
bc := batchConsumer{consumeFn: func(messages []*Message) error {
messages[0].ErrDescription = "message error description"
return errors.New("default error")
}}

// When
actualError := bc.runKonsumerFn(kcronsumer.Message{})

// Then
if actualError.Error() != expectedError.Error() {
t.Fatalf("actual error = %s should be equal to expected error = %s", actualError.Error(), expectedError.Error())
}
})
}

func createMessages(partitionStart int, partitionEnd int) []*Message {
messages := make([]*Message, 0)
for i := partitionStart; i < partitionEnd; i++ {
Expand Down
19 changes: 12 additions & 7 deletions examples/with-kafka-transactional-retry-disabled/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ func main() {
})

producer.ProduceBatch(context.Background(), []kafka.Message{
{Value: []byte("message1")},
{Value: []byte("message2")},
{Value: []byte("message3")},
{Key: []byte("key1"), Value: []byte("message1")},
{Key: []byte("key2"), Value: []byte("message2")},
{Key: []byte("key3"), Value: []byte("message3")},
})

consumerCfg := &kafka.ConsumerConfig{
Expand All @@ -40,7 +40,7 @@ func main() {
WorkDuration: 20 * time.Second,
MaxRetry: 3,
},
MessageGroupDuration: time.Second,
MessageGroupDuration: 5 * time.Second,
}

consumer, _ := kafka.NewConsumer(consumerCfg)
Expand All @@ -54,14 +54,19 @@ func main() {
<-c
}

// In order to load topic with data, use:
// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/load.txt
func batchConsumeFn(messages []*kafka.Message) error {
// you can add custom error handling here & flag messages
for i := range messages {
if i < 2 {
messages[i].IsFailed = true
messages[i].ErrDescription = fmt.Sprintf("%d error", i+1)

var retryCount string
retryCountHeader := messages[i].Header("x-retry-count")
if retryCountHeader != nil {
retryCount = string(retryCountHeader.Value)
}

messages[i].ErrDescription = fmt.Sprintf("Key = %s error, retry count %s", string(messages[i].Key), retryCount)
}
}

Expand Down

0 comments on commit 9235292

Please sign in to comment.