diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/KafkaExtensionConfigProvider.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/KafkaExtensionConfigProvider.cs index 87335678..7a71751d 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/KafkaExtensionConfigProvider.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/KafkaExtensionConfigProvider.cs @@ -29,6 +29,7 @@ public class KafkaExtensionConfigProvider : IExtensionConfigProvider private readonly INameResolver nameResolver; private readonly IWebJobsExtensionConfiguration configuration; private readonly IKafkaProducerFactory kafkaProducerFactory; + private readonly IKafkaTopicScalerFactory kafkaTopicScalerFactory; private readonly ILogger logger; public KafkaExtensionConfigProvider( @@ -38,7 +39,9 @@ public KafkaExtensionConfigProvider( IConverterManager converterManager, INameResolver nameResolver, IWebJobsExtensionConfiguration configuration, - IKafkaProducerFactory kafkaProducerFactory) + IKafkaProducerFactory kafkaProducerFactory, + IKafkaTopicScalerFactory kafkaTopicScalerFactory + ) { this.config = config; this.options = options; @@ -47,6 +50,7 @@ public KafkaExtensionConfigProvider( this.nameResolver = nameResolver; this.configuration = configuration; this.kafkaProducerFactory = kafkaProducerFactory; + this.kafkaTopicScalerFactory = kafkaTopicScalerFactory; this.logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("Kafka")); } @@ -55,7 +59,7 @@ public void Initialize(ExtensionConfigContext context) configuration.ConfigurationSection.Bind(options); // register our trigger binding provider - var triggerBindingProvider = new KafkaTriggerAttributeBindingProvider(config, options, converterManager, nameResolver, loggerFactory); + var triggerBindingProvider = new KafkaTriggerAttributeBindingProvider(config, options, converterManager, nameResolver, kafkaTopicScalerFactory, loggerFactory); context.AddBindingRule() .BindToTrigger(triggerBindingProvider); diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/KafkaWebJobsBuilderExtensions.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/KafkaWebJobsBuilderExtensions.cs index ced2072d..fc058812 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/KafkaWebJobsBuilderExtensions.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/KafkaWebJobsBuilderExtensions.cs @@ -37,6 +37,7 @@ public static IWebJobsBuilder AddKafka(this IWebJobsBuilder builder, Action(); + builder.Services.AddSingleton(); return builder; } diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/DefaultKafkaTopicScalerFactory.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/DefaultKafkaTopicScalerFactory.cs new file mode 100644 index 00000000..c22d67a2 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/DefaultKafkaTopicScalerFactory.cs @@ -0,0 +1,16 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Confluent.Kafka; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.WebJobs.Extensions.Kafka +{ + public class DefaultKafkaTopicScalerFactory : IKafkaTopicScalerFactory + { + public KafkaTopicScaler CreateKafkaTopicScaler(string topic, string consumerGroup, string functionId, IConsumer consumer, AdminClientConfig adminClientConfig, ILogger logger) + { + return new KafkaTopicScaler(topic, consumerGroup, functionId, consumer, adminClientConfig, logger); + } + } +} \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/IKafkaTopicScalerFactory.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/IKafkaTopicScalerFactory.cs new file mode 100644 index 00000000..503fc66b --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/IKafkaTopicScalerFactory.cs @@ -0,0 +1,13 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Confluent.Kafka; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.WebJobs.Extensions.Kafka +{ + public interface IKafkaTopicScalerFactory + { + KafkaTopicScaler CreateKafkaTopicScaler(string topic, string consumerGroup, string functionId, IConsumer consumer, AdminClientConfig adminClientConfig, ILogger logger); + } +} \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs index 3511fd36..8d045047 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs @@ -37,6 +37,7 @@ internal class KafkaListener : IListener, IScaleMonitorProvider private readonly KafkaListenerConfiguration listenerConfiguration; // Indicates if the consumer requires the Kafka element key private readonly bool requiresKey; + private readonly IKafkaTopicScalerFactory kafkaTopicScalerFactory; private readonly ILogger logger; private FunctionExecutorBase functionExecutor; private Lazy> consumer; @@ -62,10 +63,12 @@ public KafkaListener( KafkaListenerConfiguration kafkaListenerConfiguration, bool requiresKey, IDeserializer valueDeserializer, + IKafkaTopicScalerFactory kafkaTopicScalerFactory, ILogger logger, string functionId) { this.ValueDeserializer = valueDeserializer; + this.kafkaTopicScalerFactory = kafkaTopicScalerFactory; this.executor = executor; this.singleDispatch = singleDispatch; this.options = options; @@ -114,7 +117,7 @@ private IConsumer CreateConsumer() private KafkaTopicScaler CreateTopicScaler() { - return new KafkaTopicScaler(this.listenerConfiguration.Topic, this.consumerGroup, this.functionId, this.consumer.Value, new AdminClientConfig(GetConsumerConfiguration()), this.logger); + return kafkaTopicScalerFactory.CreateKafkaTopicScaler(this.listenerConfiguration.Topic, this.consumerGroup, this.functionId, this.consumer.Value, new AdminClientConfig(GetConsumerConfiguration()), this.logger); } public void Cancel() diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs index 84887df3..925acaef 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaTopicScaler.cs @@ -134,7 +134,7 @@ public ScaleStatus GetScaleStatus(ScaleStatusContext context) return GetScaleStatusCore(context.WorkerCount, context.Metrics?.OfType().ToArray()); } - public ScaleStatus GetScaleStatus(ScaleStatusContext context) + public virtual ScaleStatus GetScaleStatus(ScaleStatusContext context) { return GetScaleStatusCore(context.WorkerCount, context.Metrics?.ToArray()); } diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs index 22218234..7aacfccb 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs @@ -21,6 +21,7 @@ internal class KafkaTriggerAttributeBindingProvider : ITriggerBindingProvider private readonly IConfiguration config; private readonly IConverterManager converterManager; private readonly INameResolver nameResolver; + private readonly IKafkaTopicScalerFactory kafkaTopicScalerFactory; private readonly IOptions options; private readonly ILogger logger; @@ -29,11 +30,13 @@ public KafkaTriggerAttributeBindingProvider( IOptions options, IConverterManager converterManager, INameResolver nameResolver, + IKafkaTopicScalerFactory kafkaTopicScalerFactory, ILoggerFactory loggerFactory) { this.config = config; this.converterManager = converterManager; this.nameResolver = nameResolver; + this.kafkaTopicScalerFactory = kafkaTopicScalerFactory; this.options = options; this.logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("Kafka")); } @@ -75,6 +78,7 @@ Task listenerCreator(ListenerFactoryContext factoryContext, bool sing listenerConfiguration, requiresKey, valueDeserializer, + kafkaTopicScalerFactory, this.logger, factoryContext.Descriptor.Id); diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/Helpers/KafkaListenerForTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/Helpers/KafkaListenerForTest.cs index 3d3afb5d..6ce52bfa 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/Helpers/KafkaListenerForTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/Helpers/KafkaListenerForTest.cs @@ -34,6 +34,7 @@ public KafkaListenerForTest(ITriggeredFunctionExecutor executor, kafkaListenerConfiguration, requiresKey, valueDeserializer, + new DefaultKafkaTopicScalerFactory(), logger, functionId) { diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTriggerAttributeBindingProviderTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTriggerAttributeBindingProviderTest.cs index 76bfe671..43f5ff71 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTriggerAttributeBindingProviderTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTriggerAttributeBindingProviderTest.cs @@ -115,7 +115,8 @@ public async Task When_No_Type_Is_Set_Should_Create_ByteArray_Listener(string fu config, Options.Create(new KafkaOptions()), new KafkaEventDataConvertManager(NullLogger.Instance), - new DefaultNameResolver(config), + new DefaultNameResolver(config), + new DefaultKafkaTopicScalerFactory(), NullLoggerFactory.Instance); var parameterInfo = new TriggerBindingProviderContext(this.GetParameterInfo(functionName), default); @@ -152,6 +153,7 @@ public async Task When_String_Value_Type_Is_Set_Should_Create_String_Listener(st Options.Create(new KafkaOptions()), new KafkaEventDataConvertManager(NullLogger.Instance), new DefaultNameResolver(config), + new DefaultKafkaTopicScalerFactory(), NullLoggerFactory.Instance); var parameterInfo = new TriggerBindingProviderContext(this.GetParameterInfo(functionName), default); @@ -189,6 +191,7 @@ public async Task When_Avro_Schema_Is_Provided_Should_Create_GenericRecord_Liste Options.Create(new KafkaOptions()), new KafkaEventDataConvertManager(NullLogger.Instance), new DefaultNameResolver(config), + new DefaultKafkaTopicScalerFactory(), NullLoggerFactory.Instance); var parameterInfo = new TriggerBindingProviderContext(this.GetParameterInfo(functionName), default); @@ -226,6 +229,7 @@ public async Task When_Value_Type_Is_Specific_Record_Should_Create_SpecificRecor Options.Create(new KafkaOptions()), new KafkaEventDataConvertManager(NullLogger.Instance), new DefaultNameResolver(config), + new DefaultKafkaTopicScalerFactory(), NullLoggerFactory.Instance); var parameterInfo = new TriggerBindingProviderContext(this.GetParameterInfo(functionName), default); @@ -263,6 +267,7 @@ public async Task When_Value_Type_Is_Protobuf_Should_Create_Protobuf_Listener(st Options.Create(new KafkaOptions()), new KafkaEventDataConvertManager(NullLogger.Instance), new DefaultNameResolver(config), + new DefaultKafkaTopicScalerFactory(), NullLoggerFactory.Instance); var parameterInfo = new TriggerBindingProviderContext(this.GetParameterInfo(functionName), default); @@ -312,6 +317,7 @@ public async Task When_Value_Is_KafkaEventData_With_Key_Should_Create_Listener_W Options.Create(new KafkaOptions()), new KafkaEventDataConvertManager(NullLogger.Instance), new DefaultNameResolver(config), + new DefaultKafkaTopicScalerFactory(), NullLoggerFactory.Instance); var parameterInfo = new TriggerBindingProviderContext(this.GetParameterInfo(functionName), default);