From c2b4255560beda104b0a5d027a861b59ba85423b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Lindstr=C3=B6m?= Date: Tue, 10 Nov 2020 10:52:54 +0100 Subject: [PATCH 1/2] Added so that we do not scaleout if we increasing under the threshold --- .../Listeners/KafkaTopicScaler.cs | 2 +- .../KafkaTopicScalerTest.cs | 21 +++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs index 84887df3..b58517f3 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs @@ -212,7 +212,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] me bool queueLengthIncreasing = IsTrueForLast( metrics, NumberOfSamplesToConsider, - (prev, next) => prev.TotalLag < next.TotalLag) && metrics[0].TotalLag > 0; + (prev, next) => prev.TotalLag < next.TotalLag) && metrics[0].TotalLag > (workerCount * lagThreshold); if (queueLengthIncreasing) { diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs index 04d67dda..cc47e101 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs @@ -201,6 +201,27 @@ public void When_LagIncreasing_Should_Vote_Scale_Up() Assert.Equal(ScaleVote.ScaleOut, result.Vote); } + [Fact] + public void When_LagIncreasing_But_Under_Threshold_Should_Vote_None() + { + var context = new ScaleStatusContext() + { + Metrics = new KafkaTriggerMetrics[] + { + new KafkaTriggerMetrics(10, partitions.Count), + new KafkaTriggerMetrics(100, partitions.Count), + new KafkaTriggerMetrics(200, partitions.Count), + new KafkaTriggerMetrics(300, partitions.Count), + new KafkaTriggerMetrics(400, partitions.Count), + }, + WorkerCount = 1, + }; + + var result = topicScaler.GetScaleStatus(context); + Assert.NotNull(result); + Assert.Equal(ScaleVote.None, result.Vote); + } + [Fact] public void When_LagDecreasing_Slowly_Should_Vote_None() { From 192189869eb1a27290b39c5aaea31b1ef3105f08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Lindstr=C3=B6m?= Date: Tue, 10 Nov 2020 10:54:07 +0100 Subject: [PATCH 2/2] Scale in when lag is consistently below threshold --- .../Listeners/KafkaTopicScaler.cs | 22 ++++++++++++++++++- .../KafkaTopicScalerTest.cs | 21 ++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs index b58517f3..a69e47de 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs @@ -251,7 +251,27 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] me } } } - + + if (workerCount > 1) + { + var proposedWorkerCount = workerCount - 1; + + bool allSamplesBelowThreshold = IsTrueForLast( + metrics, + NumberOfSamplesToConsider, + (prev, next) => next.TotalLag < (lagThreshold * proposedWorkerCount)); + + if (allSamplesBelowThreshold) + { + status.Vote = ScaleVote.ScaleIn; + + if (this.logger.IsEnabled(LogLevel.Information)) + { + this.logger.LogInformation("Total lag length is decreasing for topic {topicName}, for consumer group {consumerGroup}.", this.topicName, this.consumerGroup); + } + } + } + return status; } diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs index cc47e101..7580a2d2 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs @@ -347,5 +347,26 @@ public void When_LagDecreasing_Should_Vote_Scale_In() Assert.NotNull(result); Assert.Equal(ScaleVote.ScaleIn, result.Vote); } + + [Fact] + public void When_Lag_Consistently_Below_Threshold_Should_Vote_Scale_In() + { + var context = new ScaleStatusContext() + { + Metrics = new KafkaTriggerMetrics[] + { + new KafkaTriggerMetrics(2, partitions.Count), + new KafkaTriggerMetrics(2, partitions.Count), + new KafkaTriggerMetrics(1, partitions.Count), + new KafkaTriggerMetrics(2, partitions.Count), + new KafkaTriggerMetrics(1, partitions.Count), + }, + WorkerCount = 2, + }; + + var result = topicScaler.GetScaleStatus(context); + Assert.NotNull(result); + Assert.Equal(ScaleVote.ScaleIn, result.Vote); + } } }