diff --git a/common/component/kafka/consumer.go b/common/component/kafka/consumer.go index 0a1c2acfff..7cb923a455 100644 --- a/common/component/kafka/consumer.go +++ b/common/component/kafka/consumer.go @@ -68,6 +68,15 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai } else { for { select { + // Return as soon as `session.Context()` is done. + // Otherwise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` is raised when Kafka rebalances. See: + // https://github.com/IBM/sarama/issues/1192 + // The session-context check is listed before the message case so that shutdown is noticed before the next message is processed (NOTE(review): Go's `select` picks pseudo-randomly among ready cases, so case order alone does not guarantee this priority - confirm). + // The pod may take some time to shut down; in the case of a poison pill message, the `retry` would get interrupted (as expected), + // but the next message would be processed as a result, + // therefore dropping the poison pill message regardless of resiliency policy. + case <-session.Context().Done(): + return nil case message, ok := <-claim.Messages(): if !ok { return nil @@ -89,11 +98,6 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai consumer.k.logger.Errorf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err) } } - // Should return when `session.Context()` is done. - // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when kafka rebalance. see: - // https://github.com/IBM/sarama/issues/1192 - case <-session.Context().Done(): - return nil } } }