Skip to content

Commit

Permalink
Add minimum-complete config for the evaluator (#388)
Browse files Browse the repository at this point in the history
  • Loading branch information
toddpalino authored May 2, 2018
1 parent dc4cb84 commit 7c0b8b1
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions core/internal/evaluator/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ type CachingEvaluator struct {
// fields that are appropriate to identify this coordinator
Log *zap.Logger

name string
expireCache int
name string
expireCache int
minimumComplete float32

RequestChannel chan *protocol.EvaluatorRequest
running sync.WaitGroup
Expand Down Expand Up @@ -63,6 +64,7 @@ func (module *CachingEvaluator) Configure(name string, configRoot string) {
// Set defaults for configs if needed
viper.SetDefault(configRoot+".expire-cache", 10)
module.expireCache = viper.GetInt(configRoot + ".expire-cache")
module.minimumComplete = float32(viper.GetFloat64(configRoot + ".minimum-complete"))
cacheExpire := time.Duration(module.expireCache) * time.Second

newCache, err := goswarm.NewSimple(&goswarm.Config{
Expand Down Expand Up @@ -221,7 +223,7 @@ func (module *CachingEvaluator) evaluateConsumerStatus(clusterAndConsumer string
completePartitions := 0
for topic, partitions := range topics {
for partitionID, partition := range partitions {
partitionStatus := evaluatePartitionStatus(partition)
partitionStatus := evaluatePartitionStatus(partition, module.minimumComplete)
partitionStatus.Topic = topic
partitionStatus.Partition = int32(partitionID)
partitionStatus.Owner = partition.Owner
Expand Down Expand Up @@ -260,7 +262,7 @@ func (module *CachingEvaluator) evaluateConsumerStatus(clusterAndConsumer string
return status, nil
}

func evaluatePartitionStatus(partition *protocol.ConsumerPartition) *protocol.PartitionStatus {
func evaluatePartitionStatus(partition *protocol.ConsumerPartition, minimumComplete float32) *protocol.PartitionStatus {
status := &protocol.PartitionStatus{
Status: protocol.StatusOK,
CurrentLag: partition.CurrentLag,
Expand Down Expand Up @@ -295,7 +297,11 @@ func evaluatePartitionStatus(partition *protocol.ConsumerPartition) *protocol.Pa
status.Start = offsets[0]
status.End = offsets[len(offsets)-1]

status.Status = calculatePartitionStatus(offsets, partition.BrokerOffsets, partition.CurrentLag, time.Now().Unix())
// If the partition does not meet the completeness threshold, just return it as OK
if status.Complete >= minimumComplete {
status.Status = calculatePartitionStatus(offsets, partition.BrokerOffsets, partition.CurrentLag, time.Now().Unix())
}

return status
}

Expand Down

0 comments on commit 7c0b8b1

Please sign in to comment.