Skip to content

Commit

Permalink
Improving error logging (#45)
Browse files Browse the repository at this point in the history
* Improving error logging
  • Loading branch information
ErwanVP authored Dec 23, 2024
1 parent 88a3767 commit 5107ae6
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 26 deletions.
11 changes: 9 additions & 2 deletions go-kafka.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package kafka

import (
"io/ioutil"
"io"
"log"
"os"
"time"
Expand All @@ -13,6 +13,13 @@ import (
var Brokers []string

// StdLogger is used to log messages.

// StdLogger is the interface used to log messages.
// Print and println provides this type of log.
// print(ctx, err, "key", "value")
// print(err, "key", "value")
// print(ctx, "key", "value")
// print(ctx, err)
type StdLogger interface {
Print(v ...interface{})
Printf(format string, v ...interface{})
Expand All @@ -22,7 +29,7 @@ type StdLogger interface {
// Logger is the instance of a StdLogger interface.
// By default it is set to discard all log messages via ioutil.Discard,
// but you can set it to redirect wherever you want.
var Logger StdLogger = log.New(ioutil.Discard, "[Go-Kafka] ", log.LstdFlags)
var Logger StdLogger = log.New(io.Discard, "[Go-Kafka] ", log.LstdFlags)

// ErrorLogger is the instance of a StdLogger interface.
// By default it is set to output on stderr all log messages,
Expand Down
58 changes: 38 additions & 20 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Listener interface {
// NewListener creates a new instance of Listener
func NewListener(groupID string, handlers Handlers, options ...ListenerOption) (Listener, error) {
if groupID == "" {
return nil, errors.New("cannot create new listener, groupID cannot be empty")
return nil, errors.New("cannot create new listener, group_id cannot be empty")
}
if len(handlers) == 0 {
return nil, errors.New("cannot create new listener, handlers cannot be empty")
Expand All @@ -94,9 +94,9 @@ func NewListener(groupID string, handlers Handlers, options ...ListenerOption) (
}

go func() {
err := <-consumerGroup.Errors()
if err != nil {
ErrorLogger.Println("sarama error: %s", err.Error())
errConsumer := <-consumerGroup.Errors()
if errConsumer != nil {
ErrorLogger.Println(err, "error", "sarama error")
}
}()

Expand Down Expand Up @@ -133,10 +133,10 @@ func (l *listener) GroupID() string {
func checkErrorTopicToAvoidInfiniteLoop(handlers Handlers) error {
for topic, handler := range handlers {
if handler.Config.RetryTopic == topic {
return fmt.Errorf("Retry topic cannot be the same as the original topic: %s", topic)
return fmt.Errorf("retry topic cannot be the same as the original topic: %s", topic)
}
if handler.Config.DeadletterTopic == topic {
return fmt.Errorf("Deadletter topic cannot be the same as the original topic: %s", topic)
return fmt.Errorf("deadletter topic cannot be the same as the original topic: %s", topic)
}
}
return nil
Expand Down Expand Up @@ -164,7 +164,7 @@ type ListenerOption func(l *listener)
// Listen process incoming kafka messages with handlers configured by the listener
func (l *listener) Listen(consumerContext context.Context) error {
if l.consumerGroup == nil {
return errors.New("cannot subscribe. ConsumerGroup is nil")
return errors.New("consumerGroup is nil, cannot listen")
}

// When a session is over, make consumer join a new session, as long as the context is not cancelled
Expand All @@ -187,7 +187,7 @@ func (l *listener) Close() {
if l.consumerGroup != nil {
err := l.consumerGroup.Close()
if err != nil {
ErrorLogger.Printf("Error while closing sarama consumerGroup: %s", err.Error())
ErrorLogger.Println(err, "error", "unable to close sarama consumerGroup")
}
}
}
Expand Down Expand Up @@ -261,7 +261,7 @@ func (l *listener) handleErrorMessage(initialError error, handler Handler, msg *
}

// Log
ErrorLogger.Printf("Consume: %+v", initialError)
ErrorLogger.Println(initialError, "error", "unable to process message, we apply retry topic policy")

// Inc dropped messages metrics
if l.instrumenting != nil && l.instrumenting.recordErrorCounter != nil {
Expand All @@ -271,21 +271,25 @@ func (l *listener) handleErrorMessage(initialError error, handler Handler, msg *
if isRetriableError(initialError) {
// First, check if handler's config defines retry topic
if handler.Config.RetryTopic != "" {
Logger.Printf("Sending message to retry topic: %s", handler.Config.RetryTopic)
Logger.Printf("sending message to retry topic: %s", handler.Config.RetryTopic)
err := forwardToTopic(l, msg, handler.Config.RetryTopic)
if err != nil {
ErrorLogger.Printf("Cannot send message to handler's retry topic %s: %+v", handler.Config.RetryTopic, err)
errLog := []interface{}{err, "error", "cannot send message to handler's retry topic", "retry_topic", handler.Config.RetryTopic}
errLog = append(errLog, extractMessageInfoForLog(msg)...)
ErrorLogger.Println(errLog...)
}
return
}

// If not, check if global retry topic pattern is defined
if PushConsumerErrorsToRetryTopic {
topicName := l.deduceTopicNameFromPattern(msg.Topic, RetryTopicPattern)
Logger.Printf("Sending message to retry topic: %s", topicName)
Logger.Printf("sending message to retry topic: %s", topicName)
err := forwardToTopic(l, msg, topicName)
if err != nil {
ErrorLogger.Printf("Cannot send message to handler's retry topic defined with global pattern %s: %+v", topicName, err)
errLog := []interface{}{err, "error", "cannot send message to handler's retry topic defined with global pattern", "topic", topicName}
errLog = append(errLog, extractMessageInfoForLog(msg)...)
ErrorLogger.Println(errLog...)
}
return
}
Expand All @@ -294,21 +298,26 @@ func (l *listener) handleErrorMessage(initialError error, handler Handler, msg *
// If the error is not retriable, or if there is no retry topic defined at all, then try to send to dead letter topic
// First, check if handler's config defines deadletter topic
if handler.Config.DeadletterTopic != "" {
Logger.Printf("Sending message to handler's deadletter topic: %s", handler.Config.DeadletterTopic)
Logger.Printf("sending message to handler's deadletter topic: %s", handler.Config.DeadletterTopic)
err := forwardToTopic(l, msg, handler.Config.DeadletterTopic)
if err != nil {
ErrorLogger.Printf("Cannot send message to handler's deadletter topic %s: %+v", handler.Config.RetryTopic, err)
errLog := []interface{}{err, "error", "cannot send message to handler's deadletter topic", "deadletter_topic", handler.Config.DeadletterTopic}
errLog = append(errLog, extractMessageInfoForLog(msg)...)
ErrorLogger.Println(errLog...)

}
return
}

// If not, check if global deadletter topic pattern is defined
if PushConsumerErrorsToDeadletterTopic {
topicName := l.deduceTopicNameFromPattern(msg.Topic, DeadletterTopicPattern)
Logger.Printf("Sending message to deadletter topic: %s", topicName)
Logger.Printf("sending message to deadletter topic: %s", topicName)
err := forwardToTopic(l, msg, topicName)
if err != nil {
ErrorLogger.Printf("Cannot send message to handler's deadletter topic defined with global pattern %s: %+v", topicName, err)
errorLog := []interface{}{err, "error", "cannot send message to handler's deadletter topic defined with global pattern", "topic", topicName}
errorLog = append(errorLog, extractMessageInfoForLog(msg)...)
ErrorLogger.Println(errorLog...)
}
return
}
Expand All @@ -335,7 +344,7 @@ func isRetriableError(initialError error) bool {
}

func (l *listener) handleOmittedMessage(initialError error, msg *sarama.ConsumerMessage) {
ErrorLogger.Printf("Omitted message: %+v", initialError)
ErrorLogger.Println(initialError, "error", "omitted message")

// Inc dropped messages metrics
if l.instrumenting != nil && l.instrumenting.recordOmittedCounter != nil {
Expand All @@ -347,7 +356,7 @@ func (l *listener) handleOmittedMessage(initialError error, msg *sarama.Consumer
func (l *listener) handleMessageWithRetry(ctx context.Context, handler Handler, msg *sarama.ConsumerMessage, retries int) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("Panic happened during handle of message: %v", r)
err = fmt.Errorf("panic happened during handle of message: %v", r)
}
}()

Expand All @@ -362,7 +371,9 @@ func (l *listener) handleMessageWithRetry(ctx context.Context, handler Handler,
if retries != InfiniteRetries {
retries--
} else {
ErrorLogger.Printf("Error for message with infinite retry %+v: ", err)
errLog := []interface{}{ctx, err, "error", "unable to process message we retry indefinitely"}
errLog = append(errLog, extractMessageInfoForLog(msg)...)
ErrorLogger.Println(errLog...)
}
return l.handleMessageWithRetry(ctx, handler, msg, retries)
}
Expand All @@ -381,3 +392,10 @@ func shouldRetry(retries int, err error) bool {

return true
}

func extractMessageInfoForLog(msg *sarama.ConsumerMessage) []interface{} {
if msg == nil {
return []interface{}{"message", "nil"}
}
return []interface{}{"message_topic", msg.Topic, "topic_partition", msg.Partition, "message_offset", msg.Offset, "message_key", string(msg.Key)}
}
8 changes: 4 additions & 4 deletions listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func Test_ConsumeClaim_Message_Error_WithErrorTopic(t *testing.T) {

errorLogged := false
mockLogger := &mocks.StdLogger{}
mockLogger.On("Printf", mock.Anything, mock.Anything).Return().Run(func(mock.Arguments) {
mockLogger.On("Println", mock.Anything, mock.Anything, mock.Anything).Return().Run(func(mock.Arguments) {
errorLogged = true
})
ErrorLogger = mockLogger
Expand Down Expand Up @@ -291,7 +291,7 @@ func Test_ConsumeClaim_Message_Error_WithPanicTopic(t *testing.T) {

errorLogged := false
mockLogger := &mocks.StdLogger{}
mockLogger.On("Printf", mock.Anything, mock.Anything).Return().Run(func(mock.Arguments) {
mockLogger.On("Println", mock.Anything, mock.Anything, mock.Anything).Return().Run(func(mock.Arguments) {
errorLogged = true
})
ErrorLogger = mockLogger
Expand Down Expand Up @@ -350,7 +350,7 @@ func Test_ConsumeClaim_Message_Error_WithHandlerSpecificRetryTopic(t *testing.T)

errorLogged := false
mockLogger := &mocks.StdLogger{}
mockLogger.On("Printf", mock.Anything, mock.Anything).Return().Run(func(mock.Arguments) {
mockLogger.On("Println", mock.Anything, mock.Anything, mock.Anything).Return().Run(func(mock.Arguments) {
errorLogged = true
})
ErrorLogger = mockLogger
Expand Down Expand Up @@ -431,7 +431,7 @@ func Test_handleErrorMessage_OmittedError(t *testing.T) {

errorLogged := false
mockLogger := &mocks.StdLogger{}
mockLogger.On("Printf", "Omitted message: %+v", mock.Anything).Return().Run(func(mock.Arguments) {
mockLogger.On("Println", mock.Anything, "error", "omitted message").Return().Run(func(mock.Arguments) {
errorLogged = true
}).Once()
ErrorLogger = mockLogger
Expand Down

0 comments on commit 5107ae6

Please sign in to comment.