diff --git a/batch_consumer.go b/batch_consumer.go index ba2825c..98031ef 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -1,6 +1,7 @@ package kafka import ( + "errors" "time" "github.com/prometheus/client_golang/prometheus" @@ -42,7 +43,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) { if cfg.RetryEnabled { c.base.setupCronsumer(cfg, func(message kcronsumer.Message) error { - return c.consumeFn([]*Message{toMessage(message)}) + return c.runKonsumerFn(message) }) } @@ -53,6 +54,16 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) { return &c, nil } +func (b *batchConsumer) runKonsumerFn(message kcronsumer.Message) error { + msgList := []*Message{toMessage(message)} + + err := b.consumeFn(msgList) + if msgList[0].ErrDescription != "" { + err = errors.New(msgList[0].ErrDescription) + } + return err +} + func (b *batchConsumer) GetMetricCollectors() []prometheus.Collector { return b.base.GetMetricCollectors() } diff --git a/batch_consumer_test.go b/batch_consumer_test.go index 1b390ec..d36643c 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -404,6 +404,41 @@ func Test_batchConsumer_Resume(t *testing.T) { } } +func Test_batchConsumer_runKonsumerFn(t *testing.T) { + t.Run("Should_Return_Default_Error_When_Error_Description_Does_Not_Exist", func(t *testing.T) { + // Given + expectedError := errors.New("default error") + bc := batchConsumer{consumeFn: func(messages []*Message) error { + return expectedError + }} + + // When + actualError := bc.runKonsumerFn(kcronsumer.Message{}) + + // Then + if actualError.Error() != expectedError.Error() { + t.Fatalf("actual error = %s should be equal to expected error = %s", actualError.Error(), expectedError.Error()) + } + }) + + t.Run("Should_Return_Message_Error_Description_When_Error_Description_Exist", func(t *testing.T) { + // Given + expectedError := errors.New("message error description") + bc := batchConsumer{consumeFn: func(messages []*Message) error { + messages[0].ErrDescription = "message error description" + return errors.New("default error") + }} + + // When + actualError := bc.runKonsumerFn(kcronsumer.Message{}) + + // Then + if actualError.Error() != expectedError.Error() { + t.Fatalf("actual error = %s should be equal to expected error = %s", actualError.Error(), expectedError.Error()) + } + }) +} + func createMessages(partitionStart int, partitionEnd int) []*Message { messages := make([]*Message, 0) for i := partitionStart; i < partitionEnd; i++ { diff --git a/examples/with-kafka-transactional-retry-disabled/main.go b/examples/with-kafka-transactional-retry-disabled/main.go index d3bee3c..09f0429 100644 --- a/examples/with-kafka-transactional-retry-disabled/main.go +++ b/examples/with-kafka-transactional-retry-disabled/main.go @@ -16,9 +16,9 @@ func main() { }) producer.ProduceBatch(context.Background(), []kafka.Message{ - {Value: []byte("message1")}, - {Value: []byte("message2")}, - {Value: []byte("message3")}, + {Key: []byte("key1"), Value: []byte("message1")}, + {Key: []byte("key2"), Value: []byte("message2")}, + {Key: []byte("key3"), Value: []byte("message3")}, }) consumerCfg := &kafka.ConsumerConfig{ @@ -40,7 +40,7 @@ func main() { WorkDuration: 20 * time.Second, MaxRetry: 3, }, - MessageGroupDuration: time.Second, + MessageGroupDuration: 5 * time.Second, } consumer, _ := kafka.NewConsumer(consumerCfg) @@ -54,14 +54,19 @@ func main() { <-c } -// In order to load topic with data, use: -// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/load.txt func batchConsumeFn(messages []*kafka.Message) error { // you can add custom error handling here & flag messages for i := range messages { if i < 2 { messages[i].IsFailed = true - messages[i].ErrDescription = fmt.Sprintf("%d error", i+1) + + var retryCount string + retryCountHeader := messages[i].Header("x-retry-count") + if retryCountHeader != nil { + retryCount = string(retryCountHeader.Value) + } + + messages[i].ErrDescription = fmt.Sprintf("Key = %s error, retry count %s", string(messages[i].Key), retryCount) } }