Skip to content

Commit

Permalink
test: fix unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
filipeesch committed Sep 15, 2023
1 parent dc70d43 commit 057c6fd
Show file tree
Hide file tree
Showing 33 changed files with 310 additions and 289 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
}
catch (Exception exception)
{
context.ConsumerContext.AutoCompleteMessage = false;
context.ConsumerContext.AutoMessageCompletion = false;
this.logHandler.Error("Error handling message", exception,
new
{
Expand Down
18 changes: 9 additions & 9 deletions src/KafkaFlow.Abstractions/IConsumerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public interface IConsumerContext
/// <summary>
/// Gets or sets a value indicating whether if the framework should auto complete the message in the end
/// </summary>
bool AutoCompleteMessage { get; set; }
bool AutoMessageCompletion { get; set; }

/// <summary>
/// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
Expand All @@ -73,11 +73,18 @@ public interface IConsumerContext
/// </summary>
IDependencyResolver WorkerDependencyResolver { get; }

/// <summary>
/// Gets a Task that completes when the <see cref="Complete"/> method is invoked,
/// indicating the end of message processing. This allows async operations
/// to wait for the message to be fully processed and its offset stored.
/// </summary>
Task<TopicPartitionOffset> Completion { get; }

/// <summary>
/// Signals the completion of message processing and stores the message offset to eventually be committed.
/// After this call, the framework marks the message processing as finished and releases resources associated with the message.
/// By default, this method is automatically invoked when message processing concludes, unless
/// the consumer is configured for manual message completion or the <see cref="AutoCompleteMessage"/> flag is set to false.
/// the consumer is configured for manual message completion or the <see cref="AutoMessageCompletion"/> flag is set to false.
/// </summary>
void Complete();

Expand All @@ -96,12 +103,5 @@ public interface IConsumerContext
/// Resume Kafka's message fetch
/// </summary>
void Resume();

/// <summary>
/// Gets a Task that completes when the <see cref="Complete"/> method is invoked,
/// indicating the end of message processing. This allows async operations
/// to wait for the message to be fully processed and its offset stored.
/// </summary>
Task<TopicPartitionOffset> Completion { get; }
}
}
11 changes: 0 additions & 11 deletions src/KafkaFlow.Abstractions/IMessageContext.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace KafkaFlow
{
using System;

/// <summary>
/// A context that contains the message and metadata
/// </summary>
Expand Down Expand Up @@ -34,21 +32,12 @@ public interface IMessageContext
/// </summary>
IDependencyResolver DependencyResolver { get; }


/// <summary>
/// Creates a new <see cref="IMessageContext"/> with the new message
/// </summary>
/// <param name="key">The new message key</param>
/// <param name="value">The new message value</param>
/// <returns>A new message context containing the new values</returns>
IMessageContext SetMessage(object key, object value);

/// <summary>
/// Deprecated
/// </summary>
/// <param name="message">key</param>
/// <returns></returns>
[Obsolete("This method should no longer be used, use the " + nameof(SetMessage) + "() instead.", true)]
IMessageContext TransformMessage(object message);
}
}
7 changes: 0 additions & 7 deletions src/KafkaFlow.Abstractions/MiddlewareLifetime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,6 @@ public enum MiddlewareLifetime
/// </summary>
Singleton,

/// <summary>
/// Obsolete. Please use Message instead. Indicates a new middleware instance is instantiated for each message scope.
/// This instance will be disposed when the message scope ends.
/// </summary>
[Obsolete("Use Message lifetime instead")]
Scoped,

/// <summary>
/// Indicates a new middleware instance is instantiated for each individual message scope, ensuring isolated processing.
/// This instance will be disposed when the message scope ends.
Expand Down
4 changes: 2 additions & 2 deletions src/KafkaFlow.Admin/MemoryTelemetryStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private void TryCleanItems()
return;
}

this.lastCleanDate = this.dateTimeProvider.Now;
this.lastCleanDate = this.dateTimeProvider.UtcNow;

this.CleanExpiredItems();
}
Expand All @@ -70,6 +70,6 @@ private void CleanExpiredItems()
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool NeedsCleaning() => this.dateTimeProvider.Now - this.lastCleanDate > this.cleanRunInterval;
private bool NeedsCleaning() => this.dateTimeProvider.UtcNow - this.lastCleanDate > this.cleanRunInterval;
}
}
4 changes: 1 addition & 3 deletions src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ public BatchConsumeMessageContext(
public IDependencyResolver DependencyResolver => this.batchDependencyScope.Resolver;

public IMessageContext SetMessage(object key, object value) =>
throw new NotSupportedException($"{nameof(BatchConsumeMessageContext)} does not allow change the message");

public IMessageContext TransformMessage(object message) => throw new NotImplementedException();
throw new NotSupportedException($"{nameof(BatchConsumeMessageContext)} does not allow to change the message");

public void Dispose() => this.batchDependencyScope.Dispose();
}
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)

