Skip to content

Commit

Permalink
Fix consumer in STOPPED state false positive
Browse files Browse the repository at this point in the history
Consider the folllowing line of kafka events:

P: Produce
P: Produce
C: Commit offset=1
B: Accept Commit offset=1
C: Commit offset=2
B: Dropped Commit offset=2 (due to mindistance)
...
B: Consumer is STOPPED

In this state, after a while, the consumer is considered stopped,
which is clearly not the case: All messages are consumed successfully.

Burrow marks the consumer for that topic:partition as stopped because
of Rule4.

Burrow, in order to prevent partitions with no new messages to be
considered stopped, checks if the Lag of the last window is
larger/or-equal than the partition's broker offset and if that's the case,
inserts an Artificial Offset with Lag=0 and the current Timestamp. It's
that Timestamp that prevents marking the consumer stoppped under Rule4.

In the above case the artificial offset is not inserted because the
Lag of the last window is less than actual Partition's offset.
That important last commit was dropped due to mindistance!

This is a dirty patch that attempts to deal with that case. In each
ConsumerOffset in the ring we piggyback a MaxOffset value that tracks the
max offset seen during that period, effectively treating ConsumerOffset
as a ConsumerWindow. With that in place, we add an extra step during
Artificial Offset code: If MaxOffset is >= than the partition's broker
Offset we should insert an Artificial Offset.
  • Loading branch information
ctrochalakis committed Jan 27, 2017
1 parent 1e4b0d8 commit ab642be
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
25 changes: 23 additions & 2 deletions offsets_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ func (storage *OffsetStorage) addConsumerOffset(offset *protocol.PartitionOffset

// Prevent new commits that are too fast (less than the min-distance config) if the last offset was not artificial
if (!lastOffset.Artificial) && (timestampDifference >= 0) && (timestampDifference < (storage.app.Config.Lagcheck.MinDistance * 1000)) {
// Store the current offset before dropping the ConsumerOffset.
// This might be the last offset we receive for a while, and it can help us decide if the consumer is STOPPED because
// it has processed all messages.
lastOffset.MaxOffset = offset.Offset
clusterOffsets.consumerLock.Unlock()
log.Debugf("Dropped offset (mindistance): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v tsdiff=%v lag=%v",
offset.Cluster, offset.Topic, offset.Partition, offset.Group, offset.Timestamp, offset.Offset,
Expand All @@ -294,13 +298,15 @@ func (storage *OffsetStorage) addConsumerOffset(offset *protocol.PartitionOffset
if consumerPartitionRing.Value == nil {
consumerPartitionRing.Value = &protocol.ConsumerOffset{
Offset: offset.Offset,
MaxOffset: 0,
Timestamp: offset.Timestamp,
Lag: partitionLag,
Artificial: false,
}
} else {
ringval, _ := consumerPartitionRing.Value.(*protocol.ConsumerOffset)
ringval.Offset = offset.Offset
ringval.MaxOffset = 0
ringval.Timestamp = offset.Timestamp
ringval.Lag = partitionLag
ringval.Artificial = false
Expand Down Expand Up @@ -388,13 +394,28 @@ func (storage *OffsetStorage) evaluateGroup(cluster string, group string, result
if lastOffset.Offset >= clusterMap.broker[topic][partition].Offset {
ringval, _ := offsetRing.Value.(*protocol.ConsumerOffset)
ringval.Offset = lastOffset.Offset
ringval.MaxOffset = lastOffset.MaxOffset
ringval.Timestamp = time.Now().Unix() * 1000
ringval.Lag = 0
ringval.Artificial = true
partitions[partition] = partitions[partition].Next()

log.Tracef("Artificial offset: cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v lag=0",
cluster, topic, partition, group, ringval.Timestamp, lastOffset.Offset)
log.Tracef("Artificial offset: cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v max_offset=%v lag=0",
cluster, topic, partition, group, ringval.Timestamp, lastOffset.Offset, lastOffset.MaxOffset)

// Add an artificial offset commit if the consumer has no lag against the current broker offset
// This time we check the maxOffset seen in the last window (piggybacked in the ConsumerOffset).
} else if lastOffset.MaxOffset >= clusterMap.broker[topic][partition].Offset {
ringval, _ := offsetRing.Value.(*protocol.ConsumerOffset)
ringval.Offset = lastOffset.MaxOffset
ringval.MaxOffset = 0
ringval.Timestamp = time.Now().Unix() * 1000
ringval.Lag = 0
ringval.Artificial = true
partitions[partition] = partitions[partition].Next()

log.Tracef("Artificial max offset: cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v max_offset=%v lag=0",
cluster, topic, partition, group, ringval.Timestamp, lastOffset.Offset, lastOffset.MaxOffset)
}

// Pull out the offsets once so we can unlock the map
Expand Down
1 change: 1 addition & 0 deletions protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type ConsumerOffset struct {
Timestamp int64 `json:"timestamp"`
Lag int64 `json:"lag"`
Artificial bool `json:"-"`
MaxOffset int64 `json:"max_offset"`
}

type StatusConstant int
Expand Down

0 comments on commit ab642be

Please sign in to comment.