diff --git a/offsets_store.go b/offsets_store.go index ced87766..9f7cb5b7 100644 --- a/offsets_store.go +++ b/offsets_store.go @@ -316,6 +316,7 @@ func (storage *OffsetStorage) addConsumerOffset(offset *protocol.PartitionOffset offset.Cluster, offset.Topic, offset.Partition, offset.Group, offset.Timestamp, offset.Offset, partitionLag) + // Advance the ring pointer consumerTopicMap[offset.Partition] = consumerTopicMap[offset.Partition].Next() clusterOffsets.consumerLock.Unlock() @@ -389,9 +390,11 @@ func (storage *OffsetStorage) evaluateGroup(cluster string, group string, result continue } - // Add an artificial offset commit if the consumer has no lag against the current broker offset + // Add an artificial offset commit if the consumer has no lag against the current broker offset AND if the distance away from the + // last offset timestamp is outside the MinDistance threshold lastOffset := offsetRing.Prev().Value.(*protocol.ConsumerOffset) - if lastOffset.MaxOffset >= clusterMap.broker[topic][partition].Offset { + timestampDifference := (time.Now().Unix() * 1000) - lastOffset.Timestamp + if lastOffset.MaxOffset >= clusterMap.broker[topic][partition].Offset && (timestampDifference >= (storage.app.Config.Lagcheck.MinDistance * 1000)) { ringval, _ := offsetRing.Value.(*protocol.ConsumerOffset) ringval.Offset = lastOffset.Offset ringval.MaxOffset = lastOffset.MaxOffset