Skip to content

Commit

Permalink
fix: allow multiple typedHandlers handling the same message type
Browse files Browse the repository at this point in the history
  • Loading branch information
filipeesch authored and dougolima committed Mar 31, 2021
1 parent b2cbb50 commit deb5b4e
Show file tree
Hide file tree
Showing 13 changed files with 120 additions and 33 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ init_broker:
@echo command | date
@echo Initializing Kafka broker
docker-compose -f docker-compose.yml up -d
docker exec kafka bash -c "kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-avro;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-gzip;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-json;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-json-gzip;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-protobuf;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-protobuf-gzip;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-protobuf-gzip-2"
docker exec kafka bash -c "kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-avro;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-gzip;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-json;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-json-gzip;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-protobuf;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-protobuf-gzip;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-protobuf-gzip-2;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-pause-resume"

shutdown_broker:
@echo command | date
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public async Task ProtoBufGzipMessageTest()
// Act
await Task.WhenAll(messages.Select(m => producer.ProduceAsync(m.Id.ToString(), m)));


// Assert
foreach (var message in messages)
{
Expand Down
22 changes: 19 additions & 3 deletions src/KafkaFlow.IntegrationTests/ConsumerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,23 @@ public async Task MultipleTopicsSingleConsumerTest()
await MessageStorage.AssertCountMessageAsync(message, 2);
}
}

[TestMethod]
public async Task MultipleHandlersSingleTypeConsumerTest()
{
// Arrange
var producer = this.provider.GetRequiredService<IMessageProducer<JsonProducer>>();
var messages = this.fixture.CreateMany<TestMessage1>(5).ToList();

// Act
messages.ForEach(m => producer.Produce(m.Id.ToString(), m));

// Assert
foreach (var message in messages)
{
await MessageStorage.AssertCountMessageAsync(message, 2);
}
}

[TestMethod]
public async Task MessageOrderingTest()
Expand Down Expand Up @@ -102,12 +119,11 @@ public async Task MessageOrderingTest()
}

