diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs index 63c3632c..eed4c793 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs @@ -214,7 +214,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] me bool queueLengthIncreasing = IsTrueForLast( metrics, NumberOfSamplesToConsider, - (prev, next) => prev.TotalLag < next.TotalLag) && metrics[0].TotalLag > 0; + (prev, next) => prev.TotalLag < next.TotalLag) && metrics[0].TotalLag > (workerCount * lagThreshold); if (queueLengthIncreasing) { @@ -253,7 +253,27 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] me } } } - + + if (workerCount > 1) + { + var proposedWorkerCount = workerCount - 1; + + bool allSamplesBelowThreshold = IsTrueForLast( + metrics, + NumberOfSamplesToConsider, + (prev, next) => next.TotalLag < (lagThreshold * proposedWorkerCount)); + + if (allSamplesBelowThreshold) + { + status.Vote = ScaleVote.ScaleIn; + + if (this.logger.IsEnabled(LogLevel.Information)) + { + this.logger.LogInformation("Total lag length is decreasing for topic {topicName}, for consumer group {consumerGroup}.", this.topicName, this.consumerGroup); + } + } + } + return status; } diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs index f1d8ab87..c74ecc8d 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs @@ -202,6 +202,27 @@ public void When_LagIncreasing_Should_Vote_Scale_Up() Assert.Equal(ScaleVote.ScaleOut, result.Vote); } + [Fact] + public void When_LagIncreasing_But_Under_Threshold_Should_Vote_None() + { + var context = new ScaleStatusContext() + { + Metrics = new KafkaTriggerMetrics[] + { + new KafkaTriggerMetrics(10, partitions.Count), + new KafkaTriggerMetrics(100, partitions.Count), + new KafkaTriggerMetrics(200, partitions.Count), + new KafkaTriggerMetrics(300, partitions.Count), + new KafkaTriggerMetrics(400, partitions.Count), + }, + WorkerCount = 1, + }; + + var result = topicScaler.GetScaleStatus(context); + Assert.NotNull(result); + Assert.Equal(ScaleVote.None, result.Vote); + } + [Fact] public void When_LagDecreasing_Slowly_Should_Vote_None() { @@ -327,5 +348,26 @@ public void When_LagDecreasing_Should_Vote_Scale_In() Assert.NotNull(result); Assert.Equal(ScaleVote.ScaleIn, result.Vote); } + + [Fact] + public void When_Lag_Consistently_Below_Threshold_Should_Vote_Scale_In() + { + var context = new ScaleStatusContext() + { + Metrics = new KafkaTriggerMetrics[] + { + new KafkaTriggerMetrics(2, partitions.Count), + new KafkaTriggerMetrics(2, partitions.Count), + new KafkaTriggerMetrics(1, partitions.Count), + new KafkaTriggerMetrics(2, partitions.Count), + new KafkaTriggerMetrics(1, partitions.Count), + }, + WorkerCount = 2, + }; + + var result = topicScaler.GetScaleStatus(context); + Assert.NotNull(result); + Assert.Equal(ScaleVote.ScaleIn, result.Vote); + } } }