diff --git a/consumer/consumer.go b/consumer/consumer.go index e72646f..af788d4 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -24,15 +24,16 @@ type clusterConsumer interface { // Consumer is a Kafka consumer. type Consumer struct { + RetryInterval time.Duration + Metrics MetricsReporter + // If you wish to provide a different value for the Logger, you must do this prior to calling Serve. + Logger *log.Logger + newConsumer func(addrs []string, groupID string, topics []string, config *cluster.Config) (clusterConsumer, error) consumer clusterConsumer config *cluster.Config handlers *handler.Collection wg sync.WaitGroup quit chan struct{} - RetryInterval time.Duration - Metrics MetricsReporter - newConsumer func(addrs []string, groupID string, topics []string, config *cluster.Config) (clusterConsumer, error) - Logger *log.Logger } // Handle registers the handler for the given topic. @@ -139,9 +140,7 @@ func (c *Consumer) handlePartitions(ch <-chan cluster.PartitionConsumer) error { c.handleMessages(pc.Messages(), c.consumer, pc, pc.Topic(), pc.Partition()) }(part) case <-c.quit: - if c.Logger != nil { - c.Logger.Println("partition handler terminating") - } + c.Logger.Println("partition handler terminating") return nil } diff --git a/consumer/consumer_internal_test.go b/consumer/consumer_internal_test.go index 85640d3..63e6c43 100644 --- a/consumer/consumer_internal_test.go +++ b/consumer/consumer_internal_test.go @@ -24,7 +24,7 @@ const ( // TestLogger grabs logs in a buffer so we can later make assertions // about them. type TestLogger struct { - buf *bytes.Buffer + buf bytes.Buffer Logger *log.Logger t *testing.T } @@ -32,10 +32,9 @@ type TestLogger struct { // NewTestLogger constructs a test logger we can make assertions against func NewTestLogger(t *testing.T) *TestLogger { tl := &TestLogger{ - buf: bytes.NewBuffer([]byte{}), t: t, } - tl.Logger = log.New(tl.buf, "[Felice] ", log.LstdFlags) + tl.Logger = log.New(&tl.buf, "[Felice] ", log.LstdFlags) return tl }