Skip to content

Commit fe623aa

Browse files
committed
POC - Multitopic listener
1 parent 32b56a7 commit fe623aa

File tree

11 files changed

+185
-96
lines changed

11 files changed

+185
-96
lines changed

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ internal class KafkaListener<TKey, TValue> : IListener, IScaleMonitorProvider
4444
private CancellationTokenSource cancellationTokenSource;
4545
private SemaphoreSlim subscriberFinished;
4646
private readonly string consumerGroup;
47-
private readonly string topicName;
47+
private readonly IList<string> topicList;
4848
private readonly string functionId;
4949
//protected for the unit test
5050
protected Lazy<KafkaTopicScaler<TKey, TValue>> topicScaler;
@@ -74,7 +74,7 @@ public KafkaListener(
7474
this.logger = logger;
7575
this.cancellationTokenSource = new CancellationTokenSource();
7676
this.consumerGroup = string.IsNullOrEmpty(this.listenerConfiguration.ConsumerGroup) ? "$Default" : this.listenerConfiguration.ConsumerGroup;
77-
this.topicName = this.listenerConfiguration.Topic;
77+
this.topicList = this.listenerConfiguration.Topics;
7878
this.functionId = functionId;
7979
this.consumer = new Lazy<IConsumer<TKey, TValue>>(() => CreateConsumer());
8080
this.topicScaler = new Lazy<KafkaTopicScaler<TKey, TValue>>(() => CreateTopicScaler());
@@ -114,7 +114,7 @@ private IConsumer<TKey, TValue> CreateConsumer()
114114

115115
private KafkaTopicScaler<TKey, TValue> CreateTopicScaler()
116116
{
117-
return new KafkaTopicScaler<TKey, TValue>(this.listenerConfiguration.Topic, this.consumerGroup, this.functionId, this.consumer.Value, new AdminClientConfig(GetConsumerConfiguration()), this.logger);
117+
return new KafkaTopicScaler<TKey, TValue>(topicList.ToList(), this.consumerGroup, this.functionId, this.consumer.Value, new AdminClientBuilder(new AdminClientConfig(GetConsumerConfiguration())), this.logger);
118118
}
119119

120120
public void Cancel()
@@ -131,7 +131,8 @@ public Task StartAsync(CancellationToken cancellationToken)
131131
(FunctionExecutorBase<TKey, TValue>)new SingleItemFunctionExecutor<TKey, TValue>(executor, localConsumer, this.options.ExecutorChannelCapacity, this.options.ChannelFullRetryIntervalInMs, commitStrategy, logger) :
132132
new MultipleItemFunctionExecutor<TKey, TValue>(executor, localConsumer, this.options.ExecutorChannelCapacity, this.options.ChannelFullRetryIntervalInMs, commitStrategy, logger);
133133

134-
localConsumer.Subscribe(this.listenerConfiguration.Topic);
134+
localConsumer.Subscribe(this.listenerConfiguration.Topics);
135+
135136
// Using a thread as opposed to a task since this will be long running
136137
var thread = new Thread(ProcessSubscription)
137138
{
@@ -325,7 +326,7 @@ private void ProcessSubscription(object parameter)
325326
}
326327
finally
327328
{
328-
this.logger.LogInformation("Exiting {processName} for {topic}", nameof(ProcessSubscription), this.listenerConfiguration.Topic);
329+
this.logger.LogInformation("Exiting {processName} for {topic}", nameof(ProcessSubscription), this.listenerConfiguration.Topics);
329330
this.subscriberFinished.Release();
330331
}
331332
}
@@ -382,7 +383,7 @@ protected virtual void Dispose(bool disposing)
382383
{
383384
if (!disposed)
384385
{
385-
this.logger.LogInformation("Disposing Kafka Listener for {topic}", this.listenerConfiguration.Topic);
386+
this.logger.LogInformation("Disposing Kafka Listener for {topic}", this.listenerConfiguration.Topics);
386387

387388
if (disposing)
388389
{

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs

Lines changed: 59 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,40 +5,37 @@
55
using Microsoft.Azure.WebJobs.Host.Scale;
66
using Microsoft.Extensions.Logging;
77
using System;
8+
using System.Collections;
89
using System.Collections.Generic;
910
using System.Linq;
11+
using System.Text.RegularExpressions;
1012
using System.Threading.Tasks;
1113

1214
namespace Microsoft.Azure.WebJobs.Extensions.Kafka
1315
{
1416
public class KafkaTopicScaler<TKey, TValue> : IScaleMonitor<KafkaTriggerMetrics>
1517
{
16-
private readonly string topicName;
1718
private readonly string consumerGroup;
1819
private readonly ILogger logger;
19-
private readonly AdminClientConfig adminClientConfig;
20+
private readonly AdminClientBuilder adminClientBuilder;
2021
private readonly IConsumer<TKey, TValue> consumer;
22+
private readonly IReadOnlyList<string> topicNames;
2123
private readonly Lazy<List<TopicPartition>> topicPartitions;
2224

2325
public ScaleMonitorDescriptor Descriptor { get; }
2426

25-
public KafkaTopicScaler(string topic, string consumerGroup, string functionId, IConsumer<TKey, TValue> consumer, AdminClientConfig adminClientConfig, ILogger logger)
27+
public KafkaTopicScaler(IReadOnlyList<string> topics, string consumerGroup, string functionId, IConsumer<TKey, TValue> consumer, AdminClientBuilder adminClientBuilder, ILogger logger)
2628
{
27-
if (string.IsNullOrWhiteSpace(topic))
28-
{
29-
throw new ArgumentException("Invalid topic", nameof(topic));
30-
}
31-
3229
if (string.IsNullOrWhiteSpace(consumerGroup))
3330
{
3431
throw new ArgumentException("Invalid consumer group", nameof(consumerGroup));
3532
}
3633

3734
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
38-
this.adminClientConfig = adminClientConfig ?? throw new ArgumentNullException(nameof(adminClientConfig));
35+
this.adminClientBuilder = adminClientBuilder ?? throw new ArgumentNullException(nameof(adminClientBuilder));
3936
this.consumer = consumer ?? throw new ArgumentNullException(nameof(consumer));
40-
this.topicName = topic;
41-
this.Descriptor = new ScaleMonitorDescriptor($"{functionId}-kafkatrigger-{topicName}-{consumerGroup}".ToLower());
37+
this.topicNames = topics ?? throw new ArgumentNullException(nameof(topics));
38+
this.Descriptor = new ScaleMonitorDescriptor($"{functionId}-kafkatrigger-{string.Join("-", topics)}-{consumerGroup}".ToLower());
4239
this.topicPartitions = new Lazy<List<TopicPartition>>(LoadTopicPartitions);
4340
this.consumerGroup = consumerGroup;
4441
}
@@ -48,27 +45,50 @@ protected virtual List<TopicPartition> LoadTopicPartitions()
4845
try
4946
{
5047
var timeout = TimeSpan.FromSeconds(5);
51-
using var adminClient = new AdminClientBuilder(adminClientConfig).Build();
52-
var metadata = adminClient.GetMetadata(this.topicName, timeout);
53-
if (metadata.Topics == null || metadata.Topics.Count == 0)
48+
using var adminClient = adminClientBuilder.Build();
49+
var topicPartitions = new List<TopicPartition>();
50+
foreach (var topicName in topicNames)
5451
{
55-
logger.LogError("Could not load metadata information about topic '{topic}'", this.topicName);
56-
return new List<TopicPartition>();
57-
}
52+
try
53+
{
54+
List<TopicMetadata> topics;
55+
if (topicName.StartsWith("^"))
56+
{
57+
var metadata = adminClient.GetMetadata(timeout);
58+
topics = metadata.Topics?.Where(x => Regex.IsMatch(x.Topic, topicName)).ToList();
59+
}
60+
else
61+
{
62+
topics = adminClient.GetMetadata(topicName, timeout).Topics;
63+
}
5864

59-
var topicMetadata = metadata.Topics[0];
60-
var partitions = topicMetadata.Partitions;
61-
if (partitions == null || partitions.Count == 0)
62-
{
63-
logger.LogError("Could not load partition information about topic '{topic}'", this.topicName);
64-
return new List<TopicPartition>();
65-
}
65+
if (topics == null || topics.Count == 0)
66+
{
67+
logger.LogError("Could not load metadata information about topic '{topic}'", topicName);
68+
continue;
69+
}
6670

67-
return partitions.Select(x => new TopicPartition(topicMetadata.Topic, new Partition(x.PartitionId))).ToList();
71+
foreach (var topicMetadata in topics)
72+
{
73+
var partitions = topicMetadata.Partitions;
74+
if (partitions == null || partitions.Count == 0)
75+
{
76+
logger.LogError("Could not load partition information about topic '{topic}'", topicName);
77+
continue;
78+
}
79+
topicPartitions.AddRange(partitions.Select(x => new TopicPartition(topicMetadata.Topic, new Partition(x.PartitionId))));
80+
}
81+
}
82+
catch (Exception ex)
83+
{
84+
logger.LogError(ex, "Failed to load partition information from topic '{topic}'", topicName);
85+
}
86+
}
87+
return topicPartitions;
6888
}
6989
catch (Exception ex)
7090
{
71-
logger.LogError(ex, "Failed to load partition information from topic '{topic}'", this.topicName);
91+
logger.LogError(ex, "Failed to load partition information from topics");
7292
}
7393

7494
return new List<TopicPartition>();
@@ -123,7 +143,7 @@ public Task<KafkaTriggerMetrics> GetMetricsAsync()
123143

124144
if (partitionWithHighestLag != Partition.Any)
125145
{
126-
logger.LogInformation("Total lag in '{topic}' is {totalLag}, highest partition lag found in {partition} with value of {offsetDifference}", this.topicName, totalLag, partitionWithHighestLag.Value, highestPartitionLag);
146+
logger.LogInformation("Total lag in '{topic}' is {totalLag}, highest partition lag found in {partition} with value of {offsetDifference}", string.Join(",", topicNames), totalLag, partitionWithHighestLag.Value, highestPartitionLag);
127147
}
128148

129149
return Task.FromResult(new KafkaTriggerMetrics(totalLag, allPartitions.Count));
@@ -167,8 +187,8 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] me
167187

168188
if (this.logger.IsEnabled(LogLevel.Information))
169189
{
170-
this.logger.LogInformation("WorkerCount ({workerCount}) > PartitionCount ({partitionCount}). For topic {topicName}, for consumer group {consumerGroup}.", workerCount, partitionCount, this.topicName, this.consumerGroup);
171-
this.logger.LogInformation("Number of instances ({workerCount}) is too high relative to number of partitions ({partitionCount}). For topic {topicName}, for consumer group {consumerGroup}.", workerCount, partitionCount, this.topicName, this.consumerGroup);
190+
this.logger.LogInformation("WorkerCount ({workerCount}) > PartitionCount ({partitionCount}). For topic {topicName}, for consumer group {consumerGroup}.", workerCount, partitionCount, string.Join(",", topicNames), this.consumerGroup);
191+
this.logger.LogInformation("Number of instances ({workerCount}) is too high relative to number of partitions ({partitionCount}). For topic {topicName}, for consumer group {consumerGroup}.", workerCount, partitionCount, string.Join(",", topicNames), this.consumerGroup);
172192
}
173193

174194
return status;
@@ -182,7 +202,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] me
182202
status.Vote = ScaleVote.ScaleIn;
183203
if (this.logger.IsEnabled(LogLevel.Information))
184204
{
185-
this.logger.LogInformation("Topic '{topicName}', for consumer group {consumerGroup}' is idle.", this.topicName, this.consumerGroup);
205+
this.logger.LogInformation("Topic '{topicName}', for consumer group {consumerGroup}' is idle.", string.Join(",", topicNames), this.consumerGroup);
186206
}
187207

188208
return status;
@@ -192,17 +212,17 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] me
192212
if (totalLag > workerCount * lagThreshold)
193213
{
194214
if (workerCount < partitionCount)
195-
{
215+
{
196216
status.Vote = ScaleVote.ScaleOut;
197217

198218
if (this.logger.IsEnabled(LogLevel.Information))
199219
{
200-
this.logger.LogInformation("Total lag ({totalLag}) is less than the number of instances ({workerCount}). Scale out, for topic {topicName}, for consumer group {consumerGroup}.", totalLag, workerCount, topicName, consumerGroup);
220+
this.logger.LogInformation("Total lag ({totalLag}) is less than the number of instances ({workerCount}). Scale out, for topic {topicName}, for consumer group {consumerGroup}.", totalLag, workerCount, string.Join(",", topicNames), consumerGroup);
201221
}
202222
}
203223
return status;
204224
}
205-
225+
206226
// Samples are in chronological order. Check for a continuous increase in unprocessed message count.
207227
// If detected, this results in an automatic scale out for the site container.
208228
if (metrics[0].TotalLag > 0)
@@ -220,13 +240,13 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] me
220240

221241
if (this.logger.IsEnabled(LogLevel.Information))
222242
{
223-
this.logger.LogInformation("Total lag ({totalLag}) is less than the number of instances ({workerCount}). Scale out, for topic {topicName}, for consumer group {consumerGroup}.", totalLag, workerCount, topicName, consumerGroup);
243+
this.logger.LogInformation("Total lag ({totalLag}) is less than the number of instances ({workerCount}). Scale out, for topic {topicName}, for consumer group {consumerGroup}.", totalLag, workerCount, string.Join(",", topicNames), consumerGroup);
224244
}
225245
return status;
226246
}
227247
}
228248
}
229-
249+
230250
if (workerCount > 1)
231251
{
232252
bool queueLengthDecreasing = IsTrueForLast(
@@ -246,12 +266,12 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] me
246266

247267
if (this.logger.IsEnabled(LogLevel.Information))
248268
{
249-
this.logger.LogInformation("Total lag length is decreasing for topic {topicName}, for consumer group {consumerGroup}.", this.topicName, this.consumerGroup);
250-
}
251-
}
269+
this.logger.LogInformation("Total lag length is decreasing for topic {topicName}, for consumer group {consumerGroup}.", string.Join(",", topicNames), this.consumerGroup);
270+
}
271+
}
252272
}
253273
}
254-
274+
255275
return status;
256276
}
257277

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaListenerConfiguration.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

44
using Confluent.Kafka;
5+
using System.Collections.Generic;
56

67
namespace Microsoft.Azure.WebJobs.Extensions.Kafka
78
{
@@ -65,10 +66,10 @@ public class KafkaListenerConfiguration
6566
public string EventHubConnectionString { get; set; }
6667

6768
/// <summary>
68-
/// Gets or sets the topic.
69+
/// Gets or sets the topics.
6970
/// </summary>
70-
/// <value>The topic.</value>
71-
public string Topic { get; set; }
71+
/// <value>The topics.</value>
72+
public IList<string> Topics { get; set; } = new List<string>();
7273

7374
/// <summary>
7475
/// Gets or sets the consumer group.

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttribute.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka
1616
public class KafkaTriggerAttribute : Attribute
1717
{
1818

19-
public KafkaTriggerAttribute(string brokerList, string topic)
19+
public KafkaTriggerAttribute(string brokerList, params string[] topic)
2020
{
2121
this.BrokerList = brokerList;
22-
this.Topic = topic;
22+
this.Topics = topic;
2323
}
2424

2525
/// <summary>
2626
/// Gets or sets the topic
2727
/// </summary>
28-
public string Topic { get; private set; }
28+
public string[] Topics { get; private set; }
2929

3030
/// <summary>
3131
/// Gets or sets the broker list

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

44
using System;
5+
using System.Linq;
56
using System.Reflection;
67
using System.Threading.Tasks;
78
using Confluent.Kafka;
@@ -36,7 +37,7 @@ public KafkaTriggerAttributeBindingProvider(
3637
this.nameResolver = nameResolver;
3738
this.options = options;
3839
this.logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("Kafka"));
39-
}
40+
}
4041