[TestMethod]
[Ignore]
public async Task PauseResumeHeartbeatTest()
{
// Arrange
var producer = this.provider.GetRequiredService<IMessageProducer<ProtobufProducer>>();
var messages = this.fixture.CreateMany<TestMessage1>(5).ToList();
var messages = this.fixture.CreateMany<PauseResumeMessage>(5).ToList();

// Act
await Task.WhenAll(messages.Select(m => producer.ProduceAsync(
Expand All @@ -123,5 +139,5 @@ await Task.WhenAll(messages.Select(m => producer.ProduceAsync(
await MessageStorage.AssertMessageAsync(message);
}
}
}
}
}
5 changes: 2 additions & 3 deletions src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
})
.AddMiddlewares(
middlewares => middlewares
.AddSingleTypeSerializer<TestMessage1, ProtobufMessageSerializer>()
.AddSingleTypeSerializer<PauseResumeMessage, ProtobufMessageSerializer>()
.AddTypedHandlers(
handlers =>
handlers
Expand All @@ -177,8 +177,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
handlers =>
handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
.AddHandler<MessageHandler>()
.AddHandler<MessageHandler2>())
.AddHandlersFromAssemblyOf<MessageHandler>())
)
)
.AddConsumer(
Expand Down
15 changes: 15 additions & 0 deletions src/KafkaFlow.IntegrationTests/Core/Handlers/MessageHandler1.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace KafkaFlow.IntegrationTests.Core.Handlers
{
using System.Threading.Tasks;
using KafkaFlow.TypedHandler;
using Messages;

public class MessageHandler1 : IMessageHandler<TestMessage1>
{
public Task Handle(IMessageContext context, TestMessage1 message)
{
MessageStorage.Add(message);
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ namespace KafkaFlow.IntegrationTests.Core.Handlers
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Avro.Specific;
using global::Microsoft.VisualStudio.TestTools.UnitTesting;
using Messages;
using MessageTypes;
Expand Down Expand Up @@ -58,7 +57,7 @@ public static async Task AssertMessageAsync(ITestMessage message)
{
if (DateTime.Now.Subtract(start).Seconds > timeoutSec)
{
Assert.Fail("Message not received");
Assert.Fail("Message (ITestMessage) not received");
return;
}

Expand All @@ -74,7 +73,7 @@ public static async Task AssertMessageAsync(LogMessages2 message)
{
if (DateTime.Now.Subtract(start).Seconds > timeoutSec)
{
Assert.Fail("Message not received");
Assert.Fail("Message (LogMessages2) not received");
return;
}

Expand All @@ -90,7 +89,7 @@ public static async Task AssertMessageAsync(byte[] message)
{
if (DateTime.Now.Subtract(start).Seconds > timeoutSec)
{
Assert.Fail("Message not received");
Assert.Fail("Message (byte[]) not received");
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ namespace KafkaFlow.IntegrationTests.Core.Handlers
using KafkaFlow.TypedHandler;
using Messages;

public class PauseResumeHandler : IMessageHandler<TestMessage1>
public class PauseResumeHandler : IMessageHandler<PauseResumeMessage>
{
public async Task Handle(IMessageContext context, TestMessage1 message)
public async Task Handle(IMessageContext context, PauseResumeMessage message)
{
context.Consumer.Pause();

Expand Down
18 changes: 18 additions & 0 deletions src/KafkaFlow.IntegrationTests/Core/Messages/PauseResumeMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace KafkaFlow.IntegrationTests.Core.Messages
{
using System;
using System.Runtime.Serialization;

[DataContract]
public class PauseResumeMessage : ITestMessage
{
[DataMember(Order = 1)]
public Guid Id { get; set; }

[DataMember(Order = 2)]
public string Value { get; set; }

[DataMember(Order = 3)]
public int Version { get; set; }
}
}
11 changes: 7 additions & 4 deletions src/KafkaFlow.TypedHandler/HandlerTypeMapping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,22 @@ namespace KafkaFlow.TypedHandler
{
using System;
using System.Collections.Generic;
using System.Linq;

internal class HandlerTypeMapping
{
private readonly Dictionary<Type, Type> mapping = new Dictionary<Type, Type>();
private readonly Dictionary<Type, IEnumerable<Type>> mapping = new Dictionary<Type, IEnumerable<Type>>();

public void AddMapping(Type messageType, Type handlerType)
{
this.mapping.Add(messageType, handlerType);
var handlers = this.GetHandlersTypes(messageType);

this.mapping[messageType] = handlers.Append(handlerType);
}

public Type GetHandlerType(Type messageType)
public IEnumerable<Type> GetHandlersTypes(Type messageType)
{
return this.mapping.TryGetValue(messageType, out var handlerType) ? handlerType : null;
return this.mapping.TryGetValue(messageType, out var handlerType) ? handlerType : Enumerable.Empty<Type>();
}
}
}
6 changes: 6 additions & 0 deletions src/KafkaFlow.TypedHandler/KafkaFlow.TypedHandler.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,10 @@
<ProjectReference Include="..\KafkaFlow.Abstractions\KafkaFlow.Abstractions.csproj"/>
</ItemGroup>

<ItemGroup>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>KafkaFlow.UnitTests</_Parameter1>
</AssemblyAttribute>
</ItemGroup>

</Project>
27 changes: 12 additions & 15 deletions src/KafkaFlow.TypedHandler/TypedHandlerMiddleware.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace KafkaFlow.TypedHandler
{
using System.Linq;
using System.Threading.Tasks;

internal class TypedHandlerMiddleware : IMessageMiddleware
Expand All @@ -19,21 +20,17 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
using (var scope = this.dependencyResolver.CreateScope())
{
var handlerType = this.configuration.HandlerMapping.GetHandlerType(context.Message.GetType());

if (handlerType == null)
{
return;
}

var handler = scope.Resolver.Resolve(handlerType);

await HandlerExecutor
.GetExecutor(context.Message.GetType())
.Execute(
handler,
context,
context.Message)
await Task.WhenAll(
this.configuration
.HandlerMapping
.GetHandlersTypes(context.Message.GetType())
.Select(t =>
HandlerExecutor
.GetExecutor(context.Message.GetType())
.Execute(
scope.Resolver.Resolve(t),
context,
context.Message)))
.ConfigureAwait(false);
}

Expand Down
1 change: 1 addition & 0 deletions src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<ProjectReference Include="..\KafkaFlow.BatchConsume\KafkaFlow.BatchConsume.csproj" />
<ProjectReference Include="..\KafkaFlow.Compressor\KafkaFlow.Compressor.csproj" />
<ProjectReference Include="..\KafkaFlow.Serializer\KafkaFlow.Serializer.csproj" />
<ProjectReference Include="..\KafkaFlow.TypedHandler\KafkaFlow.TypedHandler.csproj" />
<ProjectReference Include="..\KafkaFlow\KafkaFlow.csproj" />
</ItemGroup>

Expand Down
34 changes: 34 additions & 0 deletions src/KafkaFlow.UnitTests/TypedHandler/HandlerTypeMappingTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
namespace KafkaFlow.UnitTests.TypedHandler
{
using System;
using FluentAssertions;
using KafkaFlow.TypedHandler;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public class HandlerTypeMappingTests
{
private HandlerTypeMapping target;

[TestInitialize]
public void Setup()
{
this.target = new HandlerTypeMapping();
}

[TestMethod]
public void AddSeveralMappings_GetHandlersTypesReturnsListOfHandlers()
{
// Act
this.target.AddMapping(typeof(int), typeof(string));
this.target.AddMapping(typeof(int), typeof(double));
this.target.AddMapping(typeof(int), typeof(bool));

// Assert
this.target.GetHandlersTypes(typeof(int)).Should().BeEquivalentTo(
typeof(string),
typeof(double),
typeof(bool));
}
}
}

0 comments on commit deb5b4e

Please sign in to comment.