From 7ed5ab8011f7f341025e43f5bce5207390a6d6cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sim=C3=A3o=20Ribeiro?= Date: Mon, 4 Sep 2023 14:16:38 +0100 Subject: [PATCH] fix: create consumerconfig independently in builder --- .../ConsumerTest.cs | 17 ++++++++++ .../Core/Bootstrapper.cs | 34 ++++++++++++++----- .../ConsumerConfigurationBuilderTests.cs | 7 ++-- .../ConsumerConfigurationBuilder.cs | 21 +++++++----- 4 files changed, 59 insertions(+), 20 deletions(-) diff --git a/src/KafkaFlow.IntegrationTests/ConsumerTest.cs b/src/KafkaFlow.IntegrationTests/ConsumerTest.cs index e2370565d..c762ff527 100644 --- a/src/KafkaFlow.IntegrationTests/ConsumerTest.cs +++ b/src/KafkaFlow.IntegrationTests/ConsumerTest.cs @@ -6,6 +6,7 @@ namespace KafkaFlow.IntegrationTests using AutoFixture; using global::Microsoft.Extensions.DependencyInjection; using global::Microsoft.VisualStudio.TestTools.UnitTesting; + using KafkaFlow.Consumers; using KafkaFlow.IntegrationTests.Core; using KafkaFlow.IntegrationTests.Core.Handlers; using KafkaFlow.IntegrationTests.Core.Messages; @@ -141,5 +142,21 @@ await Task.WhenAll( await MessageStorage.AssertMessageAsync(message); } } + + [TestMethod] + public void AddConsumer_WithSharedConsumerConfig_ConsumersAreConfiguratedIndependently() + { + // Act + var consumers = this.provider.GetRequiredService().All; + + // Assert + Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.AvroGroupId))); + Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.GzipGroupId))); + Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.JsonGroupId))); + Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.JsonGzipGroupId))); + Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.PauseResumeGroupId))); + Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.ProtobufGroupId))); + Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.ProtobufGzipGroupId))); + } } } diff --git a/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs b/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs index 99dccf8c5..293d8c60b 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs @@ -25,6 +25,14 @@ internal static class Bootstrapper public const string PauseResumeTopicName = "test-pause-resume"; public const int MaxPollIntervalMs = 7000; + internal const string ProtobufGroupId = "consumer-protobuf"; + internal const string GzipGroupId = "consumer-gzip"; + internal const string JsonGzipGroupId = "consumer-json-gzip"; + internal const string ProtobufGzipGroupId = "consumer-protobuf-gzip"; + internal const string PauseResumeGroupId = "consumer-pause-resume"; + internal const string AvroGroupId = "consumer-avro"; + internal const string JsonGroupId = "consumer-json"; + private const string ProtobufTopicName = "test-protobuf"; private const string ProtobufSchemaRegistryTopicName = "test-protobuf-sr"; private const string JsonSchemaRegistryTopicName = "test-json-sr"; @@ -35,12 +43,6 @@ internal static class Bootstrapper private const string ProtobufGzipTopicName2 = "test-protobuf-gzip-2"; private const string AvroTopicName = "test-avro"; - private const string ProtobufGroupId = "consumer-protobuf"; - private const string GzipGroupId = "consumer-gzip"; - private const string JsonGzipGroupId = "consumer-json-gzip"; - private const string ProtobufGzipGroupId = "consumer-protobuf-gzip"; - private const string PauseResumeGroupId = "consumer-pause-resume"; - private static readonly Lazy LazyProvider = new(SetupProvider); public static IServiceProvider GetServiceProvider() => LazyProvider.Value; @@ -83,6 +85,17 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection var kafkaBrokers = context.Configuration.GetValue("Kafka:Brokers"); var schemaRegistryUrl = context.Configuration.GetValue("SchemaRegistry:Url"); + ConsumerConfig defaultConfig = new() + { + Acks = Confluent.Kafka.Acks.All, + AllowAutoCreateTopics = false, + AutoCommitIntervalMs = 5000, + AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Latest, + LogConnectionClose = false, + ReconnectBackoffMs = 1000, + ReconnectBackoffMaxMs = 6000 + }; + services.AddKafka( kafka => kafka .UseLogHandler() @@ -109,10 +122,11 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .AddConsumer( consumer => consumer .Topic(AvroTopicName) - .WithGroupId("consumer-avro") + .WithGroupId(AvroGroupId) .WithBufferSize(100) .WithWorkersCount(10) .WithAutoOffsetReset(AutoOffsetReset.Latest) + .WithConsumerConfig(defaultConfig) .AddMiddlewares( middlewares => middlewares .AddSerializer() @@ -136,10 +150,11 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .AddConsumer( consumer => consumer .Topic(ProtobufSchemaRegistryTopicName) - .WithGroupId("consumer-protobuf") + .WithGroupId(ProtobufGroupId) .WithBufferSize(100) .WithWorkersCount(10) .WithAutoOffsetReset(AutoOffsetReset.Latest) + .WithConsumerConfig(defaultConfig) .AddMiddlewares( middlewares => middlewares .AddSerializer() @@ -163,10 +178,11 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .AddConsumer( consumer => consumer .Topic(JsonSchemaRegistryTopicName) - .WithGroupId("consumer-json") + .WithGroupId(JsonGroupId) .WithBufferSize(100) .WithWorkersCount(10) .WithAutoOffsetReset(AutoOffsetReset.Latest) + .WithConsumerConfig(defaultConfig) .AddMiddlewares( middlewares => middlewares .AddSerializer() diff --git a/src/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs b/src/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs index 3d3c16a21..cd15608ae 100644 --- a/src/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs +++ b/src/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs @@ -88,7 +88,10 @@ public void Build_AllCalls_ReturnPassedValues() Action> partitionsAssignedHandler = (_, _) => { }; Action> partitionsRevokedHandler = (_, _) => { }; const int statisticsIntervalMs = 100; - var consumerConfig = new ConsumerConfig(); + var consumerConfig = new ConsumerConfig + { + ClientId = "testeclient" + }; this.target .Topics(topic1) @@ -128,7 +131,7 @@ public void Build_AllCalls_ReturnPassedValues() configuration.StatisticsHandlers.Should().HaveElementAt(0, statisticsHandler); configuration.PartitionsAssignedHandlers.Should().HaveElementAt(0, partitionsAssignedHandler); configuration.PartitionsRevokedHandlers.Should().HaveElementAt(0, partitionsRevokedHandler); - configuration.GetKafkaConfig().Should().BeSameAs(consumerConfig); + configuration.GetKafkaConfig().ClientId.Should().Be(consumerConfig.ClientId); configuration.MiddlewaresConfigurations.Should().HaveCount(1); } } diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs index db0537540..f7e601ed0 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs @@ -234,19 +234,22 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration) var middlewareConfiguration = this.middlewareConfigurationBuilder.Build(); this.consumerConfig ??= new ConsumerConfig(); - this.consumerConfig.BootstrapServers ??= string.Join(",", clusterConfiguration.Brokers); - this.consumerConfig.GroupId ??= this.groupId; - this.consumerConfig.AutoOffsetReset ??= this.autoOffsetReset; - this.consumerConfig.MaxPollIntervalMs ??= this.maxPollIntervalMs; - this.consumerConfig.StatisticsIntervalMs ??= this.statisticsInterval; - this.consumerConfig.EnableAutoOffsetStore = false; - this.consumerConfig.EnableAutoCommit = false; + var consumerConfigCopy = new ConsumerConfig(this.consumerConfig.ToDictionary(x => x.Key, x => x.Value)); - this.consumerConfig.ReadSecurityInformationFrom(clusterConfiguration); + consumerConfigCopy.BootstrapServers = this.consumerConfig.BootstrapServers ?? string.Join(",", clusterConfiguration.Brokers); + consumerConfigCopy.GroupId = this.consumerConfig.GroupId ?? this.groupId; + consumerConfigCopy.AutoOffsetReset = this.consumerConfig.AutoOffsetReset ?? this.autoOffsetReset; + consumerConfigCopy.MaxPollIntervalMs = this.consumerConfig.MaxPollIntervalMs ?? this.maxPollIntervalMs; + consumerConfigCopy.StatisticsIntervalMs = this.consumerConfig.StatisticsIntervalMs ?? this.statisticsInterval; + + consumerConfigCopy.EnableAutoOffsetStore = false; + consumerConfigCopy.EnableAutoCommit = false; + + consumerConfigCopy.ReadSecurityInformationFrom(clusterConfiguration); return new ConsumerConfiguration( - this.consumerConfig, + consumerConfigCopy, this.topics, this.topicsPartitions, this.name,