Skip to content

Commit

Permalink
feat!: segregated serializer and compressor interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
jose-sousa-8 committed Sep 20, 2023
1 parent 76e9ccc commit ce7b5d7
Show file tree
Hide file tree
Showing 52 changed files with 467 additions and 260 deletions.
4 changes: 2 additions & 2 deletions samples/KafkaFlow.Sample.BatchOperations/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
using System.Linq;
using KafkaFlow;
using KafkaFlow.Middlewares.BatchConsume;
using KafkaFlow.Middlewares.Serializer.Configuration;
using KafkaFlow.Producers;
using KafkaFlow.Sample.BatchOperations;
using KafkaFlow.Serializer;
using KafkaFlow.Serializer.Configuration;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();
Expand Down Expand Up @@ -36,7 +36,7 @@
.WithWorkersCount(1)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<JsonCoreSerializer>()
.AddDeserializer<JsonCoreDeserializer>()
.BatchConsume(10, TimeSpan.FromSeconds(10))
.Add<PrintConsoleMiddleware>()
)
Expand Down
7 changes: 3 additions & 4 deletions samples/KafkaFlow.Sample.ConsumerThrottling/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ The ThrottleConsumer mechanism works by checking the lag of "consumerA" every 1
using System;
using System.Threading.Tasks;
using KafkaFlow;
using KafkaFlow.Middlewares.Serializer;
using KafkaFlow.Middlewares.Serializer.Configuration;
using KafkaFlow.Producers;
using KafkaFlow.Serializer;
using KafkaFlow.Serializer.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

Expand Down Expand Up @@ -50,7 +49,7 @@ The ThrottleConsumer mechanism works by checking the lag of "consumerA" every 1
.WithWorkersCount(1)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<JsonCoreSerializer>()
.AddDeserializer<JsonCoreDeserializer>()
.Add<ProcessMessagesMiddleware>()
)
)
Expand All @@ -70,7 +69,7 @@ The ThrottleConsumer mechanism works by checking the lag of "consumerA" every 1
.AddAction(a => a.AboveThreshold(10).ApplyDelay(1_000))
.AddAction(a => a.AboveThreshold(20).ApplyDelay(5_000))
.AddAction(a => a.AboveThreshold(30).ApplyDelay(10_000)))
.AddSerializer<JsonCoreSerializer>()
.AddDeserializer<JsonCoreDeserializer>()
.Add<ProcessMessagesMiddleware>()
)
)
Expand Down
4 changes: 2 additions & 2 deletions samples/KafkaFlow.Sample.FlowControl/Program.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
using System;
using KafkaFlow;
using KafkaFlow.Middlewares.Serializer.Configuration;
using KafkaFlow.Sample.FlowControl;
using KafkaFlow.Serializer;
using KafkaFlow.Serializer.Configuration;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();
Expand Down Expand Up @@ -36,7 +36,7 @@
.WithWorkersCount(1)
.AddMiddlewares(
m => m
.AddSingleTypeSerializer<SampleMessage, NewtonsoftJsonSerializer>()
.AddSingleTypeDeserializer<SampleMessage, NewtonsoftJsonDeserializer>()
.Add<PrintConsoleMiddleware>()
)
);
Expand Down
4 changes: 2 additions & 2 deletions samples/KafkaFlow.Sample.PauseConsumerOnError/Program.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
using KafkaFlow;
using KafkaFlow.Middlewares.Serializer.Configuration;
using KafkaFlow.Producers;
using KafkaFlow.Sample.PauseConsumerOnError;
using KafkaFlow.Serializer;
using KafkaFlow.Serializer.Configuration;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();
Expand Down Expand Up @@ -33,7 +33,7 @@
middlewares =>
middlewares
.Add<PauseConsumerOnExceptionMiddleware>()
.AddSerializer<JsonCoreSerializer>()
.AddDeserializer<JsonCoreDeserializer>()
.AddTypedHandlers(h => h.AddHandler<MessageHandler>())
)
)
Expand Down
4 changes: 2 additions & 2 deletions samples/KafkaFlow.Sample.SchemaRegistry/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddSchemaRegistryAvroSerializer()
.AddSchemaRegistryAvroDeserializer()
.AddTypedHandlers(
handlers => handlers
.AddHandler<AvroMessageHandler>()
Expand Down Expand Up @@ -102,7 +102,7 @@
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddSchemaRegistryProtobufSerializer()
.AddSchemaRegistryProtobufDeserializer()
.AddTypedHandlers(handlers => handlers.AddHandler<ProtobufMessageHandler>())
)
)
Expand Down
4 changes: 2 additions & 2 deletions samples/KafkaFlow.Sample/Program.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
using System;
using System.Threading.Tasks;
using KafkaFlow;
using KafkaFlow.Middlewares.Serializer.Configuration;
using KafkaFlow.Producers;
using KafkaFlow.Sample;
using KafkaFlow.Serializer;
using KafkaFlow.Serializer.Configuration;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();
Expand Down Expand Up @@ -33,7 +33,7 @@
.WithWorkersCount(3)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ProtobufNetSerializer>()
.AddDeserializer<ProtobufNetSerializer>()
.AddTypedHandlers(h => h.AddHandler<PrintConsoleHandler>())
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
using KafkaFlow.Admin;
using KafkaFlow.Admin.Handlers;
using KafkaFlow.Configuration;
using KafkaFlow.Middlewares.Serializer.Configuration;
using KafkaFlow.Serializer;
using KafkaFlow.Serializer.Configuration;

