Skip to content

Commit

Permalink
refactor: update batching extension method to AddBatching
Browse files Browse the repository at this point in the history
  • Loading branch information
jose-sousa-8 committed Sep 20, 2023
1 parent ce7b5d7 commit 5750d76
Show file tree
Hide file tree
Showing 12 changed files with 69 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using KafkaFlow.Middlewares.BatchConsume;
using KafkaFlow.Batching;

namespace KafkaFlow.Sample.BatchOperations;

Expand Down
4 changes: 2 additions & 2 deletions samples/KafkaFlow.Sample.BatchOperations/Program.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System;
using System.Linq;
using KafkaFlow;
using KafkaFlow.Middlewares.BatchConsume;
using KafkaFlow.Batching;
using KafkaFlow.Producers;
using KafkaFlow.Sample.BatchOperations;
using KafkaFlow.Serializer;
Expand Down Expand Up @@ -37,7 +37,7 @@
.AddMiddlewares(
middlewares => middlewares
.AddDeserializer<JsonCoreDeserializer>()
.BatchConsume(10, TimeSpan.FromSeconds(10))
.AddBatching(10, TimeSpan.FromSeconds(10))
.Add<PrintConsoleMiddleware>()
)
)
Expand Down
2 changes: 1 addition & 1 deletion samples/KafkaFlow.Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
.WithWorkersCount(3)
.AddMiddlewares(
middlewares => middlewares
.AddDeserializer<ProtobufNetSerializer>()
.AddDeserializer<ProtobufNetDeserializer>()
.AddTypedHandlers(h => h.AddHandler<PrintConsoleHandler>())
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static IClusterConfigurationBuilder EnableAdminMessages(
.DisableManagement()
.AddMiddlewares(
middlewares => middlewares
.AddDeserializer<ProtobufNetSerializer>()
.AddDeserializer<ProtobufNetDeserializer>()
.AddTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
Expand Down Expand Up @@ -123,7 +123,7 @@ public static IClusterConfigurationBuilder EnableTelemetry(
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddDeserializer<ProtobufNetSerializer>()
.AddDeserializer<ProtobufNetDeserializer>()
.AddTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
Expand Down
6 changes: 3 additions & 3 deletions src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddSingleTypeSerializer<ProtobufNetSerializer>(typeof(TestMessage1))
.AddSingleTypeDeserializer<ProtobufNetDeserializer>(typeof(TestMessage1))
.AddTypedHandlers(
handlers =>
handlers
Expand All @@ -229,7 +229,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
})
.AddMiddlewares(
middlewares => middlewares
.AddSingleTypeDeserializer<PauseResumeMessage, ProtobufNetSerializer>()
.AddSingleTypeDeserializer<PauseResumeMessage, ProtobufNetDeserializer>()
.AddTypedHandlers(
handlers =>
handlers
Expand Down Expand Up @@ -287,7 +287,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.AddMiddlewares(
middlewares => middlewares
.AddDecompressor<GzipMessageDecompressor>()
.AddDeserializer<ProtobufNetSerializer>()
.AddDeserializer<ProtobufNetDeserializer>()
.AddTypedHandlers(
handlers =>
handlers
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
namespace KafkaFlow.Serializer
{
using System;
using System.IO;
using System.Threading.Tasks;
using ProtoBuf;

/// <summary>
/// A message serializer using protobuf-net library
/// </summary>
public class ProtobufNetSerializer : ISerializer, IDeserializer
public class ProtobufNetSerializer : ISerializer
{
/// <inheritdoc/>
public Task SerializeAsync(object message, Stream output, ISerializerContext context)
Expand All @@ -17,11 +16,5 @@ public Task SerializeAsync(object message, Stream output, ISerializerContext con

return Task.CompletedTask;
}

/// <inheritdoc/>
public Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context)
{
return Task.FromResult(Serializer.Deserialize(type, input));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ namespace KafkaFlow.UnitTests.BatchConsume
using System;
using System.Threading.Tasks;
using FluentAssertions;
using KafkaFlow.Middlewares.BatchConsume;
using KafkaFlow.Batching;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
namespace KafkaFlow.UnitTests.Serializers
{
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using AutoFixture;
using FluentAssertions;
using KafkaFlow.Serializer;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using Newtonsoft.Json;

[TestClass]
public class NewtonsoftJsonDeserializerTests
{
private readonly Mock<ISerializerContext> contextMock = new ();
private readonly NewtonsoftJsonDeserializer deserializer = new ();

private readonly Fixture fixture = new();

[TestMethod]
public async Task DeserializeAsync_ValidPayload_ObjectGenerated()
{
// Arrange
var message = this.fixture.Create<TestMessage>();
using var input = new MemoryStream(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)));

// Act
var result = await this.deserializer.DeserializeAsync(input, typeof(TestMessage), this.contextMock.Object);

// Assert
result.Should().NotBeNull();
result.Should().BeOfType<TestMessage>();
}

private class TestMessage
{
public int IntegerField { get; set; }

public string StringField { get; set; }

public double DoubleField { get; set; }

public DateTime DateTimeField { get; set; }
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Middlewares.BatchConsume
namespace KafkaFlow.Batching
{
using System;
using System.Collections.Generic;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Middlewares.BatchConsume
namespace KafkaFlow.Batching
{
using System;
using System.Collections.Generic;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Middlewares.BatchConsume
namespace KafkaFlow.Batching
{
using System;
using System.Collections.Generic;
Expand All @@ -7,7 +7,7 @@ namespace KafkaFlow.Middlewares.BatchConsume
/// <summary>
/// no needed
/// </summary>
public static class BatchConsumeExtensions
public static class BatchingExtensions
{
/// <summary>
/// Accumulates a group of messages to be passed as a batch to the next middleware as just one message
Expand All @@ -16,7 +16,7 @@ public static class BatchConsumeExtensions
/// <param name="batchSize">The maximum size of the batch, when this limit is reached the next middleware will be called</param>
/// <param name="batchTimeout">The maximum time the middleware will wait to call the next middleware</param>
/// <returns></returns>
public static IConsumerMiddlewareConfigurationBuilder BatchConsume(
public static IConsumerMiddlewareConfigurationBuilder AddBatching(
this IConsumerMiddlewareConfigurationBuilder builder,
int batchSize,
TimeSpan batchTimeout)
Expand All @@ -30,7 +30,7 @@ public static IConsumerMiddlewareConfigurationBuilder BatchConsume(
}

/// <summary>
/// Gets the accumulated <see cref="IMessageContext"/> grouped by BatchConsume middleware
/// Gets the accumulated <see cref="IMessageContext"/> grouped by batching middleware
/// </summary>
/// <param name="context">The message context</param>
/// <returns>All the contexts in the batch</returns>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public static IConsumerMiddlewareConfigurationBuilder AddDeserializer<TDeseriali
/// <typeparam name="TMessage">The message type</typeparam>
/// <typeparam name="TDeserializer">A class that implements <see cref="IDeserializer"/></typeparam>
/// <returns></returns>
public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer<TMessage, TDeserializer>(
public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeDeserializer<TMessage, TDeserializer>(
this IConsumerMiddlewareConfigurationBuilder middlewares,
Factory<TDeserializer> serializerFactory)
where TDeserializer : class, IDeserializer
Expand All @@ -115,7 +115,7 @@ public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeDeserializer<
this IConsumerMiddlewareConfigurationBuilder middlewares)
where TDeserializer : class, IDeserializer
{
return middlewares.AddSingleTypeSerializer<TDeserializer>(typeof(TMessage));
return middlewares.AddSingleTypeDeserializer<TDeserializer>(typeof(TMessage));
}

/// <summary>
Expand All @@ -125,7 +125,7 @@ public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeDeserializer<
/// <param name="messageType">The message type</param>
/// <typeparam name="TDeserializer">A class that implements <see cref="IDeserializer"/></typeparam>
/// <returns></returns>
public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer<TDeserializer>(
public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeDeserializer<TDeserializer>(
this IConsumerMiddlewareConfigurationBuilder middlewares,
Type messageType)
where TDeserializer : class, IDeserializer
Expand All @@ -145,7 +145,7 @@ public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer<TD
/// <param name="messageType">The message type</param>
/// <typeparam name="TDeserializer">A class that implements <see cref="IDeserializer"/></typeparam>
/// <returns></returns>
public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer<TDeserializer>(
public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeDeserializer<TDeserializer>(
this IConsumerMiddlewareConfigurationBuilder middlewares,
Factory<TDeserializer> serializerFactory,
Type messageType)
Expand Down

0 comments on commit 5750d76

Please sign in to comment.