diff --git a/samples/KafkaFlow.Sample.BatchOperations/PrintConsoleMiddleware.cs b/samples/KafkaFlow.Sample.BatchOperations/PrintConsoleMiddleware.cs index 11d38754d..438689b7a 100644 --- a/samples/KafkaFlow.Sample.BatchOperations/PrintConsoleMiddleware.cs +++ b/samples/KafkaFlow.Sample.BatchOperations/PrintConsoleMiddleware.cs @@ -1,7 +1,7 @@ using System; using System.Linq; using System.Threading.Tasks; -using KafkaFlow.Middlewares.BatchConsume; +using KafkaFlow.Batching; namespace KafkaFlow.Sample.BatchOperations; diff --git a/samples/KafkaFlow.Sample.BatchOperations/Program.cs b/samples/KafkaFlow.Sample.BatchOperations/Program.cs index 309445869..d2597957b 100644 --- a/samples/KafkaFlow.Sample.BatchOperations/Program.cs +++ b/samples/KafkaFlow.Sample.BatchOperations/Program.cs @@ -1,7 +1,7 @@ using System; using System.Linq; using KafkaFlow; -using KafkaFlow.Middlewares.BatchConsume; +using KafkaFlow.Batching; using KafkaFlow.Producers; using KafkaFlow.Sample.BatchOperations; using KafkaFlow.Serializer; @@ -37,7 +37,7 @@ .AddMiddlewares( middlewares => middlewares .AddDeserializer() - .BatchConsume(10, TimeSpan.FromSeconds(10)) + .AddBatching(10, TimeSpan.FromSeconds(10)) .Add() ) ) diff --git a/samples/KafkaFlow.Sample/Program.cs b/samples/KafkaFlow.Sample/Program.cs index 4f2e313e1..54339254d 100644 --- a/samples/KafkaFlow.Sample/Program.cs +++ b/samples/KafkaFlow.Sample/Program.cs @@ -33,7 +33,7 @@ .WithWorkersCount(3) .AddMiddlewares( middlewares => middlewares - .AddDeserializer() + .AddDeserializer() .AddTypedHandlers(h => h.AddHandler()) ) ) diff --git a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs index cefd11e59..a705233d7 100644 --- a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs @@ -54,7 +54,7 @@ public static IClusterConfigurationBuilder EnableAdminMessages( .DisableManagement() .AddMiddlewares( middlewares => middlewares - .AddDeserializer() + .AddDeserializer() .AddTypedHandlers( handlers => handlers .WithHandlerLifetime(InstanceLifetime.Singleton) @@ -123,7 +123,7 @@ public static IClusterConfigurationBuilder EnableTelemetry( .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares - .AddDeserializer() + .AddDeserializer() .AddTypedHandlers( handlers => handlers .WithHandlerLifetime(InstanceLifetime.Singleton) diff --git a/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs b/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs index 5c83c6ac9..a50c44516 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs @@ -208,7 +208,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares - .AddSingleTypeSerializer(typeof(TestMessage1)) + .AddSingleTypeDeserializer(typeof(TestMessage1)) .AddTypedHandlers( handlers => handlers @@ -229,7 +229,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection }) .AddMiddlewares( middlewares => middlewares - .AddSingleTypeDeserializer() + .AddSingleTypeDeserializer() .AddTypedHandlers( handlers => handlers @@ -287,7 +287,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .AddMiddlewares( middlewares => middlewares .AddDecompressor() - .AddDeserializer() + .AddDeserializer() .AddTypedHandlers( handlers => handlers diff --git a/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetSerializer.cs b/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetSerializer.cs index 471c3ed65..12d8a706d 100644 --- a/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetSerializer.cs +++ b/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetSerializer.cs @@ -1,6 +1,5 @@ namespace KafkaFlow.Serializer { - using System; using System.IO; using System.Threading.Tasks; using ProtoBuf; @@ -8,7 +7,7 @@ /// /// A message serializer using protobuf-net library /// - public class ProtobufNetSerializer : ISerializer, IDeserializer + public class ProtobufNetSerializer : ISerializer { /// public Task SerializeAsync(object message, Stream output, ISerializerContext context) @@ -17,11 +16,5 @@ public Task SerializeAsync(object message, Stream output, ISerializerContext con return Task.CompletedTask; } - - /// - public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) - { - return Task.FromResult(Serializer.Deserialize(type, input)); - } } } diff --git a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs index 287aae481..ddebd5005 100644 --- a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs @@ -3,7 +3,7 @@ namespace KafkaFlow.UnitTests.BatchConsume using System; using System.Threading.Tasks; using FluentAssertions; - using KafkaFlow.Middlewares.BatchConsume; + using KafkaFlow.Batching; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; diff --git a/src/KafkaFlow.UnitTests/Serializers/NewtonsoftJsonDeserializerTests.cs b/src/KafkaFlow.UnitTests/Serializers/NewtonsoftJsonDeserializerTests.cs new file mode 100644 index 000000000..88fa963e8 --- /dev/null +++ b/src/KafkaFlow.UnitTests/Serializers/NewtonsoftJsonDeserializerTests.cs @@ -0,0 +1,48 @@ +namespace KafkaFlow.UnitTests.Serializers +{ + using System; + using System.IO; + using System.Text; + using System.Threading.Tasks; + using AutoFixture; + using FluentAssertions; + using KafkaFlow.Serializer; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + using Newtonsoft.Json; + + [TestClass] + public class NewtonsoftJsonDeserializerTests + { + private readonly Mock contextMock = new (); + private readonly NewtonsoftJsonDeserializer deserializer = new (); + + private readonly Fixture fixture = new(); + + [TestMethod] + public async Task DeserializeAsync_ValidPayload_ObjectGenerated() + { + // Arrange + var message = this.fixture.Create(); + using var input = new MemoryStream(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message))); + + // Act + var result = await this.deserializer.DeserializeAsync(input, typeof(TestMessage), this.contextMock.Object); + + // Assert + result.Should().NotBeNull(); + result.Should().BeOfType(); + } + + private class TestMessage + { + public int IntegerField { get; set; } + + public string StringField { get; set; } + + public double DoubleField { get; set; } + + public DateTime DateTimeField { get; set; } + } + } +} diff --git a/src/KafkaFlow/Middlewares/BatchConsume/BatchConsumeMessageContext.cs b/src/KafkaFlow/Batching/BatchConsumeMessageContext.cs similarity index 95% rename from src/KafkaFlow/Middlewares/BatchConsume/BatchConsumeMessageContext.cs rename to src/KafkaFlow/Batching/BatchConsumeMessageContext.cs index aef603c68..4fb4d2c7f 100644 --- a/src/KafkaFlow/Middlewares/BatchConsume/BatchConsumeMessageContext.cs +++ b/src/KafkaFlow/Batching/BatchConsumeMessageContext.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Middlewares.BatchConsume +namespace KafkaFlow.Batching { using System; using System.Collections.Generic; diff --git a/src/KafkaFlow/Middlewares/BatchConsume/BatchConsumeMiddleware.cs b/src/KafkaFlow/Batching/BatchConsumeMiddleware.cs similarity index 98% rename from src/KafkaFlow/Middlewares/BatchConsume/BatchConsumeMiddleware.cs rename to src/KafkaFlow/Batching/BatchConsumeMiddleware.cs index 02fd02dce..4c9621950 100644 --- a/src/KafkaFlow/Middlewares/BatchConsume/BatchConsumeMiddleware.cs +++ b/src/KafkaFlow/Batching/BatchConsumeMiddleware.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Middlewares.BatchConsume +namespace KafkaFlow.Batching { using System; using System.Collections.Generic; diff --git a/src/KafkaFlow/Middlewares/BatchConsume/BatchConsumeExtensions.cs b/src/KafkaFlow/Batching/BatchingExtensions.cs similarity index 89% rename from src/KafkaFlow/Middlewares/BatchConsume/BatchConsumeExtensions.cs rename to src/KafkaFlow/Batching/BatchingExtensions.cs index d9c7613cb..f9fac63f3 100644 --- a/src/KafkaFlow/Middlewares/BatchConsume/BatchConsumeExtensions.cs +++ b/src/KafkaFlow/Batching/BatchingExtensions.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Middlewares.BatchConsume +namespace KafkaFlow.Batching { using System; using System.Collections.Generic; @@ -7,7 +7,7 @@ namespace KafkaFlow.Middlewares.BatchConsume /// /// no needed /// - public static class BatchConsumeExtensions + public static class BatchingExtensions { /// /// Accumulates a group of messages to be passed as a batch to the next middleware as just one message @@ -16,7 +16,7 @@ public static class BatchConsumeExtensions /// The maximum size of the batch, when this limit is reached the next middleware will be called /// The maximum time the middleware will wait to call the next middleware /// - public static IConsumerMiddlewareConfigurationBuilder BatchConsume( + public static IConsumerMiddlewareConfigurationBuilder AddBatching( this IConsumerMiddlewareConfigurationBuilder builder, int batchSize, TimeSpan batchTimeout) @@ -30,7 +30,7 @@ public static IConsumerMiddlewareConfigurationBuilder BatchConsume( } /// - /// Gets the accumulated grouped by BatchConsume middleware + /// Gets the accumulated grouped by batching middleware /// /// The message context /// All the contexts in the batch diff --git a/src/KafkaFlow/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs b/src/KafkaFlow/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs index 0000a6642..487731d1f 100644 --- a/src/KafkaFlow/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs +++ b/src/KafkaFlow/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs @@ -94,7 +94,7 @@ public static IConsumerMiddlewareConfigurationBuilder AddDeserializerThe message type /// A class that implements /// - public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares, Factory serializerFactory) where TDeserializer : class, IDeserializer @@ -115,7 +115,7 @@ public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeDeserializer< this IConsumerMiddlewareConfigurationBuilder middlewares) where TDeserializer : class, IDeserializer { - return middlewares.AddSingleTypeSerializer(typeof(TMessage)); + return middlewares.AddSingleTypeDeserializer(typeof(TMessage)); } /// @@ -125,7 +125,7 @@ public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeDeserializer< /// The message type /// A class that implements /// - public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares, Type messageType) where TDeserializer : class, IDeserializer @@ -145,7 +145,7 @@ public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializerThe message type /// A class that implements /// - public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares, Factory serializerFactory, Type messageType)