Skip to content

Commit

Permalink
Optimization to linkedin#171
Browse files Browse the repository at this point in the history
By always setting MaxOffset to the value of Offset when the ring entry is created, we assure that MaxOffset is always greater than or equal to Offset. This
means that when checking for whether or not we should insert an artificial commit, we only need one check - if MaxOffset is greater than or equal to the broker
offset.
  • Loading branch information
Todd Palino committed Mar 2, 2017
1 parent ab642be commit 0a13c77
Showing 1 changed file with 3 additions and 17 deletions.
20 changes: 3 additions & 17 deletions offsets_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,15 +298,15 @@ func (storage *OffsetStorage) addConsumerOffset(offset *protocol.PartitionOffset
if consumerPartitionRing.Value == nil {
consumerPartitionRing.Value = &protocol.ConsumerOffset{
Offset: offset.Offset,
MaxOffset: 0,
MaxOffset: offset.Offset,
Timestamp: offset.Timestamp,
Lag: partitionLag,
Artificial: false,
}
} else {
ringval, _ := consumerPartitionRing.Value.(*protocol.ConsumerOffset)
ringval.Offset = offset.Offset
ringval.MaxOffset = 0
ringval.MaxOffset = offset.Offset
ringval.Timestamp = offset.Timestamp
ringval.Lag = partitionLag
ringval.Artificial = false
Expand Down Expand Up @@ -391,7 +391,7 @@ func (storage *OffsetStorage) evaluateGroup(cluster string, group string, result

// Add an artificial offset commit if the consumer has no lag against the current broker offset
lastOffset := offsetRing.Prev().Value.(*protocol.ConsumerOffset)
if lastOffset.Offset >= clusterMap.broker[topic][partition].Offset {
if lastOffset.MaxOffset >= clusterMap.broker[topic][partition].Offset {
ringval, _ := offsetRing.Value.(*protocol.ConsumerOffset)
ringval.Offset = lastOffset.Offset
ringval.MaxOffset = lastOffset.MaxOffset
Expand All @@ -402,20 +402,6 @@ func (storage *OffsetStorage) evaluateGroup(cluster string, group string, result

log.Tracef("Artificial offset: cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v max_offset=%v lag=0",
cluster, topic, partition, group, ringval.Timestamp, lastOffset.Offset, lastOffset.MaxOffset)

// Add an artificial offset commit if the consumer has no lag against the current broker offset
// This time we check the maxOffset seen in the last window (piggybacked in the ConsumerOffset).
} else if lastOffset.MaxOffset >= clusterMap.broker[topic][partition].Offset {
ringval, _ := offsetRing.Value.(*protocol.ConsumerOffset)
ringval.Offset = lastOffset.MaxOffset
ringval.MaxOffset = 0
ringval.Timestamp = time.Now().Unix() * 1000
ringval.Lag = 0
ringval.Artificial = true
partitions[partition] = partitions[partition].Next()

log.Tracef("Artificial max offset: cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v max_offset=%v lag=0",
cluster, topic, partition, group, ringval.Timestamp, lastOffset.Offset, lastOffset.MaxOffset)
}

// Pull out the offsets once so we can unlock the map
Expand Down

0 comments on commit 0a13c77

Please sign in to comment.