diff --git a/src/KafkaFlow.IntegrationTests/ConsumerTest.cs b/src/KafkaFlow.IntegrationTests/ConsumerTest.cs
index e2370565d..c762ff527 100644
--- a/src/KafkaFlow.IntegrationTests/ConsumerTest.cs
+++ b/src/KafkaFlow.IntegrationTests/ConsumerTest.cs
@@ -6,6 +6,7 @@ namespace KafkaFlow.IntegrationTests
     using AutoFixture;
     using global::Microsoft.Extensions.DependencyInjection;
     using global::Microsoft.VisualStudio.TestTools.UnitTesting;
+    using KafkaFlow.Consumers;
     using KafkaFlow.IntegrationTests.Core;
     using KafkaFlow.IntegrationTests.Core.Handlers;
     using KafkaFlow.IntegrationTests.Core.Messages;
@@ -141,5 +142,21 @@ await Task.WhenAll(
                 await MessageStorage.AssertMessageAsync(message);
             }
         }
+
+        [TestMethod]
+        public void AddConsumer_WithSharedConsumerConfig_ConsumersAreConfiguredIndependently()
+        {
+            // Act
+            var consumers = this.provider.GetRequiredService<IConsumerAccessor>().All;
+
+            // Assert
+            Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.AvroGroupId)));
+            Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.GzipGroupId)));
+            Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.JsonGroupId)));
+            Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.JsonGzipGroupId)));
+            Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.PauseResumeGroupId)));
+            Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.ProtobufGroupId)));
+            Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.ProtobufGzipGroupId)));
+        }
     }
 }
diff --git a/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs b/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
index 99dccf8c5..293d8c60b 100644
--- a/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
+++ b/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
@@ -25,6 +25,14 @@ internal static class Bootstrapper
         public const string PauseResumeTopicName = "test-pause-resume";
         public const int MaxPollIntervalMs = 7000;
 
+        internal const string ProtobufGroupId = "consumer-protobuf";
+        internal const string GzipGroupId = "consumer-gzip";
+        internal const string JsonGzipGroupId = "consumer-json-gzip";
+        internal const string ProtobufGzipGroupId = "consumer-protobuf-gzip";
+        internal const string PauseResumeGroupId = "consumer-pause-resume";
+        internal const string AvroGroupId = "consumer-avro";
+        internal const string JsonGroupId = "consumer-json";
+
         private const string ProtobufTopicName = "test-protobuf";
         private const string ProtobufSchemaRegistryTopicName = "test-protobuf-sr";
         private const string JsonSchemaRegistryTopicName = "test-json-sr";
@@ -35,12 +43,6 @@ internal static class Bootstrapper
         private const string ProtobufGzipTopicName2 = "test-protobuf-gzip-2";
         private const string AvroTopicName = "test-avro";
 
-        private const string ProtobufGroupId = "consumer-protobuf";
-        private const string GzipGroupId = "consumer-gzip";
-        private const string JsonGzipGroupId = "consumer-json-gzip";
-        private const string ProtobufGzipGroupId = "consumer-protobuf-gzip";
-        private const string PauseResumeGroupId = "consumer-pause-resume";
-
         private static readonly Lazy<IServiceProvider> LazyProvider = new(SetupProvider);
 
         public static IServiceProvider GetServiceProvider() => LazyProvider.Value;
@@ -83,6 +85,17 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
             var kafkaBrokers = context.Configuration.GetValue<string>("Kafka:Brokers");
             var schemaRegistryUrl = context.Configuration.GetValue<string>("SchemaRegistry:Url");
 
+            ConsumerConfig defaultConfig = new()
+            {
+                Acks = Confluent.Kafka.Acks.All,
+                AllowAutoCreateTopics = false,
+                AutoCommitIntervalMs = 5000,
+                AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Latest,
+                LogConnectionClose = false,
+                ReconnectBackoffMs = 1000,
+                ReconnectBackoffMaxMs = 6000
+            };
+
             services.AddKafka(
                 kafka => kafka
                     .UseLogHandler()
@@ -109,10 +122,11 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
                         .AddConsumer(
                             consumer => consumer
                                 .Topic(AvroTopicName)
-                                .WithGroupId("consumer-avro")
+                                .WithGroupId(AvroGroupId)
                                 .WithBufferSize(100)
                                 .WithWorkersCount(10)
                                 .WithAutoOffsetReset(AutoOffsetReset.Latest)
+                                .WithConsumerConfig(defaultConfig)
                                 .AddMiddlewares(
                                     middlewares => middlewares
                                         .AddSerializer()
@@ -136,10 +150,11 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
                        .AddConsumer(
                             consumer => consumer
                                 .Topic(ProtobufSchemaRegistryTopicName)
-                                .WithGroupId("consumer-protobuf")
+                                .WithGroupId(ProtobufGroupId)
                                 .WithBufferSize(100)
                                 .WithWorkersCount(10)
                                 .WithAutoOffsetReset(AutoOffsetReset.Latest)
+                                .WithConsumerConfig(defaultConfig)
                                 .AddMiddlewares(
                                     middlewares => middlewares
                                         .AddSerializer()
@@ -163,10 +178,11 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
                        .AddConsumer(
                             consumer => consumer
                                 .Topic(JsonSchemaRegistryTopicName)
-                                .WithGroupId("consumer-json")
+                                .WithGroupId(JsonGroupId)
                                 .WithBufferSize(100)
                                 .WithWorkersCount(10)
                                 .WithAutoOffsetReset(AutoOffsetReset.Latest)
+                                .WithConsumerConfig(defaultConfig)
                                 .AddMiddlewares(
                                     middlewares => middlewares
                                         .AddSerializer()
diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
index db0537540..66f41d4ce 100644
--- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
+++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
@@ -234,19 +234,22 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
             var middlewareConfiguration = this.middlewareConfigurationBuilder.Build();
 
             this.consumerConfig ??= new ConsumerConfig();
 
-            this.consumerConfig.BootstrapServers ??= string.Join(",", clusterConfiguration.Brokers);
-            this.consumerConfig.GroupId ??= this.groupId;
-            this.consumerConfig.AutoOffsetReset ??= this.autoOffsetReset;
-            this.consumerConfig.MaxPollIntervalMs ??= this.maxPollIntervalMs;
-            this.consumerConfig.StatisticsIntervalMs ??= this.statisticsInterval;
-            this.consumerConfig.EnableAutoOffsetStore = false;
-            this.consumerConfig.EnableAutoCommit = false;
+            var consumerConfigCopy = new KafkaFlowConsumerConfig(this.consumerConfig);
 
-            this.consumerConfig.ReadSecurityInformationFrom(clusterConfiguration);
+            consumerConfigCopy.BootstrapServers = this.consumerConfig?.BootstrapServers ?? string.Join(",", clusterConfiguration.Brokers);
+            consumerConfigCopy.GroupId = this.consumerConfig?.GroupId ?? this.groupId;
+            consumerConfigCopy.AutoOffsetReset = this.consumerConfig?.AutoOffsetReset ?? this.autoOffsetReset;
+            consumerConfigCopy.MaxPollIntervalMs = this.consumerConfig?.MaxPollIntervalMs ?? this.maxPollIntervalMs;
+            consumerConfigCopy.StatisticsIntervalMs = this.consumerConfig?.StatisticsIntervalMs ?? this.statisticsInterval;
+
+            consumerConfigCopy.EnableAutoOffsetStore = false;
+            consumerConfigCopy.EnableAutoCommit = false;
+
+            consumerConfigCopy.ReadSecurityInformationFrom(clusterConfiguration);
 
             return new ConsumerConfiguration(
-                this.consumerConfig,
+                consumerConfigCopy,
                 this.topics,
                 this.topicsPartitions,
                 this.name,
diff --git a/src/KafkaFlow/Configuration/KafkaFlowConsumerConfig.cs b/src/KafkaFlow/Configuration/KafkaFlowConsumerConfig.cs
new file mode 100644
index 000000000..d11d648d4
--- /dev/null
+++ b/src/KafkaFlow/Configuration/KafkaFlowConsumerConfig.cs
@@ -0,0 +1,14 @@
+namespace KafkaFlow.Configuration
+{
+    using System.Collections.Generic;
+    using Confluent.Kafka;
+
+    internal class KafkaFlowConsumerConfig : ConsumerConfig
+    {
+        public KafkaFlowConsumerConfig(ConsumerConfig consumerConfig)
+            : base(consumerConfig)
+        {
+            this.properties = new Dictionary<string, string>(this.properties);
+        }
+    }
+}
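
Usage note (editorial, not part of the patch): copying the user-supplied ConsumerConfig into a KafkaFlowConsumerConfig inside Build() is what lets several AddConsumer registrations share one ConsumerConfig instance, as the Bootstrapper now does with defaultConfig, without the first consumer's Build() baking its GroupId, bootstrap servers, or offset settings into the instance the other consumers see. The sketch below shows that call pattern in isolation; it assumes the usual KafkaFlow cluster setup (AddCluster/WithBrokers), which this diff does not show, the class and method names are hypothetical, the broker/topic/group names are illustrative, and middleware registration is omitted.

using Confluent.Kafka;
using KafkaFlow;
using Microsoft.Extensions.DependencyInjection;

public static class SharedConsumerConfigExample
{
    public static void Register(IServiceCollection services)
    {
        // One ConsumerConfig instance reused by two consumers; after this change,
        // Build() works on a per-consumer copy instead of mutating this instance.
        var sharedConfig = new ConsumerConfig
        {
            AutoCommitIntervalMs = 5000,
            ReconnectBackoffMs = 1000,
        };

        services.AddKafka(kafka => kafka
            .AddCluster(cluster => cluster
                .WithBrokers(new[] { "localhost:9092" }) // illustrative broker address
                .AddConsumer(consumer => consumer
                    .Topic("topic-a")                    // illustrative topic/group names
                    .WithGroupId("group-a")
                    .WithBufferSize(100)
                    .WithWorkersCount(1)
                    .WithConsumerConfig(sharedConfig))
                .AddConsumer(consumer => consumer
                    .Topic("topic-b")
                    .WithGroupId("group-b")
                    .WithBufferSize(100)
                    .WithWorkersCount(1)
                    .WithConsumerConfig(sharedConfig)))); // same instance, independent copy per consumer
    }
}

Each consumer registered this way should keep its own group id, which is what the AddConsumer_WithSharedConsumerConfig_ConsumersAreConfiguredIndependently test asserts through IConsumerAccessor.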