From 7c0b8b128c58a1abe19112445e26443e776b5f8e Mon Sep 17 00:00:00 2001 From: Todd Palino Date: Wed, 2 May 2018 17:23:47 -0400 Subject: [PATCH] Add minimum-complete config for the evaluator (#388) --- core/internal/evaluator/caching.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/core/internal/evaluator/caching.go b/core/internal/evaluator/caching.go index 2b7dd1c8..f7b1edf0 100644 --- a/core/internal/evaluator/caching.go +++ b/core/internal/evaluator/caching.go @@ -33,8 +33,9 @@ type CachingEvaluator struct { // fields that are appropriate to identify this coordinator Log *zap.Logger - name string - expireCache int + name string + expireCache int + minimumComplete float32 RequestChannel chan *protocol.EvaluatorRequest running sync.WaitGroup @@ -63,6 +64,7 @@ func (module *CachingEvaluator) Configure(name string, configRoot string) { // Set defaults for configs if needed viper.SetDefault(configRoot+".expire-cache", 10) module.expireCache = viper.GetInt(configRoot + ".expire-cache") + module.minimumComplete = float32(viper.GetFloat64(configRoot + ".minimum-complete")) cacheExpire := time.Duration(module.expireCache) * time.Second newCache, err := goswarm.NewSimple(&goswarm.Config{ @@ -221,7 +223,7 @@ func (module *CachingEvaluator) evaluateConsumerStatus(clusterAndConsumer string completePartitions := 0 for topic, partitions := range topics { for partitionID, partition := range partitions { - partitionStatus := evaluatePartitionStatus(partition) + partitionStatus := evaluatePartitionStatus(partition, module.minimumComplete) partitionStatus.Topic = topic partitionStatus.Partition = int32(partitionID) partitionStatus.Owner = partition.Owner @@ -260,7 +262,7 @@ func (module *CachingEvaluator) evaluateConsumerStatus(clusterAndConsumer string return status, nil } -func evaluatePartitionStatus(partition *protocol.ConsumerPartition) *protocol.PartitionStatus { +func evaluatePartitionStatus(partition *protocol.ConsumerPartition, minimumComplete float32) *protocol.PartitionStatus { status := &protocol.PartitionStatus{ Status: protocol.StatusOK, CurrentLag: partition.CurrentLag, @@ -295,7 +297,11 @@ func evaluatePartitionStatus(partition *protocol.ConsumerPartition) *protocol.Pa status.Start = offsets[0] status.End = offsets[len(offsets)-1] - status.Status = calculatePartitionStatus(offsets, partition.BrokerOffsets, partition.CurrentLag, time.Now().Unix()) + // If the partition does not meet the completeness threshold, just return it as OK + if status.Complete >= minimumComplete { + status.Status = calculatePartitionStatus(offsets, partition.BrokerOffsets, partition.CurrentLag, time.Now().Unix()) + } + return status }