Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: throw panic when produce message to retry topic is unsuccessful #150

Merged
merged 7 commits into from
Dec 29, 2024
5 changes: 4 additions & 1 deletion batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"errors"
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -240,7 +241,9 @@ func (b *batchConsumer) process(chunkMessages []*Message) {
}

if produceErr := b.base.cronsumer.ProduceBatch(cronsumerMessages); produceErr != nil {
b.logger.Errorf("Error producing messages to exception/retry topic %s", produceErr.Error())
errorMsg := fmt.Sprintf("Error producing messages to exception/retry topic %s", produceErr.Error())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can add log message partition, offset and topic also

b.logger.Error(errorMsg)
panic(errorMsg)
}
}
}
Expand Down
33 changes: 15 additions & 18 deletions batch_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,17 +256,16 @@ func Test_batchConsumer_process(t *testing.T) {
},
}

// When
bc.process([]*Message{{}, {}, {}})
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if bc.metric.TotalUnprocessedMessagesCounter != 3 {
t.Fatalf("Total Unprocessed Message Counter must equal to 3")
}
// When && Then
bc.process([]*Message{{}, {}, {}})
})

t.Run("When_Transactional_Retry_Disabled", func(t *testing.T) {
// Given
mc := &mockCronsumer{wantErr: true}
Expand All @@ -286,16 +285,14 @@ func Test_batchConsumer_process(t *testing.T) {
},
}

// When
bc.process([]*Message{{}, {}, {}})
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if bc.metric.TotalUnprocessedMessagesCounter != 3 {
t.Fatalf("Total Unprocessed Message Counter must equal to 3")
}
// When && Then
bc.process([]*Message{{}, {}, {}})
})
}

Expand Down
5 changes: 4 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -152,8 +153,10 @@ func (c *consumer) process(message *Message) {
if consumeErr != nil && c.retryEnabled {
retryableMsg := message.toRetryableMessage(c.retryTopic, consumeErr.Error())
if produceErr := c.cronsumer.Produce(retryableMsg); produceErr != nil {
c.logger.Errorf("Error producing message %s to exception/retry topic %s",
errorMessage := fmt.Sprintf("Error producing message %s to exception/retry topic %s",
string(retryableMsg.Value), produceErr.Error())
c.logger.Error(errorMessage)
panic(errorMessage)
}
}

Expand Down
16 changes: 7 additions & 9 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,14 @@ func Test_consumer_process(t *testing.T) {
},
}

// When
c.process(&Message{})
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()

// Then
if c.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if c.metric.TotalUnprocessedMessagesCounter != 1 {
t.Fatalf("Total Unprocessed Message Counter must equal to 1")
}
// When && Then
c.process(&Message{})
})
}

Expand Down
Loading