/// <summary>
/// No needed
Expand Down Expand Up @@ -54,7 +54,7 @@ public static IClusterConfigurationBuilder EnableAdminMessages(
.DisableManagement()
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ProtobufNetSerializer>()
.AddDeserializer<ProtobufNetSerializer>()
.AddTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
Expand Down Expand Up @@ -123,7 +123,7 @@ public static IClusterConfigurationBuilder EnableTelemetry(
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ProtobufNetSerializer>()
.AddDeserializer<ProtobufNetSerializer>()
.AddTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
Expand Down
14 changes: 0 additions & 14 deletions src/KafkaFlow.Compressor.Gzip/GzipMessageCompressor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,5 @@ public byte[] Compress(byte[] message)

return outputStream.ToArray();
}

/// <inheritdoc />
public byte[] Decompress(byte[] message)
{
using var outputStream = new MemoryStream();
using var inputStream = new MemoryStream(message);

using (var gzipStream = new GZipStream(inputStream, CompressionMode.Decompress))
{
gzipStream.CopyTo(outputStream);
}

return outputStream.ToArray();
}
}
}
25 changes: 25 additions & 0 deletions src/KafkaFlow.Compressor.Gzip/GzipMessageDecompressor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace KafkaFlow.Compressor.Gzip
{
using System.IO;
using System.IO.Compression;

/// <summary>
/// A GZIP message decompressor
/// </summary>
public class GzipMessageDecompressor : IMessageDecompressor
{
/// <inheritdoc />
public byte[] Decompress(byte[] message)
{
using var outputStream = new MemoryStream();
using var inputStream = new MemoryStream(message);

using (var gzipStream = new GZipStream(inputStream, CompressionMode.Decompress))
{
gzipStream.CopyTo(outputStream);
}

return outputStream.ToArray();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@

<ItemGroup>
<ProjectReference Include="..\KafkaFlow.Abstractions\KafkaFlow.Abstractions.csproj"/>
<ProjectReference Include="..\KafkaFlow\KafkaFlow.csproj" />
</ItemGroup>
</Project>
20 changes: 10 additions & 10 deletions src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ namespace KafkaFlow.IntegrationTests.Core
using KafkaFlow.IntegrationTests.Core.Messages;
using KafkaFlow.IntegrationTests.Core.Middlewares;
using KafkaFlow.IntegrationTests.Core.Producers;
using KafkaFlow.Middlewares.Serializer.Configuration;
using KafkaFlow.Serializer;
using KafkaFlow.Serializer.Configuration;
using KafkaFlow.Serializer.SchemaRegistry;
using AutoOffsetReset = KafkaFlow.AutoOffsetReset;

Expand Down Expand Up @@ -128,7 +128,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.WithConsumerConfig(defaultConfig)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ConfluentAvroSerializer>()
.AddDeserializer<ConfluentAvroDeserializer>()
.AddTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
Expand Down Expand Up @@ -156,7 +156,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.WithConsumerConfig(defaultConfig)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ConfluentProtobufSerializer>()
.AddDeserializer<ConfluentProtobufDeserializer>()
.AddTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
Expand Down Expand Up @@ -184,7 +184,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.WithConsumerConfig(defaultConfig)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<ConfluentJsonSerializer>()
.AddDeserializer<ConfluentJsonDeserializer>()
.AddTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
Expand Down Expand Up @@ -229,7 +229,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
})
.AddMiddlewares(
middlewares => middlewares
.AddSingleTypeSerializer<PauseResumeMessage, ProtobufNetSerializer>()
.AddSingleTypeDeserializer<PauseResumeMessage, ProtobufNetSerializer>()
.AddTypedHandlers(
handlers =>
handlers
Expand All @@ -244,7 +244,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer<JsonCoreSerializer>()
.AddDeserializer<JsonCoreDeserializer>()
.AddTypedHandlers(
handlers =>
handlers
Expand All @@ -259,7 +259,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddCompressor<GzipMessageCompressor>()
.AddDecompressor<GzipMessageDecompressor>()
.Add<GzipMiddleware>()))
.AddConsumer(
consumer => consumer
Expand All @@ -270,7 +270,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddSerializer(_ => new JsonCoreSerializer())
.AddDeserializer(_ => new JsonCoreDeserializer())
.AddTypedHandlers(
handlers =>
handlers
Expand All @@ -286,8 +286,8 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.WithAutoCommitIntervalMs(1)
.AddMiddlewares(
middlewares => middlewares
.AddCompressor<GzipMessageCompressor>()
.AddSerializer<ProtobufNetSerializer>()
.AddDecompressor<GzipMessageDecompressor>()
.AddDeserializer<ProtobufNetSerializer>()
.AddTypedHandlers(
handlers =>
handlers
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow.SchemaRegistry/SchemaRegistryTypeResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace KafkaFlow
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using KafkaFlow.Middlewares.Serializer.Resolvers;
using KafkaFlow.Serializer.Resolvers;

/// <summary>
/// The message type resolver to be used with schema registry serializers
Expand Down
40 changes: 40 additions & 0 deletions src/KafkaFlow.Serializer.JsonCore/JsonCoreDeserializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
namespace KafkaFlow.Serializer
{
using System;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;

/// <summary>
/// A message deserializer using System.Text.Json library
/// </summary>
public class JsonCoreDeserializer : IDeserializer
{
private readonly JsonSerializerOptions serializerOptions;

/// <summary>
/// Initializes a new instance of the <see cref="JsonCoreDeserializer"/> class.
/// </summary>
/// <param name="options">Json serializer options</param>
public JsonCoreDeserializer(JsonSerializerOptions options)
{
this.serializerOptions = options;
}

/// <summary>
/// Initializes a new instance of the <see cref="JsonCoreDeserializer"/> class.
/// </summary>
public JsonCoreDeserializer()
: this(new JsonSerializerOptions())
{
}

/// <inheritdoc/>
public async Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context)
{
return await JsonSerializer
.DeserializeAsync(input, type, this.serializerOptions)
.ConfigureAwait(false);
}
}
}
11 changes: 1 addition & 10 deletions src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
namespace KafkaFlow.Serializer
{
using System;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;
Expand Down Expand Up @@ -59,13 +58,5 @@ public Task SerializeAsync(object message, Stream output, ISerializerContext con

return Task.CompletedTask;
}

/// <inheritdoc/>
public async Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context)
{
return await JsonSerializer
.DeserializeAsync(input, type, this.serializerOptions)
.ConfigureAwait(false);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@

<ItemGroup>
<ProjectReference Include="..\KafkaFlow.Abstractions\KafkaFlow.Abstractions.csproj" />
<ProjectReference Include="..\KafkaFlow\KafkaFlow.csproj" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

<ItemGroup>
<ProjectReference Include="..\KafkaFlow.Abstractions\KafkaFlow.Abstractions.csproj" />
<ProjectReference Include="..\KafkaFlow\KafkaFlow.csproj" />
</ItemGroup>

<ItemGroup>
Expand Down
Loading

0 comments on commit ce7b5d7

Please sign in to comment.