-
Notifications
You must be signed in to change notification settings - Fork 119
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: creates the ConsumerLagWorkerBalancer
- Loading branch information
1 parent
413e24a
commit ae4fcb9
Showing
4 changed files
with
226 additions
and
42 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
158 changes: 158 additions & 0 deletions
158
src/KafkaFlow/Consumers/WorkersBalancers/ConsumerLagWorkerBalancer.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
namespace KafkaFlow.Consumers.WorkersBalancers | ||
{ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Threading.Tasks; | ||
using KafkaFlow.Clusters; | ||
using KafkaFlow.Configuration; | ||
|
||
internal class ConsumerLagWorkerBalancer | ||
{ | ||
private readonly IClusterManager clusterManager; | ||
private readonly IConsumerAccessor consumerAccessor; | ||
private readonly ILogHandler logHandler; | ||
private readonly int totalConsumerWorkers; | ||
private readonly int minInstanceWorkers; | ||
private readonly int maxInstanceWorkers; | ||
|
||
public ConsumerLagWorkerBalancer( | ||
IClusterManager clusterManager, | ||
IConsumerAccessor consumerAccessor, | ||
ILogHandler logHandler, | ||
int totalConsumerWorkers, | ||
int minInstanceWorkers, | ||
int maxInstanceWorkers) | ||
{ | ||
this.clusterManager = clusterManager; | ||
this.consumerAccessor = consumerAccessor; | ||
this.logHandler = logHandler; | ||
this.totalConsumerWorkers = totalConsumerWorkers; | ||
this.minInstanceWorkers = minInstanceWorkers; | ||
this.maxInstanceWorkers = maxInstanceWorkers; | ||
} | ||
|
||
public async Task<int> GetWorkersCountAsync(WorkersCountContext context) | ||
{ | ||
var workers = await this.CalculateAsync(context); | ||
|
||
this.logHandler.Info( | ||
"New workers count calculated", | ||
new | ||
{ | ||
Workers = workers, | ||
Consumer = context.ConsumerName, | ||
}); | ||
|
||
return workers; | ||
} | ||
|
||
private static long CalculateMyPartitionsLag( | ||
WorkersCountContext context, | ||
IEnumerable<(string Topic, int Partition, long Lag)> partitionsLag) | ||
{ | ||
return partitionsLag | ||
.Where( | ||
partitionLag => context.AssignedTopicsPartitions | ||
.Any( | ||
topic => topic.Name == partitionLag.Topic && | ||
topic.Partitions.Any(p => p == partitionLag.Partition))) | ||
.Sum(partitionLag => partitionLag.Lag); | ||
} | ||
|
||
private static IReadOnlyList<(string Topic, int Partition, long Lag)> CalculatePartitionsLag( | ||
IEnumerable<(string Topic, int Partition, long Offset)> lastOffsets, | ||
IEnumerable<TopicPartitionOffset> currentPartitionsOffset) | ||
{ | ||
return lastOffsets | ||
.Select( | ||
last => | ||
{ | ||
var currentOffset = currentPartitionsOffset | ||
.Where(current => current.Topic == last.Topic && current.Partition == last.Partition) | ||
.Select(current => current.Offset) | ||
.FirstOrDefault(); | ||
var lastOffset = Math.Max(0, last.Offset); | ||
currentOffset = Math.Max(0, currentOffset); | ||
return (last.Topic, last.Partition, lastOffset - currentOffset); | ||
}) | ||
.ToList(); | ||
} | ||
|
||
private async Task<int> CalculateAsync(WorkersCountContext context) | ||
{ | ||
try | ||
{ | ||
if (!context.AssignedTopicsPartitions.Any()) | ||
{ | ||
return 1; | ||
} | ||
|
||
var topicsMetadata = await this.GetTopicsMetadataAsync(context); | ||
|
||
var lastOffsets = this.GetPartitionsLastOffset(context.ConsumerName, topicsMetadata); | ||
|
||
var partitionsOffset = await this.clusterManager.GetConsumerGroupOffsetsAsync( | ||
context.ConsumerGroupId, | ||
context.AssignedTopicsPartitions.Select(t => t.Name)); | ||
|
||
var partitionsLag = CalculatePartitionsLag(lastOffsets, partitionsOffset); | ||
var instanceLag = CalculateMyPartitionsLag(context, partitionsLag); | ||
|
||
decimal totalConsumerLag = partitionsLag.Sum(p => p.Lag); | ||
|
||
var ratio = instanceLag / Math.Max(1, totalConsumerLag); | ||
|
||
var workers = (int)Math.Round(this.totalConsumerWorkers * ratio); | ||
|
||
workers = Math.Min(workers, this.maxInstanceWorkers); | ||
workers = Math.Max(workers, this.minInstanceWorkers); | ||
|
||
return workers; | ||
} | ||
catch (Exception e) | ||
{ | ||
this.logHandler.Error( | ||
"Error calculating new workers count, using 1 as fallback", | ||
e, | ||
new | ||
{ | ||
context.ConsumerName, | ||
}); | ||
|
||
return 1; | ||
} | ||
} | ||
|
||
private IEnumerable<(string TopicName, int Partition, long Offset)> GetPartitionsLastOffset( | ||
string consumerName, | ||
IEnumerable<(string Name, TopicMetadata Metadata)> topicsMetadata) | ||
{ | ||
var consumer = this.consumerAccessor[consumerName]; | ||
|
||
return topicsMetadata.SelectMany( | ||
topic => topic.Metadata.Partitions.Select( | ||
partition => ( | ||
topic.Name, | ||
partition.Id, | ||
consumer | ||
.QueryWatermarkOffsets(new(topic.Name, new(partition.Id)), TimeSpan.FromSeconds(30)) | ||
.High | ||
.Value))); | ||
} | ||
|
||
private async Task<IReadOnlyList<(string Name, TopicMetadata Metadata)>> GetTopicsMetadataAsync(WorkersCountContext context) | ||
{ | ||
var topicsMetadata = new List<(string Name, TopicMetadata Metadata)>(context.AssignedTopicsPartitions.Count); | ||
|
||
foreach (var topic in context.AssignedTopicsPartitions) | ||
{ | ||
topicsMetadata.Add((topic.Name, await this.clusterManager.GetTopicMetadataAsync(topic.Name))); | ||
} | ||
|
||
return topicsMetadata; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters