@@ -44,7 +44,7 @@ internal class KafkaListener<TKey, TValue> : IListener, IScaleMonitorProvider
         private CancellationTokenSource cancellationTokenSource;
         private SemaphoreSlim subscriberFinished;
         private readonly string consumerGroup;
-        private readonly string topicName;
+        private readonly IList<string> topicList;
         private readonly string functionId;
         //protected for the unit test
         protected Lazy<KafkaTopicScaler<TKey, TValue>> topicScaler;
@@ -74,7 +74,7 @@ public KafkaListener(
             this.logger = logger;
             this.cancellationTokenSource = new CancellationTokenSource();
             this.consumerGroup = string.IsNullOrEmpty(this.listenerConfiguration.ConsumerGroup) ? "$Default" : this.listenerConfiguration.ConsumerGroup;
-            this.topicName = this.listenerConfiguration.Topic;
+            this.topicList = this.listenerConfiguration.Topics;
             this.functionId = functionId;
             this.consumer = new Lazy<IConsumer<TKey, TValue>>(() => CreateConsumer());
             this.topicScaler = new Lazy<KafkaTopicScaler<TKey, TValue>>(() => CreateTopicScaler());
@@ -114,7 +114,7 @@ private IConsumer<TKey, TValue> CreateConsumer()

         private KafkaTopicScaler<TKey, TValue> CreateTopicScaler()
         {
-            return new KafkaTopicScaler<TKey, TValue>(this.listenerConfiguration.Topic, this.consumerGroup, this.functionId, this.consumer.Value, new AdminClientConfig(GetConsumerConfiguration()), this.logger);
+            return new KafkaTopicScaler<TKey, TValue>(topicList.ToList(), this.consumerGroup, this.functionId, this.consumer.Value, new AdminClientBuilder(new AdminClientConfig(GetConsumerConfiguration())), this.logger);
         }

         public void Cancel()
@@ -131,7 +131,8 @@ public Task StartAsync(CancellationToken cancellationToken)
                 (FunctionExecutorBase<TKey, TValue>)new SingleItemFunctionExecutor<TKey, TValue>(executor, localConsumer, this.options.ExecutorChannelCapacity, this.options.ChannelFullRetryIntervalInMs, commitStrategy, logger) :
                 new MultipleItemFunctionExecutor<TKey, TValue>(executor, localConsumer, this.options.ExecutorChannelCapacity, this.options.ChannelFullRetryIntervalInMs, commitStrategy, logger);

-            localConsumer.Subscribe(this.listenerConfiguration.Topic);
+            localConsumer.Subscribe(this.listenerConfiguration.Topics);
+
             // Using a thread as opposed to a task since this will be long running
             var thread = new Thread(ProcessSubscription)
             {
@@ -325,7 +326,7 @@ private void ProcessSubscription(object parameter)
             }
             finally
             {
-                this.logger.LogInformation("Exiting {processName} for {topic}", nameof(ProcessSubscription), this.listenerConfiguration.Topic);
+                this.logger.LogInformation("Exiting {processName} for {topic}", nameof(ProcessSubscription), this.listenerConfiguration.Topics);
                 this.subscriberFinished.Release();
             }
         }
@@ -382,7 +383,7 @@ protected virtual void Dispose(bool disposing)
         {
             if (!disposed)
             {
-                this.logger.LogInformation("Disposing Kafka Listener for {topic}", this.listenerConfiguration.Topic);
+                this.logger.LogInformation("Disposing Kafka Listener for {topic}", this.listenerConfiguration.Topics);

                 if (disposing)
                 {
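Note on the one-line listener change above: Confluent.Kafka's IConsumer.Subscribe has an overload that takes a collection of topic names, so switching the configuration from a single Topic to a Topics list is all the consumer needs to receive a partition assignment spanning every listed topic. A minimal standalone sketch, not part of this diff; broker address, group, and topic names are placeholders:

using System;
using Confluent.Kafka;

class MultiTopicConsumerSketch
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",       // placeholder broker
            GroupId = "$Default",                      // same default group the listener falls back to
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();

        // One consumer can subscribe to several topics at once; the group
        // coordinator assigns it partitions across all of them.
        consumer.Subscribe(new[] { "orders", "payments" });

        var result = consumer.Consume(TimeSpan.FromSeconds(5));
        if (result != null)
        {
            Console.WriteLine($"{result.Topic} [{result.Partition}] @{result.Offset}: {result.Message.Value}");
        }

        consumer.Close();
    }
}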
@@ -5,40 +5,37 @@
 using Microsoft.Azure.WebJobs.Host.Scale;
 using Microsoft.Extensions.Logging;
 using System;
 using System.Collections;
 using System.Collections.Generic;
 using System.Linq;
+using System.Text.RegularExpressions;
 using System.Threading.Tasks;

 namespace Microsoft.Azure.WebJobs.Extensions.Kafka
 {
     public class KafkaTopicScaler<TKey, TValue> : IScaleMonitor<KafkaTriggerMetrics>
     {
-        private readonly string topicName;
         private readonly string consumerGroup;
         private readonly ILogger logger;
-        private readonly AdminClientConfig adminClientConfig;
+        private readonly AdminClientBuilder adminClientBuilder;
         private readonly IConsumer<TKey, TValue> consumer;
+        private readonly IReadOnlyList<string> topicNames;
        private readonly Lazy<List<TopicPartition>> topicPartitions;

        public ScaleMonitorDescriptor Descriptor { get; }

-        public KafkaTopicScaler(string topic, string consumerGroup, string functionId, IConsumer<TKey, TValue> consumer, AdminClientConfig adminClientConfig, ILogger logger)
+        public KafkaTopicScaler(IReadOnlyList<string> topics, string consumerGroup, string functionId, IConsumer<TKey, TValue> consumer, AdminClientBuilder adminClientBuilder, ILogger logger)
         {
-            if (string.IsNullOrWhiteSpace(topic))
-            {
-                throw new ArgumentException("Invalid topic", nameof(topic));
-            }

             if (string.IsNullOrWhiteSpace(consumerGroup))
             {
                 throw new ArgumentException("Invalid consumer group", nameof(consumerGroup));
             }

             this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
-            this.adminClientConfig = adminClientConfig ?? throw new ArgumentNullException(nameof(adminClientConfig));
+            this.adminClientBuilder = adminClientBuilder ?? throw new ArgumentNullException(nameof(adminClientBuilder));
             this.consumer = consumer ?? throw new ArgumentNullException(nameof(consumer));
-            this.topicName = topic;
-            this.Descriptor = new ScaleMonitorDescriptor($"{functionId}-kafkatrigger-{topicName}-{consumerGroup}".ToLower());
+            this.topicNames = topics ?? throw new ArgumentNullException(nameof(topics));
+            this.Descriptor = new ScaleMonitorDescriptor($"{functionId}-kafkatrigger-{string.Join("-", topics)}-{consumerGroup}".ToLower());
             this.topicPartitions = new Lazy<List<TopicPartition>>(LoadTopicPartitions);
             this.consumerGroup = consumerGroup;
         }
@@ -48,27 +48,50 @@ protected virtual List<TopicPartition> LoadTopicPartitions()
             try
             {
                 var timeout = TimeSpan.FromSeconds(5);
-                using var adminClient = new AdminClientBuilder(adminClientConfig).Build();
-                var metadata = adminClient.GetMetadata(this.topicName, timeout);
-                if (metadata.Topics == null || metadata.Topics.Count == 0)
+                using var adminClient = adminClientBuilder.Build();
+                var topicPartitions = new List<TopicPartition>();
+                foreach (var topicName in topicNames)
                 {
-                    logger.LogError("Could not load metadata information about topic '{topic}'", this.topicName);
-                    return new List<TopicPartition>();
-                }
+                    try
+                    {
+                        List<TopicMetadata> topics;
+                        if (topicName.StartsWith("^"))
+                        {
+                            var metadata = adminClient.GetMetadata(timeout);
+                            topics = metadata.Topics?.Where(x => Regex.IsMatch(x.Topic, topicName)).ToList();
+                        }
+                        else
+                        {
+                            topics = adminClient.GetMetadata(topicName, timeout).Topics;
+                        }

-                var topicMetadata = metadata.Topics[0];
-                var partitions = topicMetadata.Partitions;
-                if (partitions == null || partitions.Count == 0)
-                {
-                    logger.LogError("Could not load partition information about topic '{topic}'", this.topicName);
-                    return new List<TopicPartition>();
-                }
+                        if (topics == null || topics.Count == 0)
+                        {
+                            logger.LogError("Could not load metadata information about topic '{topic}'", topicName);
+                            continue;
+                        }

-                return partitions.Select(x => new TopicPartition(topicMetadata.Topic, new Partition(x.PartitionId))).ToList();
+                        foreach (var topicMetadata in topics)
+                        {
+                            var partitions = topicMetadata.Partitions;
+                            if (partitions == null || partitions.Count == 0)
+                            {
+                                logger.LogError("Could not load partition information about topic '{topic}'", topicName);
+                                continue;
+                            }
+                            topicPartitions.AddRange(partitions.Select(x => new TopicPartition(topicMetadata.Topic, new Partition(x.PartitionId))));
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        logger.LogError(ex, "Failed to load partition information from topic '{topic}'", topicName);
+                    }
+                }
+                return topicPartitions;
             }
             catch (Exception ex)
             {
-                logger.LogError(ex, "Failed to load partition information from topic '{topic}'", this.topicName);
+                logger.LogError(ex, "Failed to load partition information from topics");
             }

             return new List<TopicPartition>();
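The StartsWith("^") branch above follows librdkafka's convention that a subscription beginning with ^ is a regular-expression pattern rather than a literal topic name, which is why the scaler pulls the full cluster metadata and filters it in that case. A small self-contained illustration of the matching step, with topic names invented for the example:

using System;
using System.Linq;
using System.Text.RegularExpressions;

class TopicPatternSketch
{
    static void Main()
    {
        // Stand-in for metadata.Topics: every topic the cluster knows about.
        var allTopics = new[] { "orders-eu", "orders-us", "payments" };

        const string subscription = "^orders-.*";

        // Same decision the scaler makes: a leading '^' means regex matching,
        // anything else is treated as a literal topic name.
        var matched = subscription.StartsWith("^")
            ? allTopics.Where(t => Regex.IsMatch(t, subscription)).ToList()
            : allTopics.Where(t => t == subscription).ToList();

        Console.WriteLine(string.Join(", ", matched)); // orders-eu, orders-us
    }
}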
@@ -123,7 +143,7 @@ public Task<KafkaTriggerMetrics> GetMetricsAsync()

             if (partitionWithHighestLag != Partition.Any)
             {
-                logger.LogInformation("Total lag in '{topic}' is {totalLag}, highest partition lag found in {partition} with value of {offsetDifference}", this.topicName, totalLag, partitionWithHighestLag.Value, highestPartitionLag);
+                logger.LogInformation("Total lag in '{topic}' is {totalLag}, highest partition lag found in {partition} with value of {offsetDifference}", string.Join(",", topicNames), totalLag, partitionWithHighestLag.Value, highestPartitionLag);
             }

             return Task.FromResult(new KafkaTriggerMetrics(totalLag, allPartitions.Count));
@@ -167,8 +187,8 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] metrics)

             if (this.logger.IsEnabled(LogLevel.Information))
             {
-                this.logger.LogInformation("WorkerCount ({workerCount}) > PartitionCount ({partitionCount}). For topic {topicName}, for consumer group {consumerGroup}.", workerCount, partitionCount, this.topicName, this.consumerGroup);
-                this.logger.LogInformation("Number of instances ({workerCount}) is too high relative to number of partitions ({partitionCount}). For topic {topicName}, for consumer group {consumerGroup}.", workerCount, partitionCount, this.topicName, this.consumerGroup);
+                this.logger.LogInformation("WorkerCount ({workerCount}) > PartitionCount ({partitionCount}). For topic {topicName}, for consumer group {consumerGroup}.", workerCount, partitionCount, string.Join(",", topicNames), this.consumerGroup);
+                this.logger.LogInformation("Number of instances ({workerCount}) is too high relative to number of partitions ({partitionCount}). For topic {topicName}, for consumer group {consumerGroup}.", workerCount, partitionCount, string.Join(",", topicNames), this.consumerGroup);
             }

             return status;
@@ -182,7 +202,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] metrics)
                 status.Vote = ScaleVote.ScaleIn;
                 if (this.logger.IsEnabled(LogLevel.Information))
                 {
-                    this.logger.LogInformation("Topic '{topicName}', for consumer group {consumerGroup}' is idle.", this.topicName, this.consumerGroup);
+                    this.logger.LogInformation("Topic '{topicName}', for consumer group {consumerGroup}' is idle.", string.Join(",", topicNames), this.consumerGroup);
                 }

                 return status;
@@ -192,17 +212,17 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] metrics)
             if (totalLag > workerCount * lagThreshold)
             {
                 if (workerCount < partitionCount)
                 {
                     status.Vote = ScaleVote.ScaleOut;

                     if (this.logger.IsEnabled(LogLevel.Information))
                     {
-                        this.logger.LogInformation("Total lag ({totalLag}) is less than the number of instances ({workerCount}). Scale out, for topic {topicName}, for consumer group {consumerGroup}.", totalLag, workerCount, topicName, consumerGroup);
+                        this.logger.LogInformation("Total lag ({totalLag}) is less than the number of instances ({workerCount}). Scale out, for topic {topicName}, for consumer group {consumerGroup}.", totalLag, workerCount, string.Join(",", topicNames), consumerGroup);
                     }
                 }
                 return status;
             }

             // Samples are in chronological order. Check for a continuous increase in unprocessed message count.
             // If detected, this results in an automatic scale out for the site container.
             if (metrics[0].TotalLag > 0)
@@ -220,13 +240,13 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] metrics)

                             if (this.logger.IsEnabled(LogLevel.Information))
                             {
-                                this.logger.LogInformation("Total lag ({totalLag}) is less than the number of instances ({workerCount}). Scale out, for topic {topicName}, for consumer group {consumerGroup}.", totalLag, workerCount, topicName, consumerGroup);
+                                this.logger.LogInformation("Total lag ({totalLag}) is less than the number of instances ({workerCount}). Scale out, for topic {topicName}, for consumer group {consumerGroup}.", totalLag, workerCount, string.Join(",", topicNames), consumerGroup);
                             }
                             return status;
                         }
                     }
                 }

                 if (workerCount > 1)
                 {
                     bool queueLengthDecreasing = IsTrueForLast(
@@ -246,12 +266,12 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] metrics)

                     if (this.logger.IsEnabled(LogLevel.Information))
                     {
-                        this.logger.LogInformation("Total lag length is decreasing for topic {topicName}, for consumer group {consumerGroup}.", this.topicName, this.consumerGroup);
+                        this.logger.LogInformation("Total lag length is decreasing for topic {topicName}, for consumer group {consumerGroup}.", string.Join(",", topicNames), this.consumerGroup);
                     }
                 }
             }
         }

         return status;
     }
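For reviewers reading the GetScaleStatusCore hunks: the voting rule itself is unchanged here, only the log lines now join the topic names. Roughly, zero total lag votes ScaleIn, and lag above workerCount * lagThreshold votes ScaleOut as long as there are more partitions than workers, since one consumer per partition is Kafka's parallelism ceiling. A toy rendering of that rule, not the extension's actual implementation; the 1000 default for lagThreshold is an assumption here:

// Toy version of the scale vote shown in the hunks above.
static class ScaleVoteSketch
{
    public static string Vote(long totalLag, int workerCount, int partitionCount, long lagThreshold = 1000)
    {
        if (totalLag == 0)
            return "ScaleIn";                        // idle topics, shed a worker

        if (totalLag > workerCount * lagThreshold)
            return workerCount < partitionCount
                ? "ScaleOut"                         // lag is high and spare partitions remain
                : "None";                            // already one worker per partition

        return "None";                               // within threshold, hold steady
    }
}

// ScaleVoteSketch.Vote(totalLag: 5000, workerCount: 2, partitionCount: 8) -> "ScaleOut"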
@@ -2,6 +2,7 @@
 // Licensed under the MIT License. See License.txt in the project root for license information.

 using Confluent.Kafka;
+using System.Collections.Generic;

 namespace Microsoft.Azure.WebJobs.Extensions.Kafka
 {
@@ -65,10 +66,10 @@ public class KafkaListenerConfiguration
         public string EventHubConnectionString { get; set; }

         /// <summary>
-        /// Gets or sets the topic.
+        /// Gets or sets the topics.
         /// </summary>
-        /// <value>The topic.</value>
-        public string Topic { get; set; }
+        /// <value>The topics.</value>
+        public IList<string> Topics { get; set; } = new List<string>();

         /// <summary>
         /// Gets or sets the consumer group.
@@ -16,16 +16,16 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka
     public class KafkaTriggerAttribute : Attribute
     {

-        public KafkaTriggerAttribute(string brokerList, string topic)
+        public KafkaTriggerAttribute(string brokerList, params string[] topic)
         {
             this.BrokerList = brokerList;
-            this.Topic = topic;
+            this.Topics = topic;
         }

         /// <summary>
         /// Gets or sets the topic
         /// </summary>
-        public string Topic { get; private set; }
+        public string[] Topics { get; private set; }

         /// <summary>
         /// Gets or sets the broker list
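Because the constructor now takes params string[], existing single-topic bindings compile unchanged, while new ones can list several topics or a ^-prefixed pattern per the scaler change above. A hypothetical function using the new shape; the function name, topic names, and the KafkaEventData payload type are illustrative, not taken from this diff:

public static class MultiTopicFunction
{
    [FunctionName("ProcessOrdersAndPayments")]
    public static void Run(
        [KafkaTrigger("BrokerList", "orders", "payments", ConsumerGroup = "$Default")]
        KafkaEventData<string> kafkaEvent,
        ILogger logger)
    {
        // kafkaEvent.Topic tells the handler which of the subscribed topics fired.
        logger.LogInformation("Received from {topic}: {value}", kafkaEvent.Topic, kafkaEvent.Value);
    }
}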
@@ -2,6 +2,7 @@
 // Licensed under the MIT License. See License.txt in the project root for license information.

 using System;
+using System.Linq;
 using System.Reflection;
 using System.Threading.Tasks;
 using Confluent.Kafka;
@@ -36,7 +37,7 @@ public KafkaTriggerAttributeBindingProvider(
             this.nameResolver = nameResolver;
             this.options = options;
             this.logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("Kafka"));
         }

         public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext context)
         {
@@ -50,7 +51,7 @@ public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext context)
             var consumerConfig = CreateConsumerConfiguration(attribute);

             var keyAndValueTypes = SerializationHelper.GetKeyAndValueTypes(attribute.AvroSchema, parameter.ParameterType, typeof(Ignore));
             var valueDeserializer = SerializationHelper.ResolveValueDeserializer(keyAndValueTypes.ValueType, keyAndValueTypes.AvroSchema);

             var binding = CreateBindingStrategyFor(keyAndValueTypes.KeyType ?? typeof(Ignore), keyAndValueTypes.ValueType, keyAndValueTypes.RequiresKey, valueDeserializer, parameter, consumerConfig);
@@ -77,7 +78,7 @@ Task<IListener> listenerCreator(ListenerFactoryContext factoryContext, bool singleDispatch)
                 valueDeserializer,
                 this.logger,
                 factoryContext.Descriptor.Id);

             return Task.FromResult<IListener>(listener);
         }
@@ -90,11 +91,11 @@ private KafkaListenerConfiguration CreateConsumerConfiguration(KafkaTriggerAttribute attribute)
             {
                 BrokerList = this.config.ResolveSecureSetting(nameResolver, attribute.BrokerList),
                 ConsumerGroup = this.config.ResolveSecureSetting(nameResolver, attribute.ConsumerGroup),
-                Topic = this.config.ResolveSecureSetting(nameResolver, attribute.Topic),
+                Topics = attribute.Topics.Select(x => this.config.ResolveSecureSetting(nameResolver, x)).ToList(),
                 EventHubConnectionString = this.config.ResolveSecureSetting(nameResolver, attribute.EventHubConnectionString),
             };

             if (attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet ||
                 attribute.Protocol != BrokerProtocol.NotSet)
             {
                 consumerConfig.SaslPassword = this.config.ResolveSecureSetting(nameResolver, attribute.Password);
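The Topics mapping above runs each entry through ResolveSecureSetting individually, so a binding can mix literal topic names with %appSetting% references. A hypothetical stand-in showing the intended behavior; the dictionary plays the role of the function app's settings, and ResolveSecureSetting itself is not reproduced here:

using System;
using System.Collections.Generic;
using System.Linq;

static class SettingResolutionSketch
{
    // Plays the role of the function app's configuration store.
    static readonly Dictionary<string, string> AppSettings = new Dictionary<string, string>
    {
        ["OrdersTopic"] = "orders-prod",
        ["PaymentsTopic"] = "payments-prod",
    };

    // Mimics the Select(...) above: each %setting% entry resolves to its
    // configured value, literals pass through untouched.
    public static IList<string> Resolve(IEnumerable<string> topics) =>
        topics.Select(t => t.StartsWith("%") && t.EndsWith("%")
            ? AppSettings[t.Trim('%')]
            : t).ToList();

    static void Main()
    {
        var resolved = Resolve(new[] { "%OrdersTopic%", "audit-log" });
        Console.WriteLine(string.Join(", ", resolved)); // orders-prod, audit-log
    }
}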