Skip to content

Commit

Permalink
Merge pull request #21 from meitu/Fix/sarama-nil-message
Browse files Browse the repository at this point in the history
Fix/sarama nil message
  • Loading branch information
Huang-lin authored Mar 13, 2018
2 parents d2d4c63 + 473cedc commit 6b39dd8
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 4 deletions.
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func NewConfig() *Config {
config.OffsetAutoReset = sarama.OffsetNewest
config.ClaimPartitionRetryTimes = 10
config.ClaimPartitionRetryInterval = 3 * time.Second
config.SaramaConfig.Consumer.Return.Errors = true
return config
}

Expand Down
9 changes: 5 additions & 4 deletions example/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func main() {
conf := consumergroup.NewConfig()
conf.ZkList = []string{"127.0.0.1:2181"}
conf.ZkSessionTimeout = 6 * time.Second
conf.TopicList = []string{"test"}
topic := "test"
conf.TopicList = []string{topic}
conf.GroupID = "go-test-group-id"

cg, err := consumergroup.NewConsumerGroup(conf)
Expand All @@ -56,16 +57,16 @@ func main() {

// Retrieve the error and log
go func() {
if topicErrChan, ok := cg.GetErrors("test"); ok {
if topicErrChan, ok := cg.GetErrors(topic); ok {
for err := range topicErrChan {
if err != nil {
fmt.Println("toipic %s got err, %s", err)
fmt.Printf("Toipic %s got err, %s\n", topic, err)
}
}
}
}()

if msgChan, ok := cg.GetMessages("test"); ok {
if msgChan, ok := cg.GetMessages(topic); ok {
for message := range msgChan {
fmt.Println(string(message.Value), message.Offset)
time.Sleep(500 * time.Millisecond)
Expand Down
5 changes: 5 additions & 0 deletions partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ PARTITION_CONSUMER_LOOP:
case err := <-pc.consumer.Errors():
errorChan <- err
case message := <-pc.consumer.Messages():
if message == nil {
cg.logger.Errorf("Sarama partition consumer encounter error, the consumer would be exited")
close(cg.stopper)
break PARTITION_CONSUMER_LOOP
}
select {
case messageChan <- message:
pc.offset = message.Offset + 1
Expand Down

0 comments on commit 6b39dd8

Please sign in to comment.