Skip to content

Commit

Permalink
Make PartitionConsumer.Close idempotent
Browse files Browse the repository at this point in the history
  • Loading branch information
dim committed Apr 29, 2018
1 parent e8552c0 commit a1ad496
Showing 1 changed file with 4 additions and 1 deletion.
5 changes: 4 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ type partitionConsumer struct {

trigger, dying chan none
responseResult error
closeOnce sync.Once

fetchSize int32
offset int64
Expand Down Expand Up @@ -412,7 +413,9 @@ func (child *partitionConsumer) AsyncClose() {
// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
// 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will
// also just close itself)
close(child.dying)
child.closeOnce.Do(func() {
close(child.dying)
})
}

func (child *partitionConsumer) Close() error {
Expand Down

0 comments on commit a1ad496

Please sign in to comment.