From ce7b5d7a667fb42bcd98452566e8025f5934ea9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Sousa?= Date: Wed, 20 Sep 2023 21:44:34 +0100 Subject: [PATCH] feat!: segregated serializer and compressor interfaces --- .../Program.cs | 4 +- .../Program.cs | 7 +- .../KafkaFlow.Sample.FlowControl/Program.cs | 4 +- .../Program.cs | 4 +- .../Program.cs | 4 +- samples/KafkaFlow.Sample/Program.cs | 4 +- .../ClusterConfigurationBuilderExtensions.cs | 6 +- .../GzipMessageCompressor.cs | 14 --- .../GzipMessageDecompressor.cs | 25 +++++ .../KafkaFlow.Compressor.Gzip.csproj | 1 + .../Core/Bootstrapper.cs | 20 ++-- .../SchemaRegistryTypeResolver.cs | 2 +- .../JsonCoreDeserializer.cs | 40 ++++++++ .../JsonCoreSerializer.cs | 11 +-- .../KafkaFlow.Serializer.JsonCore.csproj | 1 + ...KafkaFlow.Serializer.NewtonsoftJson.csproj | 1 + .../NewtonsoftJsonDeserializer.cs | 51 ++++++++++ .../NewtonsoftJsonSerializer.cs | 16 ---- .../KafkaFlow.Serializer.ProtobufNet.csproj | 1 + .../ProtobufNetDeserializer.cs | 19 ++++ .../ProtobufNetSerializer.cs | 2 +- .../ConfluentAvroDeserializer.cs | 42 ++++++++ .../ConfluentAvroSerializer.cs | 14 --- .../ConsumerConfigurationBuilderExtensions.cs | 6 +- .../ConfluentJsonDeserializer.cs | 39 ++++++++ .../ConfluentJsonSerializer.cs | 14 --- .../ConsumerConfigurationBuilderExtensions.cs | 8 +- .../ProducerConfigurationBuilderExtensions.cs | 4 +- .../ConfluentProtobufDeserializer.cs | 59 ++++++++++++ .../ConfluentProtobufSerializer.cs | 14 --- .../ConsumerConfigurationBuilderExtensions.cs | 6 +- .../CompressorConsumerMiddlewareTests.cs | 17 ++-- .../CompressorProducerMiddlewareTests.cs | 3 +- .../NewtonsoftJsonSerializerTests.cs | 17 ---- .../SerializerConsumerMiddlewareTests.cs | 21 ++-- .../SerializerProducerMiddlewareTests.cs | 3 +- .../Compressor}/IMessageCompressor.cs | 9 +- .../Compressor/IMessageDecompressor.cs | 15 +++ .../CompressorConsumerMiddleware.cs | 14 +-- .../CompressorProducerMiddleware.cs | 2 +- .../ConfigurationBuilderExtensions.cs | 17 ++-- .../ConsumerMiddlewareConfigurationBuilder.cs | 95 ++++++++++--------- .../ProducerMiddlewareConfigurationBuilder.cs | 5 +- src/KafkaFlow/Serializer/IDeserializer.cs | 21 ++++ .../Serializer}/ISerializer.cs | 12 +-- .../SerializerConsumerMiddleware.cs | 20 ++-- .../SerializerProducerMiddleware.cs | 3 +- .../AsyncMessageTypeResolverWrapper.cs | 2 +- .../Resolvers/DefaultTypeResolver.cs | 2 +- .../Resolvers/IAsyncMessageTypeResolver.cs | 2 +- .../Resolvers/IMessageTypeResolver.cs | 2 +- .../Resolvers/SingleMessageTypeResolver.cs | 2 +- 52 files changed, 467 insertions(+), 260 deletions(-) create mode 100644 src/KafkaFlow.Compressor.Gzip/GzipMessageDecompressor.cs create mode 100644 src/KafkaFlow.Serializer.JsonCore/JsonCoreDeserializer.cs create mode 100644 src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonDeserializer.cs create mode 100644 src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetDeserializer.cs create mode 100644 src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroDeserializer.cs create mode 100644 src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonDeserializer.cs create mode 100644 src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufDeserializer.cs rename src/{KafkaFlow.Abstractions => KafkaFlow/Compressor}/IMessageCompressor.cs (58%) create mode 100644 src/KafkaFlow/Compressor/IMessageDecompressor.cs rename src/KafkaFlow/{Middlewares/Compression => Compressor/Middlewares}/CompressorConsumerMiddleware.cs (64%) rename src/KafkaFlow/{Middlewares/Compression => Compressor/Middlewares}/CompressorProducerMiddleware.cs (96%) rename src/KafkaFlow/{Middlewares => }/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs (63%) rename src/KafkaFlow/{Middlewares => }/Serializer/Configuration/ProducerMiddlewareConfigurationBuilder.cs (98%) create mode 100644 src/KafkaFlow/Serializer/IDeserializer.cs rename src/{KafkaFlow.Abstractions => KafkaFlow/Serializer}/ISerializer.cs (56%) rename src/KafkaFlow/{Middlewares/Serializer => Serializer/Middlewares}/SerializerConsumerMiddleware.cs (82%) rename src/KafkaFlow/{Middlewares/Serializer => Serializer/Middlewares}/SerializerProducerMiddleware.cs (97%) rename src/KafkaFlow/{Middlewares => }/Serializer/Resolvers/AsyncMessageTypeResolverWrapper.cs (91%) rename src/KafkaFlow/{Middlewares => }/Serializer/Resolvers/DefaultTypeResolver.cs (93%) rename src/KafkaFlow/{Middlewares => }/Serializer/Resolvers/IAsyncMessageTypeResolver.cs (93%) rename src/KafkaFlow/{Middlewares => }/Serializer/Resolvers/IMessageTypeResolver.cs (93%) rename src/KafkaFlow/{Middlewares => }/Serializer/Resolvers/SingleMessageTypeResolver.cs (94%) diff --git a/samples/KafkaFlow.Sample.BatchOperations/Program.cs b/samples/KafkaFlow.Sample.BatchOperations/Program.cs index cf5b8eea8..309445869 100644 --- a/samples/KafkaFlow.Sample.BatchOperations/Program.cs +++ b/samples/KafkaFlow.Sample.BatchOperations/Program.cs @@ -2,10 +2,10 @@ using System.Linq; using KafkaFlow; using KafkaFlow.Middlewares.BatchConsume; -using KafkaFlow.Middlewares.Serializer.Configuration; using KafkaFlow.Producers; using KafkaFlow.Sample.BatchOperations; using KafkaFlow.Serializer; +using KafkaFlow.Serializer.Configuration; using Microsoft.Extensions.DependencyInjection; var services = new ServiceCollection(); @@ -36,7 +36,7 @@ .WithWorkersCount(1) .AddMiddlewares( middlewares => middlewares - .AddSerializer() + .AddDeserializer() .BatchConsume(10, TimeSpan.FromSeconds(10)) .Add() ) diff --git a/samples/KafkaFlow.Sample.ConsumerThrottling/Program.cs b/samples/KafkaFlow.Sample.ConsumerThrottling/Program.cs index 2a1ce2b68..2d74f9b0b 100644 --- a/samples/KafkaFlow.Sample.ConsumerThrottling/Program.cs +++ b/samples/KafkaFlow.Sample.ConsumerThrottling/Program.cs @@ -13,10 +13,9 @@ The ThrottleConsumer mechanism works by checking the lag of "consumerA" every 1 using System; using System.Threading.Tasks; using KafkaFlow; -using KafkaFlow.Middlewares.Serializer; -using KafkaFlow.Middlewares.Serializer.Configuration; using KafkaFlow.Producers; using KafkaFlow.Serializer; +using KafkaFlow.Serializer.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -50,7 +49,7 @@ The ThrottleConsumer mechanism works by checking the lag of "consumerA" every 1 .WithWorkersCount(1) .AddMiddlewares( middlewares => middlewares - .AddSerializer() + .AddDeserializer() .Add() ) ) @@ -70,7 +69,7 @@ The ThrottleConsumer mechanism works by checking the lag of "consumerA" every 1 .AddAction(a => a.AboveThreshold(10).ApplyDelay(1_000)) .AddAction(a => a.AboveThreshold(20).ApplyDelay(5_000)) .AddAction(a => a.AboveThreshold(30).ApplyDelay(10_000))) - .AddSerializer() + .AddDeserializer() .Add() ) ) diff --git a/samples/KafkaFlow.Sample.FlowControl/Program.cs b/samples/KafkaFlow.Sample.FlowControl/Program.cs index 136d2126e..6706ffd30 100644 --- a/samples/KafkaFlow.Sample.FlowControl/Program.cs +++ b/samples/KafkaFlow.Sample.FlowControl/Program.cs @@ -1,8 +1,8 @@ using System; using KafkaFlow; -using KafkaFlow.Middlewares.Serializer.Configuration; using KafkaFlow.Sample.FlowControl; using KafkaFlow.Serializer; +using KafkaFlow.Serializer.Configuration; using Microsoft.Extensions.DependencyInjection; var services = new ServiceCollection(); @@ -36,7 +36,7 @@ .WithWorkersCount(1) .AddMiddlewares( m => m - .AddSingleTypeSerializer() + .AddSingleTypeDeserializer() .Add() ) ); diff --git a/samples/KafkaFlow.Sample.PauseConsumerOnError/Program.cs b/samples/KafkaFlow.Sample.PauseConsumerOnError/Program.cs index 14d8f0f03..a3c764d41 100644 --- a/samples/KafkaFlow.Sample.PauseConsumerOnError/Program.cs +++ b/samples/KafkaFlow.Sample.PauseConsumerOnError/Program.cs @@ -1,8 +1,8 @@ using KafkaFlow; -using KafkaFlow.Middlewares.Serializer.Configuration; using KafkaFlow.Producers; using KafkaFlow.Sample.PauseConsumerOnError; using KafkaFlow.Serializer; +using KafkaFlow.Serializer.Configuration; using Microsoft.Extensions.DependencyInjection; var services = new ServiceCollection(); @@ -33,7 +33,7 @@ middlewares => middlewares .Add() - .AddSerializer() + .AddDeserializer() .AddTypedHandlers(h => h.AddHandler()) ) ) diff --git a/samples/KafkaFlow.Sample.SchemaRegistry/Program.cs b/samples/KafkaFlow.Sample.SchemaRegistry/Program.cs index 3e7c4c095..926879042 100644 --- a/samples/KafkaFlow.Sample.SchemaRegistry/Program.cs +++ b/samples/KafkaFlow.Sample.SchemaRegistry/Program.cs @@ -73,7 +73,7 @@ .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares - .AddSchemaRegistryAvroSerializer() + .AddSchemaRegistryAvroDeserializer() .AddTypedHandlers( handlers => handlers .AddHandler() @@ -102,7 +102,7 @@ .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares - .AddSchemaRegistryProtobufSerializer() + .AddSchemaRegistryProtobufDeserializer() .AddTypedHandlers(handlers => handlers.AddHandler()) ) ) diff --git a/samples/KafkaFlow.Sample/Program.cs b/samples/KafkaFlow.Sample/Program.cs index eed68d674..4f2e313e1 100644 --- a/samples/KafkaFlow.Sample/Program.cs +++ b/samples/KafkaFlow.Sample/Program.cs @@ -1,10 +1,10 @@ using System; using System.Threading.Tasks; using KafkaFlow; -using KafkaFlow.Middlewares.Serializer.Configuration; using KafkaFlow.Producers; using KafkaFlow.Sample; using KafkaFlow.Serializer; +using KafkaFlow.Serializer.Configuration; using Microsoft.Extensions.DependencyInjection; var services = new ServiceCollection(); @@ -33,7 +33,7 @@ .WithWorkersCount(3) .AddMiddlewares( middlewares => middlewares - .AddSerializer() + .AddDeserializer() .AddTypedHandlers(h => h.AddHandler()) ) ) diff --git a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs index c19388ee7..cefd11e59 100644 --- a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs @@ -5,8 +5,8 @@ using KafkaFlow.Admin; using KafkaFlow.Admin.Handlers; using KafkaFlow.Configuration; - using KafkaFlow.Middlewares.Serializer.Configuration; using KafkaFlow.Serializer; + using KafkaFlow.Serializer.Configuration; /// /// No needed @@ -54,7 +54,7 @@ public static IClusterConfigurationBuilder EnableAdminMessages( .DisableManagement() .AddMiddlewares( middlewares => middlewares - .AddSerializer() + .AddDeserializer() .AddTypedHandlers( handlers => handlers .WithHandlerLifetime(InstanceLifetime.Singleton) @@ -123,7 +123,7 @@ public static IClusterConfigurationBuilder EnableTelemetry( .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares - .AddSerializer() + .AddDeserializer() .AddTypedHandlers( handlers => handlers .WithHandlerLifetime(InstanceLifetime.Singleton) diff --git a/src/KafkaFlow.Compressor.Gzip/GzipMessageCompressor.cs b/src/KafkaFlow.Compressor.Gzip/GzipMessageCompressor.cs index f49237c59..5a03c948d 100644 --- a/src/KafkaFlow.Compressor.Gzip/GzipMessageCompressor.cs +++ b/src/KafkaFlow.Compressor.Gzip/GzipMessageCompressor.cs @@ -23,19 +23,5 @@ public byte[] Compress(byte[] message) return outputStream.ToArray(); } - - /// - public byte[] Decompress(byte[] message) - { - using var outputStream = new MemoryStream(); - using var inputStream = new MemoryStream(message); - - using (var gzipStream = new GZipStream(inputStream, CompressionMode.Decompress)) - { - gzipStream.CopyTo(outputStream); - } - - return outputStream.ToArray(); - } } } diff --git a/src/KafkaFlow.Compressor.Gzip/GzipMessageDecompressor.cs b/src/KafkaFlow.Compressor.Gzip/GzipMessageDecompressor.cs new file mode 100644 index 000000000..aa4cf21d3 --- /dev/null +++ b/src/KafkaFlow.Compressor.Gzip/GzipMessageDecompressor.cs @@ -0,0 +1,25 @@ +namespace KafkaFlow.Compressor.Gzip +{ + using System.IO; + using System.IO.Compression; + + /// + /// A GZIP message decompressor + /// + public class GzipMessageDecompressor : IMessageDecompressor + { + /// + public byte[] Decompress(byte[] message) + { + using var outputStream = new MemoryStream(); + using var inputStream = new MemoryStream(message); + + using (var gzipStream = new GZipStream(inputStream, CompressionMode.Decompress)) + { + gzipStream.CopyTo(outputStream); + } + + return outputStream.ToArray(); + } + } +} diff --git a/src/KafkaFlow.Compressor.Gzip/KafkaFlow.Compressor.Gzip.csproj b/src/KafkaFlow.Compressor.Gzip/KafkaFlow.Compressor.Gzip.csproj index 54f7ece59..7d2c63f51 100644 --- a/src/KafkaFlow.Compressor.Gzip/KafkaFlow.Compressor.Gzip.csproj +++ b/src/KafkaFlow.Compressor.Gzip/KafkaFlow.Compressor.Gzip.csproj @@ -8,5 +8,6 @@ + \ No newline at end of file diff --git a/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs b/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs index b2e963ea9..5c83c6ac9 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs @@ -14,8 +14,8 @@ namespace KafkaFlow.IntegrationTests.Core using KafkaFlow.IntegrationTests.Core.Messages; using KafkaFlow.IntegrationTests.Core.Middlewares; using KafkaFlow.IntegrationTests.Core.Producers; - using KafkaFlow.Middlewares.Serializer.Configuration; using KafkaFlow.Serializer; + using KafkaFlow.Serializer.Configuration; using KafkaFlow.Serializer.SchemaRegistry; using AutoOffsetReset = KafkaFlow.AutoOffsetReset; @@ -128,7 +128,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .WithConsumerConfig(defaultConfig) .AddMiddlewares( middlewares => middlewares - .AddSerializer() + .AddDeserializer() .AddTypedHandlers( handlers => handlers .WithHandlerLifetime(InstanceLifetime.Singleton) @@ -156,7 +156,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .WithConsumerConfig(defaultConfig) .AddMiddlewares( middlewares => middlewares - .AddSerializer() + .AddDeserializer() .AddTypedHandlers( handlers => handlers .WithHandlerLifetime(InstanceLifetime.Singleton) @@ -184,7 +184,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .WithConsumerConfig(defaultConfig) .AddMiddlewares( middlewares => middlewares - .AddSerializer() + .AddDeserializer() .AddTypedHandlers( handlers => handlers .WithHandlerLifetime(InstanceLifetime.Singleton) @@ -229,7 +229,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection }) .AddMiddlewares( middlewares => middlewares - .AddSingleTypeSerializer() + .AddSingleTypeDeserializer() .AddTypedHandlers( handlers => handlers @@ -244,7 +244,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares - .AddSerializer() + .AddDeserializer() .AddTypedHandlers( handlers => handlers @@ -259,7 +259,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares - .AddCompressor() + .AddDecompressor() .Add())) .AddConsumer( consumer => consumer @@ -270,7 +270,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares - .AddSerializer(_ => new JsonCoreSerializer()) + .AddDeserializer(_ => new JsonCoreDeserializer()) .AddTypedHandlers( handlers => handlers @@ -286,8 +286,8 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .WithAutoCommitIntervalMs(1) .AddMiddlewares( middlewares => middlewares - .AddCompressor() - .AddSerializer() + .AddDecompressor() + .AddDeserializer() .AddTypedHandlers( handlers => handlers diff --git a/src/KafkaFlow.SchemaRegistry/SchemaRegistryTypeResolver.cs b/src/KafkaFlow.SchemaRegistry/SchemaRegistryTypeResolver.cs index c13a237de..e956036b3 100644 --- a/src/KafkaFlow.SchemaRegistry/SchemaRegistryTypeResolver.cs +++ b/src/KafkaFlow.SchemaRegistry/SchemaRegistryTypeResolver.cs @@ -6,7 +6,7 @@ namespace KafkaFlow using System.Linq; using System.Threading; using System.Threading.Tasks; - using KafkaFlow.Middlewares.Serializer.Resolvers; + using KafkaFlow.Serializer.Resolvers; /// /// The message type resolver to be used with schema registry serializers diff --git a/src/KafkaFlow.Serializer.JsonCore/JsonCoreDeserializer.cs b/src/KafkaFlow.Serializer.JsonCore/JsonCoreDeserializer.cs new file mode 100644 index 000000000..521b3be16 --- /dev/null +++ b/src/KafkaFlow.Serializer.JsonCore/JsonCoreDeserializer.cs @@ -0,0 +1,40 @@ +namespace KafkaFlow.Serializer +{ + using System; + using System.IO; + using System.Text.Json; + using System.Threading.Tasks; + + /// + /// A message deserializer using System.Text.Json library + /// + public class JsonCoreDeserializer : IDeserializer + { + private readonly JsonSerializerOptions serializerOptions; + + /// + /// Initializes a new instance of the class. + /// + /// Json serializer options + public JsonCoreDeserializer(JsonSerializerOptions options) + { + this.serializerOptions = options; + } + + /// + /// Initializes a new instance of the class. + /// + public JsonCoreDeserializer() + : this(new JsonSerializerOptions()) + { + } + + /// + public async Task DeserializeAsync(Stream input, Type type, ISerializerContext context) + { + return await JsonSerializer + .DeserializeAsync(input, type, this.serializerOptions) + .ConfigureAwait(false); + } + } +} diff --git a/src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs b/src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs index 5bb840360..e0069dd7e 100644 --- a/src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs +++ b/src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs @@ -1,6 +1,5 @@ namespace KafkaFlow.Serializer { - using System; using System.IO; using System.Text.Json; using System.Threading.Tasks; @@ -59,13 +58,5 @@ public Task SerializeAsync(object message, Stream output, ISerializerContext con return Task.CompletedTask; } - - /// - public async Task DeserializeAsync(Stream input, Type type, ISerializerContext context) - { - return await JsonSerializer - .DeserializeAsync(input, type, this.serializerOptions) - .ConfigureAwait(false); - } } -} \ No newline at end of file +} diff --git a/src/KafkaFlow.Serializer.JsonCore/KafkaFlow.Serializer.JsonCore.csproj b/src/KafkaFlow.Serializer.JsonCore/KafkaFlow.Serializer.JsonCore.csproj index 252b999e7..efa94e927 100644 --- a/src/KafkaFlow.Serializer.JsonCore/KafkaFlow.Serializer.JsonCore.csproj +++ b/src/KafkaFlow.Serializer.JsonCore/KafkaFlow.Serializer.JsonCore.csproj @@ -13,5 +13,6 @@ + diff --git a/src/KafkaFlow.Serializer.NewtonsoftJson/KafkaFlow.Serializer.NewtonsoftJson.csproj b/src/KafkaFlow.Serializer.NewtonsoftJson/KafkaFlow.Serializer.NewtonsoftJson.csproj index 8d68ef533..7d5add917 100644 --- a/src/KafkaFlow.Serializer.NewtonsoftJson/KafkaFlow.Serializer.NewtonsoftJson.csproj +++ b/src/KafkaFlow.Serializer.NewtonsoftJson/KafkaFlow.Serializer.NewtonsoftJson.csproj @@ -9,6 +9,7 @@ + diff --git a/src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonDeserializer.cs b/src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonDeserializer.cs new file mode 100644 index 000000000..f0b05dcff --- /dev/null +++ b/src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonDeserializer.cs @@ -0,0 +1,51 @@ +namespace KafkaFlow.Serializer +{ + using System; + using System.IO; + using System.Text; + using System.Threading.Tasks; + using Newtonsoft.Json; + + /// + /// A message deserializer using NewtonsoftJson library + /// + public class NewtonsoftJsonDeserializer : IDeserializer + { + private const int DefaultBufferSize = 1024; + + private static readonly UTF8Encoding UTF8NoBom = new (false); + private readonly JsonSerializerSettings settings; + + /// + /// Initializes a new instance of the class. + /// + /// Json serializer settings + public NewtonsoftJsonDeserializer(JsonSerializerSettings settings) + { + this.settings = settings; + } + + /// + /// Initializes a new instance of the class. + /// + public NewtonsoftJsonDeserializer() + : this(new JsonSerializerSettings()) + { + } + + /// + public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) + { + using var sr = new StreamReader( + input, + UTF8NoBom, + true, + DefaultBufferSize, + true); + + var serializer = JsonSerializer.CreateDefault(this.settings); + + return Task.FromResult(serializer.Deserialize(sr, type)); + } + } +} diff --git a/src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonSerializer.cs b/src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonSerializer.cs index bbc943482..8ca731d2c 100644 --- a/src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonSerializer.cs +++ b/src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonSerializer.cs @@ -1,6 +1,5 @@ namespace KafkaFlow.Serializer { - using System; using System.IO; using System.Text; using System.Threading.Tasks; @@ -43,20 +42,5 @@ public Task SerializeAsync(object message, Stream output, ISerializerContext con return Task.CompletedTask; } - - /// - public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) - { - using var sr = new StreamReader( - input, - UTF8NoBom, - true, - DefaultBufferSize, - true); - - var serializer = JsonSerializer.CreateDefault(this.settings); - - return Task.FromResult(serializer.Deserialize(sr, type)); - } } } diff --git a/src/KafkaFlow.Serializer.ProtobufNet/KafkaFlow.Serializer.ProtobufNet.csproj b/src/KafkaFlow.Serializer.ProtobufNet/KafkaFlow.Serializer.ProtobufNet.csproj index 0a72e7712..cb94989cf 100644 --- a/src/KafkaFlow.Serializer.ProtobufNet/KafkaFlow.Serializer.ProtobufNet.csproj +++ b/src/KafkaFlow.Serializer.ProtobufNet/KafkaFlow.Serializer.ProtobufNet.csproj @@ -13,5 +13,6 @@ + diff --git a/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetDeserializer.cs b/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetDeserializer.cs new file mode 100644 index 000000000..c4838c35a --- /dev/null +++ b/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetDeserializer.cs @@ -0,0 +1,19 @@ +namespace KafkaFlow.Serializer +{ + using System; + using System.IO; + using System.Threading.Tasks; + using ProtoBuf; + + /// + /// A message deserializer using protobuf-net library + /// + public class ProtobufNetDeserializer : IDeserializer + { + /// + public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) + { + return Task.FromResult(Serializer.Deserialize(type, input)); + } + } +} diff --git a/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetSerializer.cs b/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetSerializer.cs index 50ff531a9..471c3ed65 100644 --- a/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetSerializer.cs +++ b/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetSerializer.cs @@ -8,7 +8,7 @@ /// /// A message serializer using protobuf-net library /// - public class ProtobufNetSerializer : ISerializer + public class ProtobufNetSerializer : ISerializer, IDeserializer { /// public Task SerializeAsync(object message, Stream output, ISerializerContext context) diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroDeserializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroDeserializer.cs new file mode 100644 index 000000000..3f29adc7d --- /dev/null +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroDeserializer.cs @@ -0,0 +1,42 @@ +namespace KafkaFlow.Serializer.SchemaRegistry +{ + using System; + using System.IO; + using System.Threading.Tasks; + using Confluent.SchemaRegistry; + using Confluent.SchemaRegistry.Serdes; + + /// + /// A message serializer using Apache.Avro library + /// + public class ConfluentAvroDeserializer : IDeserializer + { + private readonly ISchemaRegistryClient schemaRegistryClient; + + /// + /// Initializes a new instance of the class. + /// + /// The to be used by the framework + public ConfluentAvroDeserializer(IDependencyResolver resolver) + { + this.schemaRegistryClient = + resolver.Resolve() ?? + throw new InvalidOperationException( + $"No schema registry configuration was found. Set it using {nameof(ClusterConfigurationBuilderExtensions.WithSchemaRegistry)} on cluster configuration"); + } + + /// + public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) + { + return ConfluentDeserializerWrapper + .GetOrCreateDeserializer( + type, + () => Activator + .CreateInstance( + typeof(AvroDeserializer<>).MakeGenericType(type), + this.schemaRegistryClient, + null)) + .DeserializeAsync(input, context); + } + } +} diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs index 4fe5c2099..e0e2eb49e 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs @@ -43,19 +43,5 @@ public Task SerializeAsync(object message, Stream output, ISerializerContext con this.serializerConfig)) .SerializeAsync(message, output, context); } - - /// - public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) - { - return ConfluentDeserializerWrapper - .GetOrCreateDeserializer( - type, - () => Activator - .CreateInstance( - typeof(AvroDeserializer<>).MakeGenericType(type), - this.schemaRegistryClient, - null)) - .DeserializeAsync(input, context); - } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs index ba2862b8d..897e30f9a 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs @@ -2,7 +2,7 @@ { using Confluent.SchemaRegistry; using KafkaFlow.Configuration; - using KafkaFlow.Middlewares.Serializer; + using KafkaFlow.Serializer.Middlewares; using KafkaFlow.Serializer.SchemaRegistry; /// @@ -15,12 +15,12 @@ public static class ConsumerConfigurationBuilderExtensions /// /// The middleware configuration builder /// - public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryAvroSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryAvroDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares) { return middlewares.Add( resolver => new SerializerConsumerMiddleware( - new ConfluentAvroSerializer(resolver), + new ConfluentAvroDeserializer(resolver), new SchemaRegistryTypeResolver(new ConfluentAvroTypeNameResolver(resolver.Resolve())))); } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonDeserializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonDeserializer.cs new file mode 100644 index 000000000..0ec931348 --- /dev/null +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonDeserializer.cs @@ -0,0 +1,39 @@ +namespace KafkaFlow.Serializer.SchemaRegistry +{ + using System; + using System.IO; + using System.Threading.Tasks; + using Confluent.SchemaRegistry.Serdes; + using NJsonSchema.Generation; + + /// + /// A json message serializer integrated with the confluent schema registry + /// + public class ConfluentJsonDeserializer : IDeserializer + { + private readonly JsonSchemaGeneratorSettings schemaGeneratorSettings; + + /// + /// Initializes a new instance of the class. + /// + /// An instance of + public ConfluentJsonDeserializer(JsonSchemaGeneratorSettings schemaGeneratorSettings = null) + { + this.schemaGeneratorSettings = schemaGeneratorSettings; + } + + /// + public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) + { + return ConfluentDeserializerWrapper + .GetOrCreateDeserializer( + type, + () => Activator + .CreateInstance( + typeof(JsonDeserializer<>).MakeGenericType(type), + null, + this.schemaGeneratorSettings)) + .DeserializeAsync(input, context); + } + } +} diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs index 4b827857e..fa2763903 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs @@ -62,19 +62,5 @@ public Task SerializeAsync(object message, Stream output, ISerializerContext con this.schemaGeneratorSettings)) .SerializeAsync(message, output, context); } - - /// - public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) - { - return ConfluentDeserializerWrapper - .GetOrCreateDeserializer( - type, - () => Activator - .CreateInstance( - typeof(JsonDeserializer<>).MakeGenericType(type), - null, - this.schemaGeneratorSettings)) - .DeserializeAsync(input, context); - } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConsumerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConsumerConfigurationBuilderExtensions.cs index 9928afd6b..14be004ef 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConsumerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConsumerConfigurationBuilderExtensions.cs @@ -1,8 +1,8 @@ namespace KafkaFlow { using KafkaFlow.Configuration; - using KafkaFlow.Middlewares.Serializer.Configuration; - using KafkaFlow.Middlewares.Serializer.Resolvers; + using KafkaFlow.Serializer.Configuration; + using KafkaFlow.Serializer.Resolvers; using KafkaFlow.Serializer.SchemaRegistry; /// @@ -19,8 +19,8 @@ public static class ConsumerConfigurationBuilderExtensions public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryJsonSerializer( this IConsumerMiddlewareConfigurationBuilder middlewares) { - return middlewares.AddSerializer( - resolver => new ConfluentJsonSerializer(resolver), + return middlewares.AddDeserializer( + resolver => new ConfluentJsonDeserializer(), _ => new SingleMessageTypeResolver(typeof(TMessage))); } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ProducerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ProducerConfigurationBuilderExtensions.cs index 31a7f35f0..b314a336a 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ProducerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ProducerConfigurationBuilderExtensions.cs @@ -2,8 +2,8 @@ { using Confluent.SchemaRegistry.Serdes; using KafkaFlow.Configuration; - using KafkaFlow.Middlewares.Serializer.Configuration; - using KafkaFlow.Middlewares.Serializer.Resolvers; + using KafkaFlow.Serializer.Configuration; + using KafkaFlow.Serializer.Resolvers; using KafkaFlow.Serializer.SchemaRegistry; /// diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufDeserializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufDeserializer.cs new file mode 100644 index 000000000..51f628ccb --- /dev/null +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufDeserializer.cs @@ -0,0 +1,59 @@ +namespace KafkaFlow.Serializer.SchemaRegistry +{ + using System; + using System.Collections.Generic; + using System.IO; + using System.Threading.Tasks; + using Confluent.SchemaRegistry; + using Confluent.SchemaRegistry.Serdes; + + /// + /// A protobuf message serializer integrated with the confluent schema registry + /// + public class ConfluentProtobufDeserializer : IDeserializer + { + private readonly ISchemaRegistryClient schemaRegistryClient; + private readonly ProtobufSerializerConfig serializerConfig; + + /// + /// Initializes a new instance of the class. + /// + /// An instance of + /// An instance of + public ConfluentProtobufDeserializer(IDependencyResolver resolver, ProtobufSerializerConfig serializerConfig = null) + { + this.schemaRegistryClient = + resolver.Resolve() ?? + throw new InvalidOperationException( + $"No schema registry configuration was found. Set it using {nameof(ClusterConfigurationBuilderExtensions.WithSchemaRegistry)} on cluster configuration"); + + this.serializerConfig = serializerConfig; + } + + /// + public Task SerializeAsync(object message, Stream output, ISerializerContext context) + { + return ConfluentSerializerWrapper + .GetOrCreateSerializer( + message.GetType(), + () => Activator.CreateInstance( + typeof(ProtobufSerializer<>).MakeGenericType(message.GetType()), + this.schemaRegistryClient, + this.serializerConfig)) + .SerializeAsync(message, output, context); + } + + /// + public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) + { + return ConfluentDeserializerWrapper + .GetOrCreateDeserializer( + type, + () => Activator + .CreateInstance( + typeof(ProtobufDeserializer<>).MakeGenericType(type), + (IEnumerable>)null)) + .DeserializeAsync(input, context); + } + } +} diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs index 5285501cd..d29699d71 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs @@ -1,7 +1,6 @@ namespace KafkaFlow.Serializer.SchemaRegistry { using System; - using System.Collections.Generic; using System.IO; using System.Threading.Tasks; using Confluent.SchemaRegistry; @@ -42,18 +41,5 @@ public Task SerializeAsync(object message, Stream output, ISerializerContext con this.serializerConfig)) .SerializeAsync(message, output, context); } - - /// - public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) - { - return ConfluentDeserializerWrapper - .GetOrCreateDeserializer( - type, - () => Activator - .CreateInstance( - typeof(ProtobufDeserializer<>).MakeGenericType(type), - (IEnumerable>)null)) - .DeserializeAsync(input, context); - } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs index 2c0f88223..69fed7c13 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs @@ -2,7 +2,7 @@ { using Confluent.SchemaRegistry; using KafkaFlow.Configuration; - using KafkaFlow.Middlewares.Serializer; + using KafkaFlow.Serializer.Middlewares; using KafkaFlow.Serializer.SchemaRegistry; /// @@ -15,12 +15,12 @@ public static class ConsumerConfigurationBuilderExtensions /// /// The middleware configuration builder /// - public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryProtobufSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryProtobufDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares) { return middlewares.Add( resolver => new SerializerConsumerMiddleware( - new ConfluentProtobufSerializer(resolver), + new ConfluentProtobufDeserializer(resolver), new SchemaRegistryTypeResolver(new ConfluentProtobufTypeNameResolver(resolver.Resolve())))); } } diff --git a/src/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs index 04417ccad..a3b255b9a 100644 --- a/src/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs @@ -3,7 +3,8 @@ namespace KafkaFlow.UnitTests.Compressors using System; using System.Threading.Tasks; using FluentAssertions; - using KafkaFlow.Middlewares.Compression; + using KafkaFlow.Compressor; + using KafkaFlow.Compressor.Middlewares; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -11,16 +12,16 @@ namespace KafkaFlow.UnitTests.Compressors public class CompressorConsumerMiddlewareTests { private Mock contextMock; - private Mock compressorMock; + private Mock decompressorMock; private bool nextCalled; - private CompressorConsumerMiddleware target; + private DecompressorConsumerMiddleware target; [TestInitialize] public void Setup() { this.contextMock = new Mock(); - this.compressorMock = new Mock(); - this.target = new CompressorConsumerMiddleware(this.compressorMock.Object); + this.decompressorMock = new Mock(); + this.target = new DecompressorConsumerMiddleware(this.decompressorMock.Object); } [TestMethod] @@ -38,7 +39,7 @@ public void Invoke_NotByteArrayMessage_ThrowsInvalidOperationException() act.Should().Throw(); this.nextCalled.Should().BeFalse(); this.contextMock.Verify(x => x.SetMessage(It.IsAny(), It.IsAny()), Times.Never); - this.compressorMock.Verify(x => x.Decompress(It.IsAny()), Times.Never); + this.decompressorMock.Verify(x => x.Decompress(It.IsAny()), Times.Never); } [TestMethod] @@ -55,7 +56,7 @@ public async Task Invoke_ValidMessage_CallNext() .SetupGet(x => x.Message) .Returns(compressedMessage); - this.compressorMock + this.decompressorMock .Setup(x => x.Decompress((byte[]) compressedMessage.Value)) .Returns(uncompressedValue); @@ -76,7 +77,7 @@ await this.target.Invoke( resultContext.Should().NotBeNull(); resultContext.Should().Be(transformedContextMock.Object); this.contextMock.VerifyAll(); - this.compressorMock.VerifyAll(); + this.decompressorMock.VerifyAll(); } private Task SetNextCalled() diff --git a/src/KafkaFlow.UnitTests/Compressors/CompressorProducerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Compressors/CompressorProducerMiddlewareTests.cs index df1de6aca..2a427e65d 100644 --- a/src/KafkaFlow.UnitTests/Compressors/CompressorProducerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Compressors/CompressorProducerMiddlewareTests.cs @@ -3,7 +3,8 @@ namespace KafkaFlow.UnitTests.Compressors using System; using System.Threading.Tasks; using FluentAssertions; - using KafkaFlow.Middlewares.Compression; + using KafkaFlow.Compressor; + using KafkaFlow.Compressor.Middlewares; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; diff --git a/src/KafkaFlow.UnitTests/Serializers/NewtonsoftJsonSerializerTests.cs b/src/KafkaFlow.UnitTests/Serializers/NewtonsoftJsonSerializerTests.cs index 3e3a5997e..93673ed05 100644 --- a/src/KafkaFlow.UnitTests/Serializers/NewtonsoftJsonSerializerTests.cs +++ b/src/KafkaFlow.UnitTests/Serializers/NewtonsoftJsonSerializerTests.cs @@ -2,14 +2,12 @@ namespace KafkaFlow.UnitTests.Serializers { using System; using System.IO; - using System.Text; using System.Threading.Tasks; using AutoFixture; using FluentAssertions; using KafkaFlow.Serializer; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; - using Newtonsoft.Json; [TestClass] public class NewtonsoftJsonSerializerTests @@ -34,21 +32,6 @@ public async Task SerializeAsync_ValidPayload_JsonByteArrayGenerated() output.Position.Should().BeGreaterThan(0); } - [TestMethod] - public async Task DeserializeAsync_ValidPayload_ObjectGenerated() - { - // Arrange - var message = this.fixture.Create(); - using var input = new MemoryStream(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message))); - - // Act - var result = await this.serializer.DeserializeAsync(input, typeof(TestMessage), this.contextMock.Object); - - // Assert - result.Should().NotBeNull(); - result.Should().BeOfType(); - } - private class TestMessage { public int IntegerField { get; set; } diff --git a/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs index 9d03646ce..0b99c7622 100644 --- a/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs @@ -4,8 +4,9 @@ namespace KafkaFlow.UnitTests.Serializers using System.IO; using System.Threading.Tasks; using FluentAssertions; - using KafkaFlow.Middlewares.Serializer; - using KafkaFlow.Middlewares.Serializer.Resolvers; + using KafkaFlow.Serializer; + using KafkaFlow.Serializer.Middlewares; + using KafkaFlow.Serializer.Resolvers; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -13,7 +14,7 @@ namespace KafkaFlow.UnitTests.Serializers public class SerializerConsumerMiddlewareTests { private Mock contextMock; - private Mock serializerMock; + private Mock deserializerMock; private Mock typeResolverMock; private bool nextCalled; @@ -24,11 +25,11 @@ public class SerializerConsumerMiddlewareTests public void Setup() { this.contextMock = new Mock(); - this.serializerMock = new Mock(); + this.deserializerMock = new Mock(); this.typeResolverMock = new Mock(); this.target = new SerializerConsumerMiddleware( - this.serializerMock.Object, + this.deserializerMock.Object, this.typeResolverMock.Object); } @@ -51,7 +52,7 @@ public async Task Invoke_NullMessageType_ReturnWithoutCallingNext() this.nextCalled.Should().BeFalse(); this.typeResolverMock.VerifyAll(); this.contextMock.Verify(x => x.SetMessage(It.IsAny(), It.IsAny()), Times.Never); - this.serializerMock.Verify( + this.deserializerMock.Verify( x => x.DeserializeAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Never); } @@ -69,7 +70,7 @@ public async Task Invoke_NullMessage_CallNext() // Assert this.nextCalled.Should().BeTrue(); - this.serializerMock.Verify( + this.deserializerMock.Verify( x => x.DeserializeAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Never); this.typeResolverMock.Verify(x => x.OnConsume(It.IsAny()), Times.Never); @@ -90,7 +91,7 @@ public void Invoke_NotByteArrayMessage_ThrowsInvalidOperationException() act.Should().Throw(); this.nextCalled.Should().BeFalse(); this.contextMock.Verify(x => x.SetMessage(It.IsAny(), It.IsAny()), Times.Never); - this.serializerMock.Verify( + this.deserializerMock.Verify( x => x.DeserializeAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Never); this.typeResolverMock.Verify(x => x.OnConsume(It.IsAny()), Times.Never); @@ -124,7 +125,7 @@ public async Task Invoke_ValidMessage_Deserialize() .Setup(x => x.OnConsume(this.contextMock.Object)) .Returns(messageType); - this.serializerMock + this.deserializerMock .Setup(x => x.DeserializeAsync(It.IsAny(), messageType, It.IsAny())) .ReturnsAsync(deserializedMessage); @@ -145,7 +146,7 @@ await this.target.Invoke( resultContext.Should().NotBeNull(); resultContext.Should().Be(transformedContextMock.Object); this.contextMock.VerifyAll(); - this.serializerMock.VerifyAll(); + this.deserializerMock.VerifyAll(); this.typeResolverMock.VerifyAll(); } diff --git a/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs index 9e46816df..0020facd9 100644 --- a/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs @@ -6,7 +6,8 @@ namespace KafkaFlow.UnitTests.Serializers using AutoFixture; using FluentAssertions; using KafkaFlow.Middlewares.Serializer; - using KafkaFlow.Middlewares.Serializer.Resolvers; + using KafkaFlow.Serializer; + using KafkaFlow.Serializer.Resolvers; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; diff --git a/src/KafkaFlow.Abstractions/IMessageCompressor.cs b/src/KafkaFlow/Compressor/IMessageCompressor.cs similarity index 58% rename from src/KafkaFlow.Abstractions/IMessageCompressor.cs rename to src/KafkaFlow/Compressor/IMessageCompressor.cs index 53fffb40f..8d135c9f7 100644 --- a/src/KafkaFlow.Abstractions/IMessageCompressor.cs +++ b/src/KafkaFlow/Compressor/IMessageCompressor.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow +namespace KafkaFlow.Compressor { /// /// Used to create a message compressor @@ -11,12 +11,5 @@ public interface IMessageCompressor /// The message to be compressed /// The compressed message byte[] Compress(byte[] message); - - /// - /// Decompress the given message - /// - /// The message to be decompressed - /// The decompressed message - byte[] Decompress(byte[] message); } } diff --git a/src/KafkaFlow/Compressor/IMessageDecompressor.cs b/src/KafkaFlow/Compressor/IMessageDecompressor.cs new file mode 100644 index 000000000..e18b41755 --- /dev/null +++ b/src/KafkaFlow/Compressor/IMessageDecompressor.cs @@ -0,0 +1,15 @@ +namespace KafkaFlow.Compressor +{ + /// + /// Used to create a message decompressor + /// + public interface IMessageDecompressor + { + /// + /// Decompress the given message + /// + /// The message to be decompressed + /// The decompressed message + byte[] Decompress(byte[] message); + } +} diff --git a/src/KafkaFlow/Middlewares/Compression/CompressorConsumerMiddleware.cs b/src/KafkaFlow/Compressor/Middlewares/CompressorConsumerMiddleware.cs similarity index 64% rename from src/KafkaFlow/Middlewares/Compression/CompressorConsumerMiddleware.cs rename to src/KafkaFlow/Compressor/Middlewares/CompressorConsumerMiddleware.cs index 98cc49d55..87103e672 100644 --- a/src/KafkaFlow/Middlewares/Compression/CompressorConsumerMiddleware.cs +++ b/src/KafkaFlow/Compressor/Middlewares/CompressorConsumerMiddleware.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Middlewares.Compression +namespace KafkaFlow.Compressor.Middlewares { using System; using System.Threading.Tasks; @@ -6,17 +6,17 @@ /// /// Middleware to decompress the messages when consuming /// - public class CompressorConsumerMiddleware : IMessageMiddleware + public class DecompressorConsumerMiddleware : IMessageMiddleware { - private readonly IMessageCompressor compressor; + private readonly IMessageDecompressor decompressor; /// /// Initializes a new instance of the class. /// - /// Instance of - public CompressorConsumerMiddleware(IMessageCompressor compressor) + /// Instance of + public DecompressorConsumerMiddleware(IMessageDecompressor decompressor) { - this.compressor = compressor; + this.decompressor = decompressor; } /// @@ -28,7 +28,7 @@ public Task Invoke(IMessageContext context, MiddlewareDelegate next) $"{nameof(context.Message.Value)} must be a byte array to be decompressed and it is '{context.Message.Value.GetType().FullName}'"); } - var data = this.compressor.Decompress(rawData); + var data = this.decompressor.Decompress(rawData); return next(context.SetMessage(context.Message.Key, data)); } diff --git a/src/KafkaFlow/Middlewares/Compression/CompressorProducerMiddleware.cs b/src/KafkaFlow/Compressor/Middlewares/CompressorProducerMiddleware.cs similarity index 96% rename from src/KafkaFlow/Middlewares/Compression/CompressorProducerMiddleware.cs rename to src/KafkaFlow/Compressor/Middlewares/CompressorProducerMiddleware.cs index 3d82e755e..b2b49d0b3 100644 --- a/src/KafkaFlow/Middlewares/Compression/CompressorProducerMiddleware.cs +++ b/src/KafkaFlow/Compressor/Middlewares/CompressorProducerMiddleware.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Middlewares.Compression +namespace KafkaFlow.Compressor.Middlewares { using System; using System.Threading.Tasks; diff --git a/src/KafkaFlow/Extensions/ConfigurationBuilderExtensions.cs b/src/KafkaFlow/Extensions/ConfigurationBuilderExtensions.cs index 2662ed86a..4f3768c66 100644 --- a/src/KafkaFlow/Extensions/ConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow/Extensions/ConfigurationBuilderExtensions.cs @@ -3,8 +3,9 @@ namespace KafkaFlow using System; using System.Collections.Generic; using Confluent.Kafka; + using KafkaFlow.Compressor; + using KafkaFlow.Compressor.Middlewares; using KafkaFlow.Configuration; - using KafkaFlow.Middlewares.Compression; using KafkaFlow.Middlewares.TypedHandler; using KafkaFlow.Middlewares.TypedHandler.Configuration; @@ -154,11 +155,11 @@ public static IConsumerMiddlewareConfigurationBuilder AddTypedHandlers( /// The compressor type /// [Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")] - public static IConsumerMiddlewareConfigurationBuilder AddCompressor(this IConsumerMiddlewareConfigurationBuilder middlewares) - where T : class, IMessageCompressor + public static IConsumerMiddlewareConfigurationBuilder AddDecompressor(this IConsumerMiddlewareConfigurationBuilder middlewares) + where T : class, IMessageDecompressor { middlewares.DependencyConfigurator.AddTransient(); - return middlewares.AddCompressor(resolver => resolver.Resolve()); + return middlewares.AddDecompressor(resolver => resolver.Resolve()); } /// @@ -166,15 +167,15 @@ public static IConsumerMiddlewareConfigurationBuilder AddCompressor(this ICon /// /// The middleware configuration builder /// The compressor type that implements - /// A factory to create the instance + /// A factory to create the instance /// [Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")] - public static IConsumerMiddlewareConfigurationBuilder AddCompressor( + public static IConsumerMiddlewareConfigurationBuilder AddDecompressor( this IConsumerMiddlewareConfigurationBuilder middlewares, Factory factory) - where T : class, IMessageCompressor + where T : class, IMessageDecompressor { - return middlewares.Add(resolver => new CompressorConsumerMiddleware(factory(resolver))); + return middlewares.Add(resolver => new DecompressorConsumerMiddleware(factory(resolver))); } /// diff --git a/src/KafkaFlow/Middlewares/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs b/src/KafkaFlow/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs similarity index 63% rename from src/KafkaFlow/Middlewares/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs rename to src/KafkaFlow/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs index 05eb1c524..0000a6642 100644 --- a/src/KafkaFlow/Middlewares/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs +++ b/src/KafkaFlow/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs @@ -1,8 +1,9 @@ -namespace KafkaFlow.Middlewares.Serializer.Configuration +namespace KafkaFlow.Serializer.Configuration { using System; using KafkaFlow.Configuration; - using KafkaFlow.Middlewares.Serializer.Resolvers; + using KafkaFlow.Serializer.Middlewares; + using KafkaFlow.Serializer.Resolvers; /// /// No needed @@ -13,19 +14,19 @@ public static class ConsumerMiddlewareConfigurationBuilder /// Registers a middleware to deserialize messages /// /// The middleware configuration builder - /// A class that implements + /// A class that implements /// A class that implements /// - public static IConsumerMiddlewareConfigurationBuilder AddSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares) - where TSerializer : class, ISerializer + where TDeserializer : class, IDeserializer where TResolver : class, IMessageTypeResolver { middlewares.DependencyConfigurator.AddTransient(); - middlewares.DependencyConfigurator.AddTransient(); + middlewares.DependencyConfigurator.AddTransient(); - return middlewares.AddSerializer( - resolver => resolver.Resolve(), + return middlewares.AddDeserializer( + resolver => resolver.Resolve(), resolver => resolver.Resolve()); } @@ -33,16 +34,16 @@ public static IConsumerMiddlewareConfigurationBuilder AddSerializer /// The middleware configuration builder - /// A class that implements + /// A class that implements /// A class that implements - /// A factory to create a + /// A factory to create a /// A factory to create a /// - public static IConsumerMiddlewareConfigurationBuilder AddSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares, - Factory serializerFactory, + Factory serializerFactory, Factory resolverFactory) - where TSerializer : class, ISerializer + where TDeserializer : class, IDeserializer where TResolver : class, IMessageTypeResolver { return middlewares.Add( @@ -55,32 +56,32 @@ public static IConsumerMiddlewareConfigurationBuilder AddSerializer /// The middleware configuration builder - /// A class that implements + /// A class that implements /// - public static IConsumerMiddlewareConfigurationBuilder AddSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares) - where TSerializer : class, ISerializer + where TDeserializer : class, IDeserializer { - middlewares.DependencyConfigurator.AddTransient(); + middlewares.DependencyConfigurator.AddTransient(); - return middlewares.AddSerializer( - resolver => resolver.Resolve(), + return middlewares.AddDeserializer( + resolver => resolver.Resolve(), _ => new DefaultTypeResolver()); } /// /// Register a middleware to deserialize messages /// - /// A class that implements + /// A class that implements /// The middleware configuration builder - /// A factory to create a + /// A factory to create a /// - public static IConsumerMiddlewareConfigurationBuilder AddSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares, - Factory serializerFactory) - where TSerializer : class, ISerializer + Factory serializerFactory) + where TDeserializer : class, IDeserializer { - return middlewares.AddSerializer( + return middlewares.AddDeserializer( serializerFactory, _ => new DefaultTypeResolver()); } @@ -89,16 +90,16 @@ public static IConsumerMiddlewareConfigurationBuilder AddSerializer /// Register a middleware to deserialize the message to a fixed type /// /// The middleware configuration builder - /// A factory to create a + /// A factory to create a /// The message type - /// A class that implements + /// A class that implements /// - public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer( this IConsumerMiddlewareConfigurationBuilder middlewares, - Factory serializerFactory) - where TSerializer : class, ISerializer + Factory serializerFactory) + where TDeserializer : class, IDeserializer { - return middlewares.AddSerializer( + return middlewares.AddDeserializer( serializerFactory, _ => new SingleMessageTypeResolver(typeof(TMessage))); } @@ -108,13 +109,13 @@ public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer /// The middleware configuration builder /// The message type - /// A class that implements + /// A class that implements /// - public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares) - where TSerializer : class, ISerializer + where TDeserializer : class, IDeserializer { - return middlewares.AddSingleTypeSerializer(typeof(TMessage)); + return middlewares.AddSingleTypeSerializer(typeof(TMessage)); } /// @@ -122,17 +123,17 @@ public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer /// The middleware configuration builder /// The message type - /// A class that implements + /// A class that implements /// - public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer( this IConsumerMiddlewareConfigurationBuilder middlewares, Type messageType) - where TSerializer : class, ISerializer + where TDeserializer : class, IDeserializer { - middlewares.DependencyConfigurator.AddTransient(); + middlewares.DependencyConfigurator.AddTransient(); - return middlewares.AddSerializer( - resolver => resolver.Resolve(), + return middlewares.AddDeserializer( + resolver => resolver.Resolve(), _ => new SingleMessageTypeResolver(messageType)); } @@ -140,17 +141,17 @@ public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer /// The middleware configuration builder - /// A factory to create a + /// A factory to create a /// The message type - /// A class that implements + /// A class that implements /// - public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer( this IConsumerMiddlewareConfigurationBuilder middlewares, - Factory serializerFactory, + Factory serializerFactory, Type messageType) - where TSerializer : class, ISerializer + where TDeserializer : class, IDeserializer { - return middlewares.AddSerializer( + return middlewares.AddDeserializer( serializerFactory, _ => new SingleMessageTypeResolver(messageType)); } diff --git a/src/KafkaFlow/Middlewares/Serializer/Configuration/ProducerMiddlewareConfigurationBuilder.cs b/src/KafkaFlow/Serializer/Configuration/ProducerMiddlewareConfigurationBuilder.cs similarity index 98% rename from src/KafkaFlow/Middlewares/Serializer/Configuration/ProducerMiddlewareConfigurationBuilder.cs rename to src/KafkaFlow/Serializer/Configuration/ProducerMiddlewareConfigurationBuilder.cs index e8b9f8f32..08179a1ee 100644 --- a/src/KafkaFlow/Middlewares/Serializer/Configuration/ProducerMiddlewareConfigurationBuilder.cs +++ b/src/KafkaFlow/Serializer/Configuration/ProducerMiddlewareConfigurationBuilder.cs @@ -1,8 +1,9 @@ -namespace KafkaFlow.Middlewares.Serializer.Configuration +namespace KafkaFlow.Serializer.Configuration { using System; using KafkaFlow.Configuration; - using KafkaFlow.Middlewares.Serializer.Resolvers; + using KafkaFlow.Middlewares.Serializer; + using KafkaFlow.Serializer.Resolvers; /// /// No needed diff --git a/src/KafkaFlow/Serializer/IDeserializer.cs b/src/KafkaFlow/Serializer/IDeserializer.cs new file mode 100644 index 000000000..5bbb3dac9 --- /dev/null +++ b/src/KafkaFlow/Serializer/IDeserializer.cs @@ -0,0 +1,21 @@ +namespace KafkaFlow.Serializer +{ + using System; + using System.IO; + using System.Threading.Tasks; + + /// + /// Used to implement a message serializer + /// + public interface IDeserializer + { + /// + /// Deserializes the given message + /// + /// A stream to read the data to be deserialized + /// The type to be created + /// An object containing metadata + /// The deserialized message + Task DeserializeAsync(Stream input, Type type, ISerializerContext context); + } +} diff --git a/src/KafkaFlow.Abstractions/ISerializer.cs b/src/KafkaFlow/Serializer/ISerializer.cs similarity index 56% rename from src/KafkaFlow.Abstractions/ISerializer.cs rename to src/KafkaFlow/Serializer/ISerializer.cs index 0b0951997..7db01b64a 100644 --- a/src/KafkaFlow.Abstractions/ISerializer.cs +++ b/src/KafkaFlow/Serializer/ISerializer.cs @@ -1,6 +1,5 @@ -namespace KafkaFlow +namespace KafkaFlow.Serializer { - using System; using System.IO; using System.Threading.Tasks; @@ -17,14 +16,5 @@ public interface ISerializer /// An object containing metadata /// The serialized message Task SerializeAsync(object message, Stream output, ISerializerContext context); - - /// - /// Deserializes the given message - /// - /// A stream to read the data to be deserialized - /// The type to be created - /// An object containing metadata - /// The deserialized message - Task DeserializeAsync(Stream input, Type type, ISerializerContext context); } } diff --git a/src/KafkaFlow/Middlewares/Serializer/SerializerConsumerMiddleware.cs b/src/KafkaFlow/Serializer/Middlewares/SerializerConsumerMiddleware.cs similarity index 82% rename from src/KafkaFlow/Middlewares/Serializer/SerializerConsumerMiddleware.cs rename to src/KafkaFlow/Serializer/Middlewares/SerializerConsumerMiddleware.cs index 3bec7a8f4..8dcf96310 100644 --- a/src/KafkaFlow/Middlewares/Serializer/SerializerConsumerMiddleware.cs +++ b/src/KafkaFlow/Serializer/Middlewares/SerializerConsumerMiddleware.cs @@ -1,41 +1,41 @@ -namespace KafkaFlow.Middlewares.Serializer +namespace KafkaFlow.Serializer.Middlewares { using System; using System.IO; using System.Threading.Tasks; - using KafkaFlow.Middlewares.Serializer.Resolvers; + using KafkaFlow.Serializer.Resolvers; /// /// Middleware to deserialize messages when consuming /// public class SerializerConsumerMiddleware : IMessageMiddleware { - private readonly ISerializer serializer; + private readonly IDeserializer deserializer; private readonly IAsyncMessageTypeResolver typeResolver; /// /// Initializes a new instance of the class. /// - /// Instance of + /// Instance of /// Instance of public SerializerConsumerMiddleware( - ISerializer serializer, + IDeserializer deserializer, IMessageTypeResolver typeResolver) - : this(serializer, new AsyncMessageTypeResolverWrapper(typeResolver)) + : this(deserializer, new AsyncMessageTypeResolverWrapper(typeResolver)) { } /// /// Initializes a new instance of the class. /// - /// Instance of + /// Instance of /// Instance of public SerializerConsumerMiddleware( - ISerializer serializer, + IDeserializer deserializer, IAsyncMessageTypeResolver typeResolver) { - this.serializer = serializer; + this.deserializer = deserializer; this.typeResolver = typeResolver; } @@ -75,7 +75,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next) using var stream = new MemoryStream(rawData); - var data = await this.serializer + var data = await this.deserializer .DeserializeAsync( stream, messageType, diff --git a/src/KafkaFlow/Middlewares/Serializer/SerializerProducerMiddleware.cs b/src/KafkaFlow/Serializer/Middlewares/SerializerProducerMiddleware.cs similarity index 97% rename from src/KafkaFlow/Middlewares/Serializer/SerializerProducerMiddleware.cs rename to src/KafkaFlow/Serializer/Middlewares/SerializerProducerMiddleware.cs index 1638e18d4..627e0b51a 100644 --- a/src/KafkaFlow/Middlewares/Serializer/SerializerProducerMiddleware.cs +++ b/src/KafkaFlow/Serializer/Middlewares/SerializerProducerMiddleware.cs @@ -1,7 +1,8 @@ namespace KafkaFlow.Middlewares.Serializer { using System.Threading.Tasks; - using KafkaFlow.Middlewares.Serializer.Resolvers; + using KafkaFlow.Serializer; + using KafkaFlow.Serializer.Resolvers; using Microsoft.IO; /// diff --git a/src/KafkaFlow/Middlewares/Serializer/Resolvers/AsyncMessageTypeResolverWrapper.cs b/src/KafkaFlow/Serializer/Resolvers/AsyncMessageTypeResolverWrapper.cs similarity index 91% rename from src/KafkaFlow/Middlewares/Serializer/Resolvers/AsyncMessageTypeResolverWrapper.cs rename to src/KafkaFlow/Serializer/Resolvers/AsyncMessageTypeResolverWrapper.cs index 78023bdbd..c3ca1a743 100644 --- a/src/KafkaFlow/Middlewares/Serializer/Resolvers/AsyncMessageTypeResolverWrapper.cs +++ b/src/KafkaFlow/Serializer/Resolvers/AsyncMessageTypeResolverWrapper.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Middlewares.Serializer.Resolvers +namespace KafkaFlow.Serializer.Resolvers { using System; using System.Threading.Tasks; diff --git a/src/KafkaFlow/Middlewares/Serializer/Resolvers/DefaultTypeResolver.cs b/src/KafkaFlow/Serializer/Resolvers/DefaultTypeResolver.cs similarity index 93% rename from src/KafkaFlow/Middlewares/Serializer/Resolvers/DefaultTypeResolver.cs rename to src/KafkaFlow/Serializer/Resolvers/DefaultTypeResolver.cs index 6802f7d3d..9f68d9839 100644 --- a/src/KafkaFlow/Middlewares/Serializer/Resolvers/DefaultTypeResolver.cs +++ b/src/KafkaFlow/Serializer/Resolvers/DefaultTypeResolver.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Middlewares.Serializer.Resolvers +namespace KafkaFlow.Serializer.Resolvers { using System; diff --git a/src/KafkaFlow/Middlewares/Serializer/Resolvers/IAsyncMessageTypeResolver.cs b/src/KafkaFlow/Serializer/Resolvers/IAsyncMessageTypeResolver.cs similarity index 93% rename from src/KafkaFlow/Middlewares/Serializer/Resolvers/IAsyncMessageTypeResolver.cs rename to src/KafkaFlow/Serializer/Resolvers/IAsyncMessageTypeResolver.cs index 52ee2fa88..8f7d89db3 100644 --- a/src/KafkaFlow/Middlewares/Serializer/Resolvers/IAsyncMessageTypeResolver.cs +++ b/src/KafkaFlow/Serializer/Resolvers/IAsyncMessageTypeResolver.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Middlewares.Serializer.Resolvers +namespace KafkaFlow.Serializer.Resolvers { using System; using System.Threading.Tasks; diff --git a/src/KafkaFlow/Middlewares/Serializer/Resolvers/IMessageTypeResolver.cs b/src/KafkaFlow/Serializer/Resolvers/IMessageTypeResolver.cs similarity index 93% rename from src/KafkaFlow/Middlewares/Serializer/Resolvers/IMessageTypeResolver.cs rename to src/KafkaFlow/Serializer/Resolvers/IMessageTypeResolver.cs index 77cc3e89e..f40d7a749 100644 --- a/src/KafkaFlow/Middlewares/Serializer/Resolvers/IMessageTypeResolver.cs +++ b/src/KafkaFlow/Serializer/Resolvers/IMessageTypeResolver.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Middlewares.Serializer.Resolvers +namespace KafkaFlow.Serializer.Resolvers { using System; diff --git a/src/KafkaFlow/Middlewares/Serializer/Resolvers/SingleMessageTypeResolver.cs b/src/KafkaFlow/Serializer/Resolvers/SingleMessageTypeResolver.cs similarity index 94% rename from src/KafkaFlow/Middlewares/Serializer/Resolvers/SingleMessageTypeResolver.cs rename to src/KafkaFlow/Serializer/Resolvers/SingleMessageTypeResolver.cs index 59ec131dc..e771f9d79 100644 --- a/src/KafkaFlow/Middlewares/Serializer/Resolvers/SingleMessageTypeResolver.cs +++ b/src/KafkaFlow/Serializer/Resolvers/SingleMessageTypeResolver.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Middlewares.Serializer.Resolvers +namespace KafkaFlow.Serializer.Resolvers { using System;