4142
public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext context)
4243
{
@@ -50,7 +51,7 @@ public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext contex
5051
var consumerConfig = CreateConsumerConfiguration(attribute);
5152

5253
var keyAndValueTypes = SerializationHelper.GetKeyAndValueTypes(attribute.AvroSchema, parameter.ParameterType, typeof(Ignore));
53-
var valueDeserializer = SerializationHelper.ResolveValueDeserializer(keyAndValueTypes.ValueType, keyAndValueTypes.AvroSchema);
54+
var valueDeserializer = SerializationHelper.ResolveValueDeserializer(keyAndValueTypes.ValueType, keyAndValueTypes.AvroSchema);
5455

5556
var binding = CreateBindingStrategyFor(keyAndValueTypes.KeyType ?? typeof(Ignore), keyAndValueTypes.ValueType, keyAndValueTypes.RequiresKey, valueDeserializer, parameter, consumerConfig);
5657

@@ -77,7 +78,7 @@ Task<IListener> listenerCreator(ListenerFactoryContext factoryContext, bool sing
7778
valueDeserializer,
7879
this.logger,
7980
factoryContext.Descriptor.Id);
80-
81+
8182
return Task.FromResult<IListener>(listener);
8283
}
8384

@@ -90,11 +91,11 @@ private KafkaListenerConfiguration CreateConsumerConfiguration(KafkaTriggerAttri
9091
{
9192
BrokerList = this.config.ResolveSecureSetting(nameResolver, attribute.BrokerList),
9293
ConsumerGroup = this.config.ResolveSecureSetting(nameResolver, attribute.ConsumerGroup),
93-
Topic = this.config.ResolveSecureSetting(nameResolver, attribute.Topic),
94+
Topics = attribute.Topics.Select(x => this.config.ResolveSecureSetting(nameResolver, x)).ToList(),
9495
EventHubConnectionString = this.config.ResolveSecureSetting(nameResolver, attribute.EventHubConnectionString),
9596
};
9697

97-
if (attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet ||
98+
if (attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet ||
9899
attribute.Protocol != BrokerProtocol.NotSet)
99100
{
100101
consumerConfig.SaslPassword = this.config.ResolveSecureSetting(nameResolver, attribute.Password);

0 commit comments

Comments
 (0)