diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs
index 3511fd36..82d47d93 100644
--- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs
@@ -44,7 +44,7 @@ internal class KafkaListener<TKey, TValue> : IListener, IScaleMonitorProvider
         private CancellationTokenSource cancellationTokenSource;
         private SemaphoreSlim subscriberFinished;
         private readonly string consumerGroup;
-        private readonly string topicName;
+        private readonly IList<string> topicList;
         private readonly string functionId;
         //protected for the unit test
         protected Lazy<KafkaTopicScaler<TKey, TValue>> topicScaler;
@@ -74,7 +74,7 @@ public KafkaListener(
            this.logger = logger;
            this.cancellationTokenSource = new CancellationTokenSource();
            this.consumerGroup = string.IsNullOrEmpty(this.listenerConfiguration.ConsumerGroup) ? "$Default" : this.listenerConfiguration.ConsumerGroup;
-           this.topicName = this.listenerConfiguration.Topic;
+           this.topicList = this.listenerConfiguration.Topics;
            this.functionId = functionId;
            this.consumer = new Lazy<IConsumer<TKey, TValue>>(() => CreateConsumer());
            this.topicScaler = new Lazy<KafkaTopicScaler<TKey, TValue>>(() => CreateTopicScaler());
@@ -114,7 +114,7 @@ private IConsumer<TKey, TValue> CreateConsumer()

        private KafkaTopicScaler<TKey, TValue> CreateTopicScaler()
        {
-           return new KafkaTopicScaler<TKey, TValue>(this.listenerConfiguration.Topic, this.consumerGroup, this.functionId, this.consumer.Value, new AdminClientConfig(GetConsumerConfiguration()), this.logger);
+           return new KafkaTopicScaler<TKey, TValue>(topicList.ToList(), this.consumerGroup, this.functionId, this.consumer.Value, new AdminClientBuilder(new AdminClientConfig(GetConsumerConfiguration())), this.logger);
        }

        public void Cancel()
@@ -131,7 +131,8 @@ public Task StartAsync(CancellationToken cancellationToken)
                (FunctionExecutorBase<TKey, TValue>)new SingleItemFunctionExecutor<TKey, TValue>(executor, localConsumer, this.options.ExecutorChannelCapacity, this.options.ChannelFullRetryIntervalInMs, commitStrategy, logger) :
                new MultipleItemFunctionExecutor<TKey, TValue>(executor, localConsumer, this.options.ExecutorChannelCapacity, this.options.ChannelFullRetryIntervalInMs, commitStrategy, logger);

-           localConsumer.Subscribe(this.listenerConfiguration.Topic);
+           localConsumer.Subscribe(this.listenerConfiguration.Topics);
+
            // Using a thread as opposed to a task since this will be long running
            var thread = new Thread(ProcessSubscription)
            {
@@ -325,7 +326,7 @@ private void ProcessSubscription(object parameter)
            }
            finally
            {
-               this.logger.LogInformation("Exiting {processName} for {topic}", nameof(ProcessSubscription), this.listenerConfiguration.Topic);
+               this.logger.LogInformation("Exiting {processName} for {topic}", nameof(ProcessSubscription), this.listenerConfiguration.Topics);
                this.subscriberFinished.Release();
            }
        }
@@ -382,7 +383,7 @@ protected virtual void Dispose(bool disposing)
        {
            if (!disposed)
            {
-               this.logger.LogInformation("Disposing Kafka Listener for {topic}", this.listenerConfiguration.Topic);
+               this.logger.LogInformation("Disposing Kafka Listener for {topic}", this.listenerConfiguration.Topics);

                if (disposing)
                {
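Note: the listener change above works because Confluent.Kafka's `IConsumer<TKey, TValue>` exposes a `Subscribe(IEnumerable<string>)` overload alongside `Subscribe(string)`. A minimal sketch (topic names are illustrative) — entries prefixed with `^` are treated as regex subscriptions by librdkafka:

```csharp
// Illustrative only: the configured list can mix literal topic names with
// librdkafka regex subscriptions (entries starting with '^').
consumer.Subscribe(new[] { "orders", "payments", "^audit-.*" });
```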
diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs
index 84887df3..a7d6a1ba 100644
--- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs
@@ -5,40 +5,37 @@
 using Microsoft.Azure.WebJobs.Host.Scale;
 using Microsoft.Extensions.Logging;
 using System;
+using System.Collections;
 using System.Collections.Generic;
 using System.Linq;
+using System.Text.RegularExpressions;
 using System.Threading.Tasks;

 namespace Microsoft.Azure.WebJobs.Extensions.Kafka
 {
     public class KafkaTopicScaler<TKey, TValue> : IScaleMonitor<KafkaTriggerMetrics>
     {
-        private readonly string topicName;
         private readonly string consumerGroup;
         private readonly ILogger logger;
-        private readonly AdminClientConfig adminClientConfig;
+        private readonly AdminClientBuilder adminClientBuilder;
         private readonly IConsumer<TKey, TValue> consumer;
+        private readonly IReadOnlyList<string> topicNames;
        private readonly Lazy<List<TopicPartition>> topicPartitions;

        public ScaleMonitorDescriptor Descriptor { get; }

-        public KafkaTopicScaler(string topic, string consumerGroup, string functionId, IConsumer<TKey, TValue> consumer, AdminClientConfig adminClientConfig, ILogger logger)
+        public KafkaTopicScaler(IReadOnlyList<string> topics, string consumerGroup, string functionId, IConsumer<TKey, TValue> consumer, AdminClientBuilder adminClientBuilder, ILogger logger)
        {
-            if (string.IsNullOrWhiteSpace(topic))
-            {
-                throw new ArgumentException("Invalid topic", nameof(topic));
-            }
-
            if (string.IsNullOrWhiteSpace(consumerGroup))
            {
                throw new ArgumentException("Invalid consumer group", nameof(consumerGroup));
            }

            this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
-            this.adminClientConfig = adminClientConfig ?? throw new ArgumentNullException(nameof(adminClientConfig));
+            this.adminClientBuilder = adminClientBuilder ?? throw new ArgumentNullException(nameof(adminClientBuilder));
            this.consumer = consumer ?? throw new ArgumentNullException(nameof(consumer));
-            this.topicName = topic;
-            this.Descriptor = new ScaleMonitorDescriptor($"{functionId}-kafkatrigger-{topicName}-{consumerGroup}".ToLower());
+            this.topicNames = topics ?? throw new ArgumentNullException(nameof(topics));
+            this.Descriptor = new ScaleMonitorDescriptor($"{functionId}-kafkatrigger-{string.Join("-", topics)}-{consumerGroup}".ToLower());
            this.topicPartitions = new Lazy<List<TopicPartition>>(LoadTopicPartitions);
            this.consumerGroup = consumerGroup;
        }
@@ -48,27 +45,50 @@ protected virtual List<TopicPartition> LoadTopicPartitions()
            try
            {
                var timeout = TimeSpan.FromSeconds(5);
-                using var adminClient = new AdminClientBuilder(adminClientConfig).Build();
-                var metadata = adminClient.GetMetadata(this.topicName, timeout);
-                if (metadata.Topics == null || metadata.Topics.Count == 0)
+                using var adminClient = adminClientBuilder.Build();
+                var topicPartitions = new List<TopicPartition>();
+                foreach (var topicName in topicNames)
                {
-                    logger.LogError("Could not load metadata information about topic '{topic}'", this.topicName);
-                    return new List<TopicPartition>();
-                }
+                    try
+                    {
+                        List<TopicMetadata> topics;
+                        if (topicName.StartsWith("^"))
+                        {
+                            var metadata = adminClient.GetMetadata(timeout);
+                            topics = metadata.Topics?.Where(x => Regex.IsMatch(x.Topic, topicName)).ToList();
+                        }
+                        else
+                        {
+                            topics = adminClient.GetMetadata(topicName, timeout).Topics;
+                        }

-                var topicMetadata = metadata.Topics[0];
-                var partitions = topicMetadata.Partitions;
-                if (partitions == null || partitions.Count == 0)
-                {
-                    logger.LogError("Could not load partition information about topic '{topic}'", this.topicName);
-                    return new List<TopicPartition>();
-                }
+                        if (topics == null || topics.Count == 0)
+                        {
+                            logger.LogError("Could not load metadata information about topic '{topic}'", topicName);
+                            continue;
+                        }

-                return partitions.Select(x => new TopicPartition(topicMetadata.Topic, new Partition(x.PartitionId))).ToList();
+                        foreach (var topicMetadata in topics)
+                        {
+                            var partitions = topicMetadata.Partitions;
+                            if (partitions == null || partitions.Count == 0)
+                            {
+                                logger.LogError("Could not load partition information about topic '{topic}'", topicName);
+                                continue;
+                            }
+                            topicPartitions.AddRange(partitions.Select(x => new TopicPartition(topicMetadata.Topic, new Partition(x.PartitionId))));
+                        }
+                    }
+                    catch (Exception ex)
+                    {
+                        logger.LogError(ex, "Failed to load partition information from topic '{topic}'", topicName);
+                    }
+                }
+                return topicPartitions;
            }
            catch (Exception ex)
            {
-                logger.LogError(ex, "Failed to load partition information from topic '{topic}'", this.topicName);
+                logger.LogError(ex, "Failed to load partition information from topics");
            }

            return new List<TopicPartition>();
@@ -123,7 +143,7 @@ public Task<KafkaTriggerMetrics> GetMetricsAsync()

            if (partitionWithHighestLag != Partition.Any)
            {
-                logger.LogInformation("Total lag in '{topic}' is {totalLag}, highest partition lag found in {partition} with value of {offsetDifference}", this.topicName, totalLag, partitionWithHighestLag.Value, highestPartitionLag);
+                logger.LogInformation("Total lag in '{topic}' is {totalLag}, highest partition lag found in {partition} with value of {offsetDifference}", string.Join(",", topicNames), totalLag, partitionWithHighestLag.Value, highestPartitionLag);
            }

            return Task.FromResult(new KafkaTriggerMetrics(totalLag, allPartitions.Count));
@@ -167,8 +187,8 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] me

                if (this.logger.IsEnabled(LogLevel.Information))
                {
-                    this.logger.LogInformation("WorkerCount ({workerCount}) > PartitionCount ({partitionCount}). For topic {topicName}, for consumer group {consumerGroup}.", workerCount, partitionCount, this.topicName, this.consumerGroup);
-                    this.logger.LogInformation("Number of instances ({workerCount}) is too high relative to number of partitions ({partitionCount}). For topic {topicName}, for consumer group {consumerGroup}.", workerCount, partitionCount, this.topicName, this.consumerGroup);
+                    this.logger.LogInformation("WorkerCount ({workerCount}) > PartitionCount ({partitionCount}). For topic {topicName}, for consumer group {consumerGroup}.", workerCount, partitionCount, string.Join(",", topicNames), this.consumerGroup);
+                    this.logger.LogInformation("Number of instances ({workerCount}) is too high relative to number of partitions ({partitionCount}). For topic {topicName}, for consumer group {consumerGroup}.", workerCount, partitionCount, string.Join(",", topicNames), this.consumerGroup);
                }

                return status;
@@ -182,7 +202,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] me
                status.Vote = ScaleVote.ScaleIn;
                if (this.logger.IsEnabled(LogLevel.Information))
                {
-                    this.logger.LogInformation("Topic '{topicName}', for consumer group {consumerGroup}' is idle.", this.topicName, this.consumerGroup);
+                    this.logger.LogInformation("Topic '{topicName}', for consumer group {consumerGroup}' is idle.", string.Join(",", topicNames), this.consumerGroup);
                }

                return status;
@@ -192,17 +212,17 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] me
            if (totalLag > workerCount * lagThreshold)
            {
                if (workerCount < partitionCount)
-                {
+                {
                    status.Vote = ScaleVote.ScaleOut;
                    if (this.logger.IsEnabled(LogLevel.Information))
                    {
-                        this.logger.LogInformation("Total lag ({totalLag}) is less than the number of instances ({workerCount}). Scale out, for topic {topicName}, for consumer group {consumerGroup}.", totalLag, workerCount, topicName, consumerGroup);
+                        this.logger.LogInformation("Total lag ({totalLag}) is less than the number of instances ({workerCount}). Scale out, for topic {topicName}, for consumer group {consumerGroup}.", totalLag, workerCount, string.Join(",", topicNames), consumerGroup);
                    }
                }

                return status;
            }
-
+
            // Samples are in chronological order. Check for a continuous increase in unprocessed message count.
            // If detected, this results in an automatic scale out for the site container.
            if (metrics[0].TotalLag > 0)
@@ -220,13 +240,13 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] me

                    if (this.logger.IsEnabled(LogLevel.Information))
                    {
-                        this.logger.LogInformation("Total lag ({totalLag}) is less than the number of instances ({workerCount}). Scale out, for topic {topicName}, for consumer group {consumerGroup}.", totalLag, workerCount, topicName, consumerGroup);
+                        this.logger.LogInformation("Total lag ({totalLag}) is less than the number of instances ({workerCount}). Scale out, for topic {topicName}, for consumer group {consumerGroup}.", totalLag, workerCount, string.Join(",", topicNames), consumerGroup);
                    }

                    return status;
                }
            }
-
+
            if (workerCount > 1)
            {
                bool queueLengthDecreasing = IsTrueForLast(
@@ -246,12 +266,12 @@ private ScaleStatus GetScaleStatusCore(int workerCount, KafkaTriggerMetrics[] me

                if (this.logger.IsEnabled(LogLevel.Information))
                {
-                    this.logger.LogInformation("Total lag length is decreasing for topic {topicName}, for consumer group {consumerGroup}.", this.topicName, this.consumerGroup);
-                }
-            }
+                    this.logger.LogInformation("Total lag length is decreasing for topic {topicName}, for consumer group {consumerGroup}.", string.Join(",", topicNames), this.consumerGroup);
+                }
+            }
            }
-
+
            return status;
        }
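Note: the scale monitor descriptor now embeds every configured topic. A worked example of the expression in the constructor above, using the values from the updated unit test further down:

```csharp
// functionId = "testfunction", topics = { "topicTest", "^.+Test2" },
// consumerGroup = "consumer-group-test"
var id = $"{functionId}-kafkatrigger-{string.Join("-", topics)}-{consumerGroup}".ToLower();
// => "testfunction-kafkatrigger-topictest-^.+test2-consumer-group-test"
```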
diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaListenerConfiguration.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaListenerConfiguration.cs
index 63e69cfe..e54b6224 100644
--- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaListenerConfiguration.cs
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaListenerConfiguration.cs
@@ -2,6 +2,7 @@
 // Licensed under the MIT License. See License.txt in the project root for license information.

 using Confluent.Kafka;
+using System.Collections.Generic;

 namespace Microsoft.Azure.WebJobs.Extensions.Kafka
 {
@@ -65,10 +66,10 @@ public class KafkaListenerConfiguration
        public string EventHubConnectionString { get; set; }

        /// <summary>
-        /// Gets or sets the topic.
+        /// Gets or sets the topics.
        /// </summary>
-        /// <value>The topic.</value>
-        public string Topic { get; set; }
+        /// <value>The topics.</value>
+        public IList<string> Topics { get; set; } = new List<string>();

        /// <summary>
        /// Gets or sets the consumer group.
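Note: initializing `Topics` to an empty `List<string>` is what lets the updated unit tests below use collection-initializer syntax. A small sketch with placeholder values:

```csharp
var listenerConfig = new KafkaListenerConfiguration
{
    BrokerList = "testBroker",
    ConsumerGroup = "group1",
    Topics = { "topic-a", "topic-b" }, // compiles to Add() calls on the pre-created list
};
```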
diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttribute.cs
index e8cb85cf..b540fbce 100644
--- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttribute.cs
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttribute.cs
@@ -16,16 +16,16 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka
     public class KafkaTriggerAttribute : Attribute
     {
-        public KafkaTriggerAttribute(string brokerList, string topic)
+        public KafkaTriggerAttribute(string brokerList, params string[] topic)
        {
            this.BrokerList = brokerList;
-            this.Topic = topic;
+            this.Topics = topic;
        }

        /// <summary>
        /// Gets or sets the topic
        /// </summary>
-        public string Topic { get; private set; }
+        public string[] Topics { get; private set; }

        /// <summary>
        /// Gets or sets the broker list
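With the `params string[]` constructor, one trigger can now bind to several topics or to a regex pattern. A hypothetical example (broker setting, topic names, and consumer group are placeholders; the end-to-end trigger functions further down show the same usage against real test constants):

```csharp
public static class MultiTopicSample
{
    public static void Run(
        [KafkaTrigger("LocalBroker", "orders", "payments", ConsumerGroup = "sample-group")] string kafkaEvent,
        ILogger log)
    {
        log.LogInformation(kafkaEvent);
    }
}

// A single '^'-prefixed entry subscribes by regular expression instead:
// [KafkaTrigger("LocalBroker", "^orders-.*", ConsumerGroup = "sample-group")]
```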
diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs
index 22218234..228837bb 100644
--- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs
@@ -2,6 +2,7 @@
 // Licensed under the MIT License. See License.txt in the project root for license information.

 using System;
+using System.Linq;
 using System.Reflection;
 using System.Threading.Tasks;
 using Confluent.Kafka;
@@ -36,7 +37,7 @@ public KafkaTriggerAttributeBindingProvider(
            this.nameResolver = nameResolver;
            this.options = options;
            this.logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("Kafka"));
-        }
+        }

        public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext context)
        {
@@ -50,7 +51,7 @@ public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContex
            var consumerConfig = CreateConsumerConfiguration(attribute);

            var keyAndValueTypes = SerializationHelper.GetKeyAndValueTypes(attribute.AvroSchema, parameter.ParameterType, typeof(Ignore));
-            var valueDeserializer = SerializationHelper.ResolveValueDeserializer(keyAndValueTypes.ValueType, keyAndValueTypes.AvroSchema);
+            var valueDeserializer = SerializationHelper.ResolveValueDeserializer(keyAndValueTypes.ValueType, keyAndValueTypes.AvroSchema);

            var binding = CreateBindingStrategyFor(keyAndValueTypes.KeyType ?? typeof(Ignore), keyAndValueTypes.ValueType, keyAndValueTypes.RequiresKey, valueDeserializer, parameter, consumerConfig);
@@ -77,7 +78,7 @@ Task<IListener> listenerCreator(ListenerFactoryContext factoryContext, bool sing
                    valueDeserializer,
                    this.logger,
                    factoryContext.Descriptor.Id);
-
+
                return Task.FromResult(listener);
            }
@@ -90,11 +91,11 @@ private KafkaListenerConfiguration CreateConsumerConfiguration(KafkaTriggerAttri
            {
                BrokerList = this.config.ResolveSecureSetting(nameResolver, attribute.BrokerList),
                ConsumerGroup = this.config.ResolveSecureSetting(nameResolver, attribute.ConsumerGroup),
-                Topic = this.config.ResolveSecureSetting(nameResolver, attribute.Topic),
+                Topics = attribute.Topics.Select(x => this.config.ResolveSecureSetting(nameResolver, x)).ToList(),
                EventHubConnectionString = this.config.ResolveSecureSetting(nameResolver, attribute.EventHubConnectionString),
            };

-            if (attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet ||
+            if (attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet ||
                attribute.Protocol != BrokerProtocol.NotSet)
            {
                consumerConfig.SaslPassword = this.config.ResolveSecureSetting(nameResolver, attribute.Password);
diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/KafkaEndToEndTests.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/KafkaEndToEndTests.cs
index 98fef5df..37c14df3 100644
--- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/KafkaEndToEndTests.cs
+++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/KafkaEndToEndTests.cs
@@ -535,7 +535,15 @@ await TestHelpers.Await(() =>
            nameof(KafkaOutputFunctions.Produce_Return_Parameter_Raw_Protobuf_Without_Key),
            typeof(MultiItem_Raw_Protobuf_Trigger),
            Constants.MyProtobufTopicName)]
-        public async Task Produce_And_Consume_Without_Key(string producerFunctionName, Type triggerFunctionType, string topicName)
+        [InlineData(
+            nameof(KafkaOutputFunctions.Produce_Out_Parameter_KafkaEventData_Array_String_Without_Key),
+            typeof(MultiTopic_SingleItem_Raw_String_Without_Key_Trigger),
+            Constants.StringTopicWithTenPartitionsName + "," + Constants.StringTopicWithOnePartitionName)]
+        [InlineData(
+            nameof(KafkaOutputFunctions.Produce_Out_Parameter_KafkaEventData_Array_String_Without_Key),
+            typeof(RegexTopic_SingleItem_Raw_String_Without_Key_Trigger),
+            Constants.StringTopicWithTenPartitionsName + "," + Constants.StringTopicWithOnePartitionName)]
+        public async Task Produce_And_Consume_Without_Key(string producerFunctionName, Type triggerFunctionType, string topicNames)
        {
            const int producedMessagesCount = 20;
            var messagePrefix = Guid.NewGuid().ToString() + ":";
@@ -544,18 +552,21 @@ public async Task Produce_And_Consume_Without_Key(string producerFunctionName, T
            {
                var jobHost = host.GetJobHost();

-                await jobHost.CallOutputTriggerStringAsync(
-                    GetStaticMethod(typeof(KafkaOutputFunctions), producerFunctionName),
-                    topicName,
-                    Enumerable.Range(1, producedMessagesCount).Select(x => messagePrefix + x)
-                    );
-
-                await TestHelpers.Await(() =>
+                foreach (var topicName in topicNames.Split(','))
                {
-                    var foundCount = loggerProvider.GetAllUserLogMessages().Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains(messagePrefix));
-                    return foundCount == producedMessagesCount;
-                });
+                    await jobHost.CallOutputTriggerStringAsync(
+                        GetStaticMethod(typeof(KafkaOutputFunctions), producerFunctionName),
+                        topicName,
+                        Enumerable.Range(1, producedMessagesCount).Select(x => messagePrefix + x)
+                        );
+
+                    await TestHelpers.Await(() =>
+                    {
+                        var foundCount = loggerProvider.GetAllUserLogMessages().Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains(messagePrefix));
+                        return foundCount == producedMessagesCount;
+                    });
+                }

                // Give time for the commit to be saved
                await Task.Delay(1000);
            }
diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs
index dd21baf9..a0bc9a8c 100644
--- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs
+++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs
@@ -45,6 +45,26 @@ public static void Trigger(
        }
    }

+    internal static class MultiTopic_SingleItem_Raw_String_Without_Key_Trigger
+    {
+        public static void Trigger(
+            [KafkaTrigger("LocalBroker", new[] { Constants.StringTopicWithTenPartitionsName, Constants.StringTopicWithOnePartitionName }, ConsumerGroup = Constants.ConsumerGroupID)] string kafkaEvent,
+            ILogger log)
+        {
+            log.LogInformation(kafkaEvent);
+        }
+    }
+
+    internal static class RegexTopic_SingleItem_Raw_String_Without_Key_Trigger
+    {
+        public static void Trigger(
+            [KafkaTrigger("LocalBroker", "^(" + Constants.StringTopicWithTenPartitionsName + "|" + Constants.StringTopicWithOnePartitionName + ")", ConsumerGroup = Constants.ConsumerGroupID)] string kafkaEvent,
+            ILogger log)
+        {
+            log.LogInformation(kafkaEvent);
+        }
+    }
+
    internal static class MultiItem_Raw_StringArray_Without_Key_Trigger
    {
        public static void Trigger(
"testBroker", - Topic = "topic", + Topics = { "topic" }, ConsumerGroup = "group1", }; @@ -318,7 +318,7 @@ public async Task When_Options_Are_Set_Should_Be_Set_In_Consumer_Config() var listenerConfig = new KafkaListenerConfiguration() { BrokerList = "testBroker", - Topic = "topic", + Topics = { "topic" }, ConsumerGroup = "group1", SslKeyPassword = "password1", SslCertificateLocation = "path/to/cert", @@ -368,7 +368,7 @@ public async Task When_Options_With_Ssal_Are_Set_Should_Be_Set_In_Consumer_Confi var listenerConfig = new KafkaListenerConfiguration() { BrokerList = "testBroker", - Topic = "topic", + Topics = { "topic" }, ConsumerGroup = "group1", SaslMechanism = SaslMechanism.Plain, SaslPassword = "mypassword", @@ -416,7 +416,7 @@ public async Task When_Using_Invalid_Eventhubs_Certificate_File_Should_Fail() var listenerConfig = new KafkaListenerConfiguration() { BrokerList = "testBroker", - Topic = "topic", + Topics = { "topic" }, EventHubConnectionString = "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=reader;SharedAccessKey=fake", ConsumerGroup = "group1", SslCaLocation = "does-not-exists.pem", @@ -450,7 +450,7 @@ public async Task When_Using_Default_Eventhubs_Certificate_File_Should_Contain_F var listenerConfig = new KafkaListenerConfiguration() { BrokerList = broker, - Topic = "topic", + Topics = { "topic" }, EventHubConnectionString = eventhubsConnectionString, ConsumerGroup = "group1", }; @@ -556,7 +556,7 @@ public async Task When_Using_Single_Dispatcher_Slow_Partition_Processing_Should_ var listenerConfig = new KafkaListenerConfiguration() { BrokerList = "testBroker", - Topic = "topic", + Topics = { "topic" }, ConsumerGroup = "group1", }; diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerForTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerForTest.cs index 17ba4fc7..d76d2b99 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerForTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerForTest.cs @@ -12,8 +12,8 @@ public class KafkaTopicScalerForTest : KafkaTopicScaler partitions; - public KafkaTopicScalerForTest(string topic, string consumerGroup, string functionId, IConsumer consumer, AdminClientConfig adminClientConfig, ILogger logger) - : base(topic, consumerGroup, functionId, consumer, adminClientConfig, logger) + public KafkaTopicScalerForTest(IReadOnlyList topics, string consumerGroup, string functionId, IConsumer consumer, AdminClientBuilder adminClientBuilder, ILogger logger) + : base(topics, consumerGroup, functionId, consumer, adminClientBuilder, logger) { } diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs index 04d67dda..6cb96876 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs @@ -7,6 +7,7 @@ using Moq; using System; using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using Xunit; @@ -15,7 +16,8 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests public class KafkaTopicScalerTest { - const string TopicName = "topicTest"; + private readonly string[] actualTopicNames = new[] { "topicTest", "topicTest2" }; + private readonly string[] configuredTopicNames = new[] { "topicTest", "^.+Test2" }; private readonly TopicPartition partition0; 
diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs
index 04d67dda..6cb96876 100644
--- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs
+++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTopicScalerTest.cs
@@ -7,6 +7,7 @@
 using Moq;
 using System;
 using System.Collections.Generic;
+using System.Linq;
 using System.Threading.Tasks;
 using Xunit;

@@ -15,7 +16,8 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests
     public class KafkaTopicScalerTest
     {
-        const string TopicName = "topicTest";
+        private readonly string[] actualTopicNames = new[] { "topicTest", "topicTest2" };
+        private readonly string[] configuredTopicNames = new[] { "topicTest", "^.+Test2" };

        private readonly TopicPartition partition0;
        private readonly TopicPartition partition1;
@@ -23,7 +25,9 @@ public class KafkaTopicScalerTest
        private readonly TopicPartition partition3;
        private readonly List<TopicPartition> partitions;
        private readonly KafkaTopicScalerForTest<string, byte[]> topicScaler;
+        private readonly Mock<IAdminClient> adminClient;
        private readonly Mock<IConsumer<string, byte[]>> consumer;
+        private readonly Mock<AdminClientBuilder> adminClientBuilder;

        private Offset ZeroOffset => new Offset(0L);
        private TimeSpan AnyTimeSpan => It.IsAny<TimeSpan>();
@@ -32,11 +36,12 @@ public class KafkaTopicScalerTest
        public KafkaTopicScalerTest()
        {
            consumer = new Mock<IConsumer<string, byte[]>>();
+            adminClientBuilder = new Mock<AdminClientBuilder>(new AdminClientConfig());

-            partition0 = new TopicPartition(TopicName, new Partition(0));
-            partition1 = new TopicPartition(TopicName, new Partition(1));
-            partition2 = new TopicPartition(TopicName, new Partition(2));
-            partition3 = new TopicPartition(TopicName, new Partition(3));
+            partition0 = new TopicPartition(actualTopicNames[0], new Partition(0));
+            partition1 = new TopicPartition(actualTopicNames[0], new Partition(1));
+            partition2 = new TopicPartition(actualTopicNames[1], new Partition(2));
+            partition3 = new TopicPartition(actualTopicNames[1], new Partition(3));

            partitions = new List<TopicPartition>
            {
@@ -47,25 +52,49 @@ public KafkaTopicScalerTest()
            };

            topicScaler = new KafkaTopicScalerForTest<string, byte[]>(
-                TopicName,
+                configuredTopicNames,
                "consumer-group-test",
                "testfunction",
-                consumer.Object, new AdminClientConfig(),
+                consumer.Object, adminClientBuilder.Object,
                NullLogger.Instance);

-            topicScaler.WithPartitions(partitions);
+            adminClient = new Mock<IAdminClient>();
+
+            adminClientBuilder.Setup(x => x.Build()).Returns(adminClient.Object);
+
+            adminClient.Setup(x => x.GetMetadata(It.IsAny<TimeSpan>())).Returns(new Metadata(null, partitions.GroupBy(p => p.Topic).Select(x =>
+                new TopicMetadata(
+                    x.Key,
+                    new List<PartitionMetadata>(x.Select(t => new PartitionMetadata(t.Partition.Value, 0, null, null, null))),
+                    null
+                )
+            ).ToList(), 0, ""));
+
+            adminClient.Setup(x => x.GetMetadata(It.IsAny<string>(), It.IsAny<TimeSpan>())).Returns((string topic, TimeSpan timeOut) => new Metadata(null, partitions.Where(p => p.Topic == topic).GroupBy(p => p.Topic).Select(x =>
+                new TopicMetadata(
+                    x.Key,
+                    new List<PartitionMetadata>(x.Select(t => new PartitionMetadata(t.Partition.Value, 0, null, null, null))),
+                    null
+                )
+            ).ToList(), 0, ""));
        }

        [Fact]
        public void ScaleMonitor_Id_ReturnsExpectedValue()
        {
-            Assert.Equal("testfunction-kafkatrigger-topictest-consumer-group-test", topicScaler.Descriptor.Id);
+            Assert.Equal("testfunction-kafkatrigger-topictest-^.+test2-consumer-group-test", topicScaler.Descriptor.Id);
        }

-        [Fact]
-        public async Task When_Offset_Is_Zero_Should_Return_No_Lag()
+        [Theory]
+        [InlineData(true)]
+        [InlineData(false)]
+        public async Task When_Offset_Is_Zero_Should_Return_No_Lag(bool mockAdminClient)
        {
+            if (!mockAdminClient)
+            {
+                topicScaler.WithPartitions(partitions);
+            }
            consumer.Setup(x => x.Committed(It.IsNotNull<IEnumerable<TopicPartition>>(), AnyTimeSpan))
                .Returns(new List<TopicPartitionOffset>
                {
@@ -85,9 +114,15 @@ public async Task When_Offset_Is_Zero_Should_Return_No_Lag()
        }

-        [Fact]
-        public async Task When_Committed_Is_Behind_Offset_Should_Return_Combined_Lag()
+        [Theory]
+        [InlineData(true)]
+        [InlineData(false)]
+        public async Task When_Committed_Is_Behind_Offset_Should_Return_Combined_Lag(bool mockAdminClient)
        {
+            if (!mockAdminClient)
+            {
+                topicScaler.WithPartitions(partitions);
+            }
            const long currentOffset = 100;
            const long largestLagOffset = currentOffset - 50;
            const long minimalLagOffset = currentOffset - 1;
diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTriggerAttributeBindingProviderTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTriggerAttributeBindingProviderTest.cs
index 76bfe671..961422e7 100644
--- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTriggerAttributeBindingProviderTest.cs
+++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTriggerAttributeBindingProviderTest.cs
@@ -105,7 +105,7 @@ public async Task When_No_Type_Is_Set_Should_Create_ByteArray_Listener(string fu
            var listenerConfig = new KafkaListenerConfiguration()
            {
                BrokerList = attribute.BrokerList,
-                Topic = attribute.Topic,
+                Topics = attribute.Topics,
                ConsumerGroup = "group1",
            };
@@ -141,7 +141,7 @@ public async Task When_String_Value_Type_Is_Set_Should_Create_String_Listener(st
            var listenerConfig = new KafkaListenerConfiguration()
            {
                BrokerList = attribute.BrokerList,
-                Topic = attribute.Topic,
+                Topics = attribute.Topics,
                ConsumerGroup = "group1",
            };
@@ -178,7 +178,7 @@ public async Task When_Avro_Schema_Is_Provided_Should_Create_GenericRecord_Liste
            var listenerConfig = new KafkaListenerConfiguration()
            {
                BrokerList = attribute.BrokerList,
-                Topic = attribute.Topic,
+                Topics = attribute.Topics,
                ConsumerGroup = "group1",
            };
@@ -215,7 +215,7 @@ public async Task When_Value_Type_Is_Specific_Record_Should_Create_SpecificRecor
            var listenerConfig = new KafkaListenerConfiguration()
            {
                BrokerList = attribute.BrokerList,
-                Topic = attribute.Topic,
+                Topics = attribute.Topics,
                ConsumerGroup = "group1",
            };
@@ -252,7 +252,7 @@ public async Task When_Value_Type_Is_Protobuf_Should_Create_Protobuf_Listener(st
            var listenerConfig = new KafkaListenerConfiguration()
            {
                BrokerList = attribute.BrokerList,
-                Topic = attribute.Topic,
+                Topics = attribute.Topics,
                ConsumerGroup = "group1",
            };
@@ -301,7 +301,7 @@ public async Task When_Value_Is_KafkaEventData_With_Key_Should_Create_Listener_W
            var listenerConfig = new KafkaListenerConfiguration()
            {
                BrokerList = attribute.BrokerList,
-                Topic = attribute.Topic,
+                Topics = attribute.Topics,
                ConsumerGroup = "group1",
            };
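One design note on the scaler tests above: switching the `KafkaTopicScaler` dependency from `AdminClientConfig` to `AdminClientBuilder` is what makes the metadata path testable. This assumes `AdminClientBuilder.Build()` is overridable, which the Moq setup in `KafkaTopicScalerTest` relies on:

```csharp
// Sketch of the test seam: Build() is substituted so the scaler receives a
// mocked IAdminClient instead of opening a real broker connection.
var adminClient = new Mock<IAdminClient>();
var adminClientBuilder = new Mock<AdminClientBuilder>(new AdminClientConfig());
adminClientBuilder.Setup(x => x.Build()).Returns(adminClient.Object);
```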