diff --git a/batch_consumer.go b/batch_consumer.go index bffcb66..c338b55 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -2,6 +2,7 @@ package kafka import ( "errors" + "fmt" "time" "github.com/prometheus/client_golang/prometheus" @@ -239,8 +240,11 @@ func (b *batchConsumer) process(chunkMessages []*Message) { } } - if produceErr := b.base.cronsumer.ProduceBatch(cronsumerMessages); produceErr != nil { - b.logger.Errorf("Error producing messages to exception/retry topic %s", produceErr.Error()) + if err := b.retryBatchWithBackoff(cronsumerMessages); err != nil { + errorMsg := fmt.Sprintf( + "Error producing messages to exception/retry topic: %s. Error: %s", b.retryTopic, err.Error()) + b.logger.Error(errorMsg) + panic(errorMsg) } } } @@ -249,3 +253,18 @@ func (b *batchConsumer) process(chunkMessages []*Message) { b.metric.TotalProcessedMessagesCounter += int64(len(chunkMessages)) } } + +func (b *batchConsumer) retryBatchWithBackoff(retryableMessages []kcronsumer.Message) error { + var produceErr error + + for attempt := 1; attempt <= 5; attempt++ { + produceErr = b.base.cronsumer.ProduceBatch(retryableMessages) + if produceErr == nil { + return nil + } + b.logger.Warnf("Error producing message (attempt %d/%d): %v", attempt, 5, produceErr) + time.Sleep((50 * time.Millisecond) * time.Duration(1<