Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug Report]: Kafkaflow does not allow to have consumers with different consumer groups #424

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/KafkaFlow.IntegrationTests/ConsumerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace KafkaFlow.IntegrationTests
using AutoFixture;
using global::Microsoft.Extensions.DependencyInjection;
using global::Microsoft.VisualStudio.TestTools.UnitTesting;
using KafkaFlow.Consumers;
using KafkaFlow.IntegrationTests.Core;
using KafkaFlow.IntegrationTests.Core.Handlers;
using KafkaFlow.IntegrationTests.Core.Messages;
Expand Down Expand Up @@ -141,5 +142,21 @@ await Task.WhenAll(
await MessageStorage.AssertMessageAsync(message);
}
}

[TestMethod]
public void AddConsumer_WithSharedConsumerConfig_ConsumersAreConfiguratedIndependently()
{
// Act
var consumers = this.provider.GetRequiredService<IConsumerAccessor>().All;

// Assert
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.AvroGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.GzipGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.JsonGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.JsonGzipGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.PauseResumeGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.ProtobufGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.ProtobufGzipGroupId)));
}
}
}
34 changes: 25 additions & 9 deletions src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ internal static class Bootstrapper
public const string PauseResumeTopicName = "test-pause-resume";
public const int MaxPollIntervalMs = 7000;

internal const string ProtobufGroupId = "consumer-protobuf";
internal const string GzipGroupId = "consumer-gzip";
internal const string JsonGzipGroupId = "consumer-json-gzip";
internal const string ProtobufGzipGroupId = "consumer-protobuf-gzip";
internal const string PauseResumeGroupId = "consumer-pause-resume";
internal const string AvroGroupId = "consumer-avro";
internal const string JsonGroupId = "consumer-json";

private const string ProtobufTopicName = "test-protobuf";
private const string ProtobufSchemaRegistryTopicName = "test-protobuf-sr";
private const string JsonSchemaRegistryTopicName = "test-json-sr";
Expand All @@ -35,12 +43,6 @@ internal static class Bootstrapper
private const string ProtobufGzipTopicName2 = "test-protobuf-gzip-2";
private const string AvroTopicName = "test-avro";

private const string ProtobufGroupId = "consumer-protobuf";
private const string GzipGroupId = "consumer-gzip";
private const string JsonGzipGroupId = "consumer-json-gzip";
private const string ProtobufGzipGroupId = "consumer-protobuf-gzip";
private const string PauseResumeGroupId = "consumer-pause-resume";

private static readonly Lazy<IServiceProvider> LazyProvider = new(SetupProvider);

public static IServiceProvider GetServiceProvider() => LazyProvider.Value;
Expand Down Expand Up @@ -83,6 +85,17 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
var kafkaBrokers = context.Configuration.GetValue<string>("Kafka:Brokers");
var schemaRegistryUrl = context.Configuration.GetValue<string>("SchemaRegistry:Url");

ConsumerConfig defaultConfig = new()
{
Acks = Confluent.Kafka.Acks.All,
AllowAutoCreateTopics = false,
AutoCommitIntervalMs = 5000,
AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Latest,
LogConnectionClose = false,
ReconnectBackoffMs = 1000,
ReconnectBackoffMaxMs = 6000
};

services.AddKafka(
kafka => kafka
.UseLogHandler<TraceLogHandler>()
Expand All @@ -109,10 +122,11 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.AddConsumer(
consumer => consumer
.Topic(AvroTopicName)
.WithGroupId("consumer-avro")
.WithGroupId(AvroGroupId)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.WithConsumerConfig(defaultConfig)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ConfluentAvroSerializer>()
Expand All @@ -136,10 +150,11 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.AddConsumer(
consumer => consumer
.Topic(ProtobufSchemaRegistryTopicName)
.WithGroupId("consumer-protobuf")
.WithGroupId(ProtobufGroupId)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.WithConsumerConfig(defaultConfig)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ConfluentProtobufSerializer>()
Expand All @@ -163,10 +178,11 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.AddConsumer(
consumer => consumer
.Topic(JsonSchemaRegistryTopicName)
.WithGroupId("consumer-json")
.WithGroupId(JsonGroupId)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.WithConsumerConfig(defaultConfig)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ConfluentJsonSerializer>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ public void Build_AllCalls_ReturnPassedValues()
Action<IDependencyResolver, List<TopicPartition>> partitionsAssignedHandler = (_, _) => { };
Action<IDependencyResolver, List<TopicPartitionOffset>> partitionsRevokedHandler = (_, _) => { };
const int statisticsIntervalMs = 100;
var consumerConfig = new ConsumerConfig();
var consumerConfig = new ConsumerConfig
{
ClientId = "testeclient"
};

this.target
.Topics(topic1)
Expand Down Expand Up @@ -128,7 +131,7 @@ public void Build_AllCalls_ReturnPassedValues()
configuration.StatisticsHandlers.Should().HaveElementAt(0, statisticsHandler);
configuration.PartitionsAssignedHandlers.Should().HaveElementAt(0, partitionsAssignedHandler);
configuration.PartitionsRevokedHandlers.Should().HaveElementAt(0, partitionsRevokedHandler);
configuration.GetKafkaConfig().Should().BeSameAs(consumerConfig);
configuration.GetKafkaConfig().ClientId.Should().Be(consumerConfig.ClientId);
configuration.MiddlewaresConfigurations.Should().HaveCount(1);
}
}
Expand Down
21 changes: 12 additions & 9 deletions src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,19 +234,22 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
var middlewareConfiguration = this.middlewareConfigurationBuilder.Build();

this.consumerConfig ??= new ConsumerConfig();
this.consumerConfig.BootstrapServers ??= string.Join(",", clusterConfiguration.Brokers);
this.consumerConfig.GroupId ??= this.groupId;
this.consumerConfig.AutoOffsetReset ??= this.autoOffsetReset;
this.consumerConfig.MaxPollIntervalMs ??= this.maxPollIntervalMs;
this.consumerConfig.StatisticsIntervalMs ??= this.statisticsInterval;

this.consumerConfig.EnableAutoOffsetStore = false;
this.consumerConfig.EnableAutoCommit = false;
var consumerConfigCopy = new ConsumerConfig(this.consumerConfig.ToDictionary(x => x.Key, x => x.Value));

this.consumerConfig.ReadSecurityInformationFrom(clusterConfiguration);
consumerConfigCopy.BootstrapServers = this.consumerConfig.BootstrapServers ?? string.Join(",", clusterConfiguration.Brokers);
consumerConfigCopy.GroupId = this.consumerConfig.GroupId ?? this.groupId;
consumerConfigCopy.AutoOffsetReset = this.consumerConfig.AutoOffsetReset ?? this.autoOffsetReset;
consumerConfigCopy.MaxPollIntervalMs = this.consumerConfig.MaxPollIntervalMs ?? this.maxPollIntervalMs;
consumerConfigCopy.StatisticsIntervalMs = this.consumerConfig.StatisticsIntervalMs ?? this.statisticsInterval;

consumerConfigCopy.EnableAutoOffsetStore = false;
consumerConfigCopy.EnableAutoCommit = false;

consumerConfigCopy.ReadSecurityInformationFrom(clusterConfiguration);

return new ConsumerConfiguration(
this.consumerConfig,
consumerConfigCopy,
this.topics,
this.topicsPartitions,
this.name,
Expand Down