Skip to content

Commit

Permalink
test: add integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
Douglas Lima authored and dougolima committed Feb 12, 2021
1 parent 60757cc commit b1f4109
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 6 deletions.
22 changes: 22 additions & 0 deletions src/KafkaFlow.IntegrationTests/ConsumerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,27 @@ public async Task MessageOrderingTest()

CollectionAssert.AreEqual(versionsSent, versionsReceived);
}

[TestMethod]
public async Task PauseResumeHeartbeatTest()
{
// Arrange
var producer = this.provider.GetRequiredService<IMessageProducer<ProtobufProducer>>();
var messages = this.fixture.CreateMany<TestMessage1>(10).ToList();

// Act
await Task.WhenAll(messages.Select(m => producer.ProduceAsync(
Bootstrapper.PauseResumeTopicName,
m.Id.ToString(),
m)));

await Task.Delay(60000);

// Assert
foreach (var message in messages)
{
await MessageStorage.AssertMessageAsync(message);
}
}
}
}
47 changes: 41 additions & 6 deletions src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ namespace KafkaFlow.IntegrationTests.Core
using System;
using System.IO;
using System.Threading;
using Confluent.Kafka;
using global::Microsoft.Extensions.Configuration;
using global::Microsoft.Extensions.DependencyInjection;
using global::Microsoft.Extensions.Hosting;
Expand All @@ -16,6 +17,7 @@ namespace KafkaFlow.IntegrationTests.Core
using KafkaFlow.Serializer.Json;
using KafkaFlow.Serializer.ProtoBuf;
using KafkaFlow.TypedHandler;
using AutoOffsetReset = KafkaFlow.AutoOffsetReset;

public static class Bootstrapper
{
Expand All @@ -25,6 +27,16 @@ public static class Bootstrapper
private const string JsonGzipTopicName = "test-json-gzip";
private const string ProtobufGzipTopicName = "test-protobuf-gzip";
private const string ProtobufGzipTopicName2 = "test-protobuf-gzip-2";
public const string PauseResumeTopicName = "test-pause-resume";

private const string ProtobufGroupId = "consumer-protobuf";
private const string JsonGroupId = "consumer-json";
private const string GzipGroupId = "consumer-gzip";
private const string JsonGzipGroupId = "consumer-json-gzip";
private const string ProtobufGzipGroupId = "consumer-protobuf-gzip";
private const string PauseResumeGroupId = "consumer-pause-resume";

public const int MaxPollIntervalMs = 7000;

private static readonly Lazy<IServiceProvider> lazyProvider = new Lazy<IServiceProvider>(SetupProvider);

Expand Down Expand Up @@ -75,7 +87,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.AddConsumer(
consumer => consumer
.Topic(ProtobufTopicName)
.WithGroupId("consumer-protobuf")
.WithGroupId(ProtobufGroupId)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
Expand All @@ -89,10 +101,32 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.AddHandler<MessageHandler>())
)
)
.AddConsumer(
consumer => consumer
.Topic(PauseResumeTopicName)
.WithGroupId(PauseResumeGroupId)
.WithBufferSize(3)
.WithWorkersCount(3)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.WithConsumerConfig(new ConsumerConfig
{
MaxPollIntervalMs = MaxPollIntervalMs,
SessionTimeoutMs = MaxPollIntervalMs
})
.AddMiddlewares(
middlewares => middlewares
.AddSingleTypeSerializer<TestMessage1, ProtobufMessageSerializer>()
.AddTypedHandlers(
handlers =>
handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
.AddHandler<PauseResumeHandler>())
)
)
.AddConsumer(
consumer => consumer
.Topic(JsonTopicName)
.WithGroupId("consumer-json")
.WithGroupId(JsonGzipGroupId)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
Expand All @@ -103,13 +137,14 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
handlers =>
handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
.AddHandlersFromAssemblyOf<MessageHandler>())
.AddHandler<MessageHandler>()
.AddHandler<MessageHandler2>())
)
)
.AddConsumer(
consumer => consumer
.Topics(GzipTopicName)
.WithGroupId("consumer-gzip")
.WithGroupId(GzipGroupId)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
Expand All @@ -122,7 +157,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.AddConsumer(
consumer => consumer
.Topics(JsonGzipTopicName)
.WithGroupId("consumer-json-gzip")
.WithGroupId(JsonGzipGroupId)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
Expand All @@ -140,7 +175,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.AddConsumer(
consumer => consumer
.Topics(ProtobufGzipTopicName, ProtobufGzipTopicName2)
.WithGroupId("consumer-protobuf-gzip")
.WithGroupId(ProtobufGzipGroupId)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
Expand Down
20 changes: 20 additions & 0 deletions src/KafkaFlow.IntegrationTests/Core/Handlers/PauseResumeHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace KafkaFlow.IntegrationTests.Core.Handlers
{
using System.Threading.Tasks;
using KafkaFlow.TypedHandler;
using Messages;

public class PauseResumeHandler : IMessageHandler<TestMessage1>
{
public async Task Handle(IMessageContext context, TestMessage1 message)
{
context.Consumer.Pause();

await Task.Delay(Bootstrapper.MaxPollIntervalMs + 1000);

MessageStorage.Add(message);

context.Consumer.Resume();
}
}
}

0 comments on commit b1f4109

Please sign in to comment.