Skip to content

Commit

Permalink
feat: throw panic when produce message to retry topic is unsuccessful
Browse files Browse the repository at this point in the history
  • Loading branch information
pinarrkok committed Dec 10, 2024
1 parent 0c34509 commit 83b299f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
5 changes: 4 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -152,8 +153,10 @@ func (c *consumer) process(message *Message) {
if consumeErr != nil && c.retryEnabled {
retryableMsg := message.toRetryableMessage(c.retryTopic, consumeErr.Error())
if produceErr := c.cronsumer.Produce(retryableMsg); produceErr != nil {
c.logger.Errorf("Error producing message %s to exception/retry topic %s",
errorMessage := fmt.Sprintf("Error producing message %s to exception/retry topic %s",
string(retryableMsg.Value), produceErr.Error())
c.logger.Error(errorMessage)
panic(errorMessage)
}
}

Expand Down
16 changes: 7 additions & 9 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,14 @@ func Test_consumer_process(t *testing.T) {
},
}

// When
c.process(&Message{})
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()

// Then
if c.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if c.metric.TotalUnprocessedMessagesCounter != 1 {
t.Fatalf("Total Unprocessed Message Counter must equal to 1")
}
// When && Then
c.process(&Message{})
})
}

Expand Down

0 comments on commit 83b299f

Please sign in to comment.