try
{
context.ConsumerContext.AutoCompleteMessage = false;
context.ConsumerContext.AutoMessageCompletion = false;

this.batch.Add(context);

Expand Down
1 change: 0 additions & 1 deletion src/KafkaFlow.Compressor.Gzip/GzipMessageCompressor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
/// <summary>
/// A GZIP message compressor
/// </summary>
[Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")]
public class GzipMessageCompressor : IMessageCompressor
{
/// <inheritdoc />
Expand Down
4 changes: 0 additions & 4 deletions src/KafkaFlow.Compressor/ConfigurationBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ public static class ConfigurationBuilderExtensions
/// <param name="middlewares">The middleware configuration builder</param>
/// <typeparam name="T">The compressor type</typeparam>
/// <returns></returns>
[Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")]
public static IConsumerMiddlewareConfigurationBuilder AddCompressor<T>(this IConsumerMiddlewareConfigurationBuilder middlewares)
where T : class, IMessageCompressor
{
Expand All @@ -29,7 +28,6 @@ public static IConsumerMiddlewareConfigurationBuilder AddCompressor<T>(this ICon
/// <typeparam name="T">The compressor type that implements <see cref="IMessageCompressor"/></typeparam>
/// <param name="factory">A factory to create the <see cref="IMessageCompressor"/> instance</param>
/// <returns></returns>
[Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")]
public static IConsumerMiddlewareConfigurationBuilder AddCompressor<T>(
this IConsumerMiddlewareConfigurationBuilder middlewares,
Factory<T> factory)
Expand All @@ -45,7 +43,6 @@ public static IConsumerMiddlewareConfigurationBuilder AddCompressor<T>(
/// <param name="middlewares">The middleware configuration builder</param>
/// <typeparam name="T">The compressor type that implements <see cref="IMessageCompressor"/></typeparam>
/// <returns></returns>
[Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")]
public static IProducerMiddlewareConfigurationBuilder AddCompressor<T>(this IProducerMiddlewareConfigurationBuilder middlewares)
where T : class, IMessageCompressor
{
Expand All @@ -61,7 +58,6 @@ public static IProducerMiddlewareConfigurationBuilder AddCompressor<T>(this IPro
/// <typeparam name="T">The compressor type that implements <see cref="IMessageCompressor"/></typeparam>
/// <param name="factory">A factory to create the <see cref="IMessageCompressor"/> instance</param>
/// <returns></returns>
[Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")]
public static IProducerMiddlewareConfigurationBuilder AddCompressor<T>(
this IProducerMiddlewareConfigurationBuilder middlewares,
Factory<T> factory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ namespace KafkaFlow.UnitTests.BatchConsume
using System.Threading.Tasks;
using FluentAssertions;
using KafkaFlow.BatchConsume;
using KafkaFlow.Configuration;
using KafkaFlow.Consumers;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;

Expand All @@ -30,7 +32,33 @@ public void Setup()

this.logHandlerMock = new Mock<ILogHandler>();

var middlewareContextMock = new Mock<IConsumerMiddlewareContext>();
var workerMock = new Mock<IWorker>();
var consumerMock = new Mock<IConsumer>();
var consumerConfigurationMock = new Mock<IConsumerConfiguration>();

middlewareContextMock
.SetupGet(x => x.Worker)
.Returns(workerMock.Object);

middlewareContextMock
.SetupGet(x => x.Consumer)
.Returns(consumerMock.Object);

consumerMock
.SetupGet(x => x.Configuration)
.Returns(consumerConfigurationMock.Object);

workerMock
.SetupGet(x => x.WorkerStopped)
.Returns(new WorkerStoppedSubject(this.logHandlerMock.Object));

consumerConfigurationMock
.SetupGet(x => x.AutoMessageCompletion)
.Returns(true);

this.target = new BatchConsumeMiddleware(
middlewareContextMock.Object,
BatchSize,
this.batchTimeout,
this.logHandlerMock.Object);
Expand All @@ -43,6 +71,10 @@ public async Task AddAsync_LessThanBatchSize_CallNextOnTimeout()
var consumerContext = new Mock<IConsumerContext>();
var context = new Mock<IMessageContext>();

consumerContext
.SetupGet(x => x.WorkerDependencyResolver)
.Returns(Mock.Of<IDependencyResolver>());

context
.Setup(x => x.ConsumerContext)
.Returns(consumerContext.Object);
Expand All @@ -64,6 +96,10 @@ public async Task AddAsync_ExactlyBatchSize_CallNextInstantly()
var consumerContext = new Mock<IConsumerContext>();
var contextMock = new Mock<IMessageContext>();

consumerContext
.SetupGet(x => x.WorkerDependencyResolver)
.Returns(Mock.Of<IDependencyResolver>());

contextMock
.Setup(x => x.ConsumerContext)
.Returns(consumerContext.Object);
Expand All @@ -89,6 +125,10 @@ public async Task AddAsync_MoreThanBatchSize_CallNextInstantlyThenCallWhenTimeou
var consumerContext = new Mock<IConsumerContext>();
var contextMock = new Mock<IMessageContext>();

consumerContext
.SetupGet(x => x.WorkerDependencyResolver)
.Returns(Mock.Of<IDependencyResolver>());

contextMock
.Setup(x => x.ConsumerContext)
.Returns(consumerContext.Object);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ public void Build_RequiredCalls_ReturnDefaultValues()
// Assert
configuration.Topics.Should().BeEquivalentTo(topic1);
configuration.BufferSize.Should().Be(bufferSize);
configuration.WorkersCountCalculator(null).Result.Should().Be(workers);
configuration.WorkersCountCalculator(null, null).Result.Should().Be(workers);
configuration.GroupId.Should().Be(groupId);
configuration.GetKafkaConfig().AutoOffsetReset.Should().BeNull();
configuration.GetKafkaConfig().EnableAutoOffsetStore.Should().Be(false);
configuration.GetKafkaConfig().EnableAutoCommit.Should().Be(false);
configuration.AutoStoreOffsets.Should().Be(true);
configuration.AutoMessageCompletion.Should().Be(true);
configuration.AutoCommitInterval.Should().Be(TimeSpan.FromSeconds(5));
configuration.StatisticsHandlers.Should().BeEmpty();
configuration.PartitionsAssignedHandlers.Should().BeEmpty();
Expand Down Expand Up @@ -116,10 +116,10 @@ public void Build_AllCalls_ReturnPassedValues()
configuration.Topics.Should().BeEquivalentTo(topic1, topic2);
configuration.ConsumerName.Should().Be(name);
configuration.BufferSize.Should().Be(bufferSize);
configuration.WorkersCountCalculator(null).Result.Should().Be(workers);
configuration.WorkersCountCalculator(null, null).Result.Should().Be(workers);
configuration.GroupId.Should().Be(groupId);
configuration.GetKafkaConfig().AutoOffsetReset.Should().Be(offsetReset);
configuration.AutoStoreOffsets.Should().Be(false);
configuration.AutoMessageCompletion.Should().Be(false);
configuration.GetKafkaConfig().EnableAutoOffsetStore.Should().Be(false);
configuration.GetKafkaConfig().EnableAutoCommit.Should().Be(false);
configuration.AutoCommitInterval.Should().Be(TimeSpan.FromMilliseconds(autoCommitInterval));
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow.UnitTests/Consumer/ConsumerManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void Setup()

configurationMock
.SetupGet(x => x.WorkersCountCalculator)
.Returns(_ => Task.FromResult(10));
.Returns((_, _) => Task.FromResult(10));

configurationMock
.SetupGet(x => x.WorkersCountEvaluationInterval)
Expand Down
47 changes: 0 additions & 47 deletions src/KafkaFlow.UnitTests/ConsumerContextTests.cs

This file was deleted.

14 changes: 14 additions & 0 deletions src/KafkaFlow.UnitTests/ExtensionHelpers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace KafkaFlow.UnitTests
{
using System;
using System.Threading.Tasks;

public static class ExtensionHelpers
{
public static TaskCompletionSource WithTimeout(this TaskCompletionSource taskCompletionSource, int milliseconds)
{
Task.Delay(milliseconds).ContinueWith(_ => taskCompletionSource.TrySetException(new TimeoutException()));
return taskCompletionSource;
}
}
}
6 changes: 3 additions & 3 deletions src/KafkaFlow.UnitTests/MemoryTelemetryStorageTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class MemoryTelemetryStorageTests
[TestInitialize]
public void Setup()
{
this.dateTimeProviderMock = new(MockBehavior.Strict);
this.dateTimeProviderMock = new();

this.dateTimeProviderMock
.SetupGet(x => x.MinValue)
Expand Down Expand Up @@ -133,7 +133,7 @@ public void PutTwoItems_ExpiryOne_ReturnsOne()
var now = new DateTime(2000, 01, 01);

this.dateTimeProviderMock
.SetupGet(x => x.Now)
.SetupGet(x => x.UtcNow)
.Returns(now);

var metric1 = new ConsumerTelemetryMetric
Expand All @@ -155,7 +155,7 @@ public void PutTwoItems_ExpiryOne_ReturnsOne()
};

this.dateTimeProviderMock
.SetupGet(x => x.Now)
.SetupGet(x => x.UtcNow)
.Returns(now.AddSeconds(2));

// Act
Expand Down
Loading

0 comments on commit 057c6fd

Please sign in to comment.