This repository has been archived by the owner on Jun 23, 2023. It is now read-only.

Merge pull request #20 from heetch/support-logging
Support logging
tealeg authored Nov 7, 2018
2 parents 74374bf + 6eabeb2 commit 4c461b5
Showing 4 changed files with 199 additions and 80 deletions.
46 changes: 37 additions & 9 deletions consumer/consumer.go
@@ -2,6 +2,8 @@ package consumer

import (
"fmt"
"io/ioutil"
"log"
"strconv"
"sync"
"time"
@@ -13,15 +15,25 @@ import (
"github.com/pkg/errors"
)

// clusterConsumer is an interface that makes it possible to fake the cluster.Consumer for testing purposes.
type clusterConsumer interface {
MarkOffset(*sarama.ConsumerMessage, string)
Partitions() <-chan cluster.PartitionConsumer
Close() error
}

// Consumer is a Kafka consumer.
type Consumer struct {
consumer *cluster.Consumer
RetryInterval time.Duration
Metrics MetricsReporter
// If you wish to provide a different value for the Logger, you must do this prior to calling Serve.
Logger *log.Logger
newConsumer func(addrs []string, groupID string, topics []string, config *cluster.Config) (clusterConsumer, error)
consumer clusterConsumer
config *cluster.Config
handlers *handler.Collection
wg sync.WaitGroup
quit chan struct{}
RetryInterval time.Duration
Metrics MetricsReporter
}

// Handle registers the handler for the given topic.
@@ -33,8 +45,11 @@ func (c *Consumer) Handle(topic string, h handler.Handler) {
}

func (c *Consumer) setup() {
if c.Logger == nil {
c.Logger = log.New(ioutil.Discard, "[Felice] ", log.LstdFlags)
}
if c.handlers == nil {
c.handlers = &handler.Collection{}
c.handlers = &handler.Collection{Logger: c.Logger}
}

if c.quit == nil {
@@ -44,6 +59,12 @@
if c.RetryInterval == 0 {
c.RetryInterval = time.Second
}
if c.newConsumer == nil {
c.newConsumer = func(addrs []string, groupID string, topics []string, config *cluster.Config) (clusterConsumer, error) {
cons, err := cluster.NewConsumer(addrs, groupID, topics, config)
return clusterConsumer(cons), err
}
}
}

func newClusterConfig(clientID string) *cluster.Config {
@@ -69,7 +90,7 @@ func (c *Consumer) Serve(clientID string, addrs ...string) error {

consumerGroup := fmt.Sprintf("%s-consumer-group", clientID)
var err error
c.consumer, err = cluster.NewConsumer(
c.consumer, err = c.newConsumer(
addrs,
consumerGroup,
topics,
@@ -88,7 +109,8 @@ func (c *Consumer) Serve(clientID string, addrs ...string) error {
// annoying little issue.
err = errors.Wrap(err, "__consumer_offsets topic doesn't yet exist, either because no client has yet requested an offset, or because this consumer group is not yet functioning at startup or after rebalancing.")
}
return errors.Wrap(err, fmt.Sprintf("failed to create a consumer for topics %+v in consumer group %q", topics, consumerGroup))
err = errors.Wrap(err, fmt.Sprintf("failed to create a consumer for topics %+v in consumer group %q", topics, consumerGroup))
return err
}

err = c.handlePartitions(c.consumer.Partitions())
@@ -115,16 +137,20 @@ func (c *Consumer) handlePartitions(ch <-chan cluster.PartitionConsumer) error {
c.wg.Add(1)
go func(pc cluster.PartitionConsumer) {
defer c.wg.Done()

c.handleMessages(pc.Messages(), c.consumer, pc)
c.handleMessages(pc.Messages(), c.consumer, pc, pc.Topic(), pc.Partition())
}(part)
case <-c.quit:
c.Logger.Println("partition handler terminating")
return nil

}
}
}

func (c *Consumer) handleMessages(ch <-chan *sarama.ConsumerMessage, offset offsetStash, max highWaterMarker) {
func (c *Consumer) handleMessages(ch <-chan *sarama.ConsumerMessage, offset offsetStash, max highWaterMarker, topic string, partition int32) {
logSuffix := fmt.Sprintf(", topic=%q, partition=%d\n", topic, partition)
c.Logger.Println("partition messages - reading" + logSuffix)

for msg := range ch {
var attempts int

@@ -151,6 +177,7 @@ func (c *Consumer) handleMessages(ch <-chan *sarama.ConsumerMessage, offset offs
select {
case <-time.After(c.RetryInterval):
case <-c.quit:
c.Logger.Println("partition messages - closing" + logSuffix)
return
}
}
@@ -185,3 +212,4 @@ type offsetStash interface
type highWaterMarker interface {
HighWaterMarkOffset() int64
}
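
To put the new Logger field in context, here is a minimal caller-side sketch. The import path, client ID, and broker address are assumptions for illustration; only the Logger field, its ioutil.Discard default in setup, and the Serve signature come from the diff above.

package main

import (
	"log"
	"os"

	// Assumed import path for the consumer package shown in this diff.
	"github.com/heetch/felice/consumer"
)

func main() {
	c := &consumer.Consumer{
		// Route the consumer's log output to stderr. When Logger is left
		// nil, setup() installs a logger that writes to ioutil.Discard,
		// so the consumer is silent by default.
		Logger: log.New(os.Stderr, "[Felice] ", log.LstdFlags),
	}

	// Handlers would be registered here via c.Handle(...) before serving.

	// Logger must be set before Serve: Serve runs setup(), which only
	// fills in the discard default when the field is still nil.
	if err := c.Serve("my-client-id", "localhost:9092"); err != nil {
		log.Fatal(err)
	}
}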

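The clusterConsumer interface and the unexported newConsumer hook are introduced so tests can stand in a fake for cluster.Consumer. Below is a rough sketch of such a test double, assuming the usual sarama and sarama-cluster import paths; the fake type and test are hypothetical, while the interface methods and the newConsumer signature are taken from the diff.

package consumer

import (
	"testing"

	"github.com/Shopify/sarama"             // assumed import path
	cluster "github.com/bsm/sarama-cluster" // assumed import path
)

// fakeClusterConsumer is a hypothetical test double satisfying clusterConsumer.
type fakeClusterConsumer struct {
	marked     []*sarama.ConsumerMessage
	partitions chan cluster.PartitionConsumer
	closed     bool
}

func (f *fakeClusterConsumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
	f.marked = append(f.marked, msg)
}

func (f *fakeClusterConsumer) Partitions() <-chan cluster.PartitionConsumer {
	return f.partitions
}

func (f *fakeClusterConsumer) Close() error {
	f.closed = true
	return nil
}

// TestNewConsumerInjection shows how the newConsumer hook can be pointed at
// the fake so that Serve never needs a real broker.
func TestNewConsumerInjection(t *testing.T) {
	fake := &fakeClusterConsumer{partitions: make(chan cluster.PartitionConsumer)}
	c := &Consumer{
		newConsumer: func(addrs []string, groupID string, topics []string, config *cluster.Config) (clusterConsumer, error) {
			return fake, nil
		},
	}

	// Exercise the hook directly; in a fuller test, Serve would consume
	// partitions from the fake instead of a live connection.
	cc, err := c.newConsumer(nil, "group", nil, nil)
	if err != nil || cc != fake {
		t.Fatalf("expected the injected fake, got %v (err = %v)", cc, err)
	}
}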