Skip to content

Commit

Permalink
Handle case where kafka returns 0 partitions (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
lvrach authored and mantzas committed Feb 27, 2019
1 parent 7c250cc commit 4627b00
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion async/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,12 @@ func (c *consumer) Consume(ctx context.Context) (<-chan async.Message, <-chan er
if err != nil {
return nil, nil, errors.Wrap(err, "failed to get partitions")
}
log.Infof("consuming messages for topic '%s'", c.topic)
// When kafka cluster is not fully initialized, we may get 0 partions.
if len(pcs) == 0 {
return nil, nil, errors.New("got 0 partitions")
}

log.Infof("consuming messages for topic '%s' from %d partitions", c.topic, len(pcs))
chMsg := make(chan async.Message, c.buffer)
chErr := make(chan error, c.buffer)

Expand Down

0 comments on commit 4627b00

Please sign in to comment.