Skip to content

Commit

Permalink
Merge pull request linkedin#171 from skroutz/stop-partition-false-pos…
Browse files Browse the repository at this point in the history
…itives

Track last offset received even if it violates mindistance
  • Loading branch information
toddpalino authored Mar 3, 2017
2 parents e897aec + 0a13c77 commit 0877968
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
13 changes: 10 additions & 3 deletions offsets_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ func (storage *OffsetStorage) addConsumerOffset(offset *protocol.PartitionOffset

// Prevent new commits that are too fast (less than the min-distance config) if the last offset was not artificial
if (!lastOffset.Artificial) && (timestampDifference >= 0) && (timestampDifference < (storage.app.Config.Lagcheck.MinDistance * 1000)) {
// Store the current offset before dropping the ConsumerOffset.
// This might be the last offset we receive for a while, and it can help us decide if the consumer is STOPPED because
// it has processed all messages.
lastOffset.MaxOffset = offset.Offset
clusterOffsets.consumerLock.Unlock()
log.Debugf("Dropped offset (mindistance): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v tsdiff=%v lag=%v",
offset.Cluster, offset.Topic, offset.Partition, offset.Group, offset.Timestamp, offset.Offset,
Expand All @@ -294,13 +298,15 @@ func (storage *OffsetStorage) addConsumerOffset(offset *protocol.PartitionOffset
if consumerPartitionRing.Value == nil {
consumerPartitionRing.Value = &protocol.ConsumerOffset{
Offset: offset.Offset,
MaxOffset: offset.Offset,
Timestamp: offset.Timestamp,
Lag: partitionLag,
Artificial: false,
}
} else {
ringval, _ := consumerPartitionRing.Value.(*protocol.ConsumerOffset)
ringval.Offset = offset.Offset
ringval.MaxOffset = offset.Offset
ringval.Timestamp = offset.Timestamp
ringval.Lag = partitionLag
ringval.Artificial = false
Expand Down Expand Up @@ -385,16 +391,17 @@ func (storage *OffsetStorage) evaluateGroup(cluster string, group string, result

// Add an artificial offset commit if the consumer has no lag against the current broker offset
lastOffset := offsetRing.Prev().Value.(*protocol.ConsumerOffset)
if lastOffset.Offset >= clusterMap.broker[topic][partition].Offset {
if lastOffset.MaxOffset >= clusterMap.broker[topic][partition].Offset {
ringval, _ := offsetRing.Value.(*protocol.ConsumerOffset)
ringval.Offset = lastOffset.Offset
ringval.MaxOffset = lastOffset.MaxOffset
ringval.Timestamp = time.Now().Unix() * 1000
ringval.Lag = 0
ringval.Artificial = true
partitions[partition] = partitions[partition].Next()

log.Tracef("Artificial offset: cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v lag=0",
cluster, topic, partition, group, ringval.Timestamp, lastOffset.Offset)
log.Tracef("Artificial offset: cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v max_offset=%v lag=0",
cluster, topic, partition, group, ringval.Timestamp, lastOffset.Offset, lastOffset.MaxOffset)
}

// Pull out the offsets once so we can unlock the map
Expand Down
1 change: 1 addition & 0 deletions protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type ConsumerOffset struct {
Timestamp int64 `json:"timestamp"`
Lag int64 `json:"lag"`
Artificial bool `json:"-"`
MaxOffset int64 `json:"max_offset"`
}

type StatusConstant int
Expand Down

0 comments on commit 0877968

Please sign in to comment.