diff --git a/samples/KafkaFlow.Sample.PauseConsumerOnError/PauseConsumerOnExceptionMiddleware.cs b/samples/KafkaFlow.Sample.PauseConsumerOnError/PauseConsumerOnExceptionMiddleware.cs
index 3daaa5a54..b45b0ec4b 100644
--- a/samples/KafkaFlow.Sample.PauseConsumerOnError/PauseConsumerOnExceptionMiddleware.cs
+++ b/samples/KafkaFlow.Sample.PauseConsumerOnError/PauseConsumerOnExceptionMiddleware.cs
@@ -21,7 +21,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
}
catch (Exception exception)
{
- context.ConsumerContext.AutoCompleteMessage = false;
+ context.ConsumerContext.AutoMessageCompletion = false;
this.logHandler.Error("Error handling message", exception,
new
{
diff --git a/src/KafkaFlow.Abstractions/IConsumerContext.cs b/src/KafkaFlow.Abstractions/IConsumerContext.cs
index 5e5d2c1e9..624934f2a 100644
--- a/src/KafkaFlow.Abstractions/IConsumerContext.cs
+++ b/src/KafkaFlow.Abstractions/IConsumerContext.cs
@@ -57,7 +57,7 @@ public interface IConsumerContext
///
/// Gets or sets a value indicating whether if the framework should auto complete the message in the end
///
- bool AutoCompleteMessage { get; set; }
+ bool AutoMessageCompletion { get; set; }
///
/// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
@@ -73,11 +73,18 @@ public interface IConsumerContext
///
IDependencyResolver WorkerDependencyResolver { get; }
+ ///
+ /// Gets a Task that completes when the method is invoked,
+ /// indicating the end of message processing. This allows async operations
+ /// to wait for the message to be fully processed and its offset stored.
+ ///
+ Task Completion { get; }
+
///
/// Signals the completion of message processing and stores the message offset to eventually be committed.
/// After this call, the framework marks the message processing as finished and releases resources associated with the message.
/// By default, this method is automatically invoked when message processing concludes, unless
- /// the consumer is configured for manual message completion or the flag is set to false.
+ /// the consumer is configured for manual message completion or the flag is set to false.
///
void Complete();
@@ -96,12 +103,5 @@ public interface IConsumerContext
/// Resume Kafka's message fetch
///
void Resume();
-
- ///
- /// Gets a Task that completes when the method is invoked,
- /// indicating the end of message processing. This allows async operations
- /// to wait for the message to be fully processed and its offset stored.
- ///
- Task Completion { get; }
}
}
diff --git a/src/KafkaFlow.Abstractions/IMessageContext.cs b/src/KafkaFlow.Abstractions/IMessageContext.cs
index 0380a6db0..639b47dbf 100644
--- a/src/KafkaFlow.Abstractions/IMessageContext.cs
+++ b/src/KafkaFlow.Abstractions/IMessageContext.cs
@@ -1,7 +1,5 @@
namespace KafkaFlow
{
- using System;
-
///
/// A context that contains the message and metadata
///
@@ -34,7 +32,6 @@ public interface IMessageContext
///
IDependencyResolver DependencyResolver { get; }
-
///
/// Creates a new with the new message
///
@@ -42,13 +39,5 @@ public interface IMessageContext
/// The new message value
/// A new message context containing the new values
IMessageContext SetMessage(object key, object value);
-
- ///
- /// Deprecated
- ///
- /// key
- ///
- [Obsolete("This method should no longer be used, use the " + nameof(SetMessage) + "() instead.", true)]
- IMessageContext TransformMessage(object message);
}
}
diff --git a/src/KafkaFlow.Abstractions/MiddlewareLifetime.cs b/src/KafkaFlow.Abstractions/MiddlewareLifetime.cs
index e17fa8de3..4263ac7c5 100644
--- a/src/KafkaFlow.Abstractions/MiddlewareLifetime.cs
+++ b/src/KafkaFlow.Abstractions/MiddlewareLifetime.cs
@@ -13,13 +13,6 @@ public enum MiddlewareLifetime
///
Singleton,
- ///
- /// Obsolete. Please use Message instead. Indicates a new middleware instance is instantiated for each message scope.
- /// This instance will be disposed when the message scope ends.
- ///
- [Obsolete("Use Message lifetime instead")]
- Scoped,
-
///
/// Indicates a new middleware instance is instantiated for each individual message scope, ensuring isolated processing.
/// This instance will be disposed when the message scope ends.
diff --git a/src/KafkaFlow.Admin/MemoryTelemetryStorage.cs b/src/KafkaFlow.Admin/MemoryTelemetryStorage.cs
index 5d7a06257..081856304 100644
--- a/src/KafkaFlow.Admin/MemoryTelemetryStorage.cs
+++ b/src/KafkaFlow.Admin/MemoryTelemetryStorage.cs
@@ -52,7 +52,7 @@ private void TryCleanItems()
return;
}
- this.lastCleanDate = this.dateTimeProvider.Now;
+ this.lastCleanDate = this.dateTimeProvider.UtcNow;
this.CleanExpiredItems();
}
@@ -70,6 +70,6 @@ private void CleanExpiredItems()
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private bool NeedsCleaning() => this.dateTimeProvider.Now - this.lastCleanDate > this.cleanRunInterval;
+ private bool NeedsCleaning() => this.dateTimeProvider.UtcNow - this.lastCleanDate > this.cleanRunInterval;
}
}
diff --git a/src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs b/src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs
index 566a3de0e..5db162b63 100644
--- a/src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs
+++ b/src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs
@@ -27,9 +27,7 @@ public BatchConsumeMessageContext(
public IDependencyResolver DependencyResolver => this.batchDependencyScope.Resolver;
public IMessageContext SetMessage(object key, object value) =>
- throw new NotSupportedException($"{nameof(BatchConsumeMessageContext)} does not allow change the message");
-
- public IMessageContext TransformMessage(object message) => throw new NotImplementedException();
+ throw new NotSupportedException($"{nameof(BatchConsumeMessageContext)} does not allow to change the message");
public void Dispose() => this.batchDependencyScope.Dispose();
}
diff --git a/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs b/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs
index 24117ba8d..e7cde97fc 100644
--- a/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs
+++ b/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs
@@ -46,7 +46,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
try
{
- context.ConsumerContext.AutoCompleteMessage = false;
+ context.ConsumerContext.AutoMessageCompletion = false;
this.batch.Add(context);
diff --git a/src/KafkaFlow.Compressor.Gzip/GzipMessageCompressor.cs b/src/KafkaFlow.Compressor.Gzip/GzipMessageCompressor.cs
index f49237c59..f1fef5ebd 100644
--- a/src/KafkaFlow.Compressor.Gzip/GzipMessageCompressor.cs
+++ b/src/KafkaFlow.Compressor.Gzip/GzipMessageCompressor.cs
@@ -7,7 +7,6 @@
///
/// A GZIP message compressor
///
- [Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")]
public class GzipMessageCompressor : IMessageCompressor
{
///
diff --git a/src/KafkaFlow.Compressor/ConfigurationBuilderExtensions.cs b/src/KafkaFlow.Compressor/ConfigurationBuilderExtensions.cs
index eb5e83f5b..204f5311e 100644
--- a/src/KafkaFlow.Compressor/ConfigurationBuilderExtensions.cs
+++ b/src/KafkaFlow.Compressor/ConfigurationBuilderExtensions.cs
@@ -14,7 +14,6 @@ public static class ConfigurationBuilderExtensions
/// The middleware configuration builder
/// The compressor type
///
- [Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")]
public static IConsumerMiddlewareConfigurationBuilder AddCompressor(this IConsumerMiddlewareConfigurationBuilder middlewares)
where T : class, IMessageCompressor
{
@@ -29,7 +28,6 @@ public static IConsumerMiddlewareConfigurationBuilder AddCompressor(this ICon
/// The compressor type that implements
/// A factory to create the instance
///
- [Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")]
public static IConsumerMiddlewareConfigurationBuilder AddCompressor(
this IConsumerMiddlewareConfigurationBuilder middlewares,
Factory factory)
@@ -45,7 +43,6 @@ public static IConsumerMiddlewareConfigurationBuilder AddCompressor(
/// The middleware configuration builder
/// The compressor type that implements
///
- [Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")]
public static IProducerMiddlewareConfigurationBuilder AddCompressor(this IProducerMiddlewareConfigurationBuilder middlewares)
where T : class, IMessageCompressor
{
@@ -61,7 +58,6 @@ public static IProducerMiddlewareConfigurationBuilder AddCompressor(this IPro
/// The compressor type that implements
/// A factory to create the instance
///
- [Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")]
public static IProducerMiddlewareConfigurationBuilder AddCompressor(
this IProducerMiddlewareConfigurationBuilder middlewares,
Factory factory)
diff --git a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs
index 0a16eeffb..e55e85ae8 100644
--- a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs
+++ b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs
@@ -4,6 +4,8 @@ namespace KafkaFlow.UnitTests.BatchConsume
using System.Threading.Tasks;
using FluentAssertions;
using KafkaFlow.BatchConsume;
+ using KafkaFlow.Configuration;
+ using KafkaFlow.Consumers;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
@@ -30,7 +32,33 @@ public void Setup()
this.logHandlerMock = new Mock();
+ var middlewareContextMock = new Mock();
+ var workerMock = new Mock();
+ var consumerMock = new Mock();
+ var consumerConfigurationMock = new Mock();
+
+ middlewareContextMock
+ .SetupGet(x => x.Worker)
+ .Returns(workerMock.Object);
+
+ middlewareContextMock
+ .SetupGet(x => x.Consumer)
+ .Returns(consumerMock.Object);
+
+ consumerMock
+ .SetupGet(x => x.Configuration)
+ .Returns(consumerConfigurationMock.Object);
+
+ workerMock
+ .SetupGet(x => x.WorkerStopped)
+ .Returns(new WorkerStoppedSubject(this.logHandlerMock.Object));
+
+ consumerConfigurationMock
+ .SetupGet(x => x.AutoMessageCompletion)
+ .Returns(true);
+
this.target = new BatchConsumeMiddleware(
+ middlewareContextMock.Object,
BatchSize,
this.batchTimeout,
this.logHandlerMock.Object);
@@ -43,6 +71,10 @@ public async Task AddAsync_LessThanBatchSize_CallNextOnTimeout()
var consumerContext = new Mock();
var context = new Mock();
+ consumerContext
+ .SetupGet(x => x.WorkerDependencyResolver)
+ .Returns(Mock.Of());
+
context
.Setup(x => x.ConsumerContext)
.Returns(consumerContext.Object);
@@ -64,6 +96,10 @@ public async Task AddAsync_ExactlyBatchSize_CallNextInstantly()
var consumerContext = new Mock();
var contextMock = new Mock();
+ consumerContext
+ .SetupGet(x => x.WorkerDependencyResolver)
+ .Returns(Mock.Of());
+
contextMock
.Setup(x => x.ConsumerContext)
.Returns(consumerContext.Object);
@@ -89,6 +125,10 @@ public async Task AddAsync_MoreThanBatchSize_CallNextInstantlyThenCallWhenTimeou
var consumerContext = new Mock();
var contextMock = new Mock();
+ consumerContext
+ .SetupGet(x => x.WorkerDependencyResolver)
+ .Returns(Mock.Of());
+
contextMock
.Setup(x => x.ConsumerContext)
.Returns(consumerContext.Object);
diff --git a/src/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs b/src/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs
index 51e387ffb..b632bbbe7 100644
--- a/src/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs
+++ b/src/KafkaFlow.UnitTests/ConfigurationBuilders/ConsumerConfigurationBuilderTests.cs
@@ -56,12 +56,12 @@ public void Build_RequiredCalls_ReturnDefaultValues()
// Assert
configuration.Topics.Should().BeEquivalentTo(topic1);
configuration.BufferSize.Should().Be(bufferSize);
- configuration.WorkersCountCalculator(null).Result.Should().Be(workers);
+ configuration.WorkersCountCalculator(null, null).Result.Should().Be(workers);
configuration.GroupId.Should().Be(groupId);
configuration.GetKafkaConfig().AutoOffsetReset.Should().BeNull();
configuration.GetKafkaConfig().EnableAutoOffsetStore.Should().Be(false);
configuration.GetKafkaConfig().EnableAutoCommit.Should().Be(false);
- configuration.AutoStoreOffsets.Should().Be(true);
+ configuration.AutoMessageCompletion.Should().Be(true);
configuration.AutoCommitInterval.Should().Be(TimeSpan.FromSeconds(5));
configuration.StatisticsHandlers.Should().BeEmpty();
configuration.PartitionsAssignedHandlers.Should().BeEmpty();
@@ -116,10 +116,10 @@ public void Build_AllCalls_ReturnPassedValues()
configuration.Topics.Should().BeEquivalentTo(topic1, topic2);
configuration.ConsumerName.Should().Be(name);
configuration.BufferSize.Should().Be(bufferSize);
- configuration.WorkersCountCalculator(null).Result.Should().Be(workers);
+ configuration.WorkersCountCalculator(null, null).Result.Should().Be(workers);
configuration.GroupId.Should().Be(groupId);
configuration.GetKafkaConfig().AutoOffsetReset.Should().Be(offsetReset);
- configuration.AutoStoreOffsets.Should().Be(false);
+ configuration.AutoMessageCompletion.Should().Be(false);
configuration.GetKafkaConfig().EnableAutoOffsetStore.Should().Be(false);
configuration.GetKafkaConfig().EnableAutoCommit.Should().Be(false);
configuration.AutoCommitInterval.Should().Be(TimeSpan.FromMilliseconds(autoCommitInterval));
diff --git a/src/KafkaFlow.UnitTests/Consumer/ConsumerManagerTests.cs b/src/KafkaFlow.UnitTests/Consumer/ConsumerManagerTests.cs
index 5fafd4bbf..d8492f775 100644
--- a/src/KafkaFlow.UnitTests/Consumer/ConsumerManagerTests.cs
+++ b/src/KafkaFlow.UnitTests/Consumer/ConsumerManagerTests.cs
@@ -55,7 +55,7 @@ public void Setup()
configurationMock
.SetupGet(x => x.WorkersCountCalculator)
- .Returns(_ => Task.FromResult(10));
+ .Returns((_, _) => Task.FromResult(10));
configurationMock
.SetupGet(x => x.WorkersCountEvaluationInterval)
diff --git a/src/KafkaFlow.UnitTests/ConsumerContextTests.cs b/src/KafkaFlow.UnitTests/ConsumerContextTests.cs
deleted file mode 100644
index 259dea856..000000000
--- a/src/KafkaFlow.UnitTests/ConsumerContextTests.cs
+++ /dev/null
@@ -1,47 +0,0 @@
-namespace KafkaFlow.UnitTests
-{
- using System;
- using System.Threading;
- using Confluent.Kafka;
- using FluentAssertions;
- using KafkaFlow.Consumers;
- using Microsoft.VisualStudio.TestTools.UnitTesting;
-
- [TestClass]
- public class ConsumerContextTests
- {
- [TestMethod]
- public void MessageTimestamp_ConsumeResultHasMessageTimestamp_ReturnsMessageTimestampFromResult()
- {
- // Arrange
- var expectedMessageTimestamp = new DateTime(
- 2020,
- 1,
- 1,
- 0,
- 0,
- 0);
-
- var consumerResult = new ConsumeResult
- {
- Message = new Message
- {
- Timestamp = new Timestamp(expectedMessageTimestamp),
- },
- };
-
- var target = new ConsumerContext(
- null,
- null,
- consumerResult,
- CancellationToken.None,
- 0);
-
- // Act
- var messageTimestamp = target.MessageTimestamp;
-
- // Assert
- messageTimestamp.Should().Be(expectedMessageTimestamp.ToUniversalTime());
- }
- }
-}
diff --git a/src/KafkaFlow.UnitTests/ExtensionHelpers.cs b/src/KafkaFlow.UnitTests/ExtensionHelpers.cs
new file mode 100644
index 000000000..50c81f1e2
--- /dev/null
+++ b/src/KafkaFlow.UnitTests/ExtensionHelpers.cs
@@ -0,0 +1,14 @@
+namespace KafkaFlow.UnitTests
+{
+ using System;
+ using System.Threading.Tasks;
+
+ public static class ExtensionHelpers
+ {
+ public static TaskCompletionSource WithTimeout(this TaskCompletionSource taskCompletionSource, int milliseconds)
+ {
+ Task.Delay(milliseconds).ContinueWith(_ => taskCompletionSource.TrySetException(new TimeoutException()));
+ return taskCompletionSource;
+ }
+ }
+}
diff --git a/src/KafkaFlow.UnitTests/MemoryTelemetryStorageTests.cs b/src/KafkaFlow.UnitTests/MemoryTelemetryStorageTests.cs
index e13828790..350d80d3f 100644
--- a/src/KafkaFlow.UnitTests/MemoryTelemetryStorageTests.cs
+++ b/src/KafkaFlow.UnitTests/MemoryTelemetryStorageTests.cs
@@ -18,7 +18,7 @@ public class MemoryTelemetryStorageTests
[TestInitialize]
public void Setup()
{
- this.dateTimeProviderMock = new(MockBehavior.Strict);
+ this.dateTimeProviderMock = new();
this.dateTimeProviderMock
.SetupGet(x => x.MinValue)
@@ -133,7 +133,7 @@ public void PutTwoItems_ExpiryOne_ReturnsOne()
var now = new DateTime(2000, 01, 01);
this.dateTimeProviderMock
- .SetupGet(x => x.Now)
+ .SetupGet(x => x.UtcNow)
.Returns(now);
var metric1 = new ConsumerTelemetryMetric
@@ -155,7 +155,7 @@ public void PutTwoItems_ExpiryOne_ReturnsOne()
};
this.dateTimeProviderMock
- .SetupGet(x => x.Now)
+ .SetupGet(x => x.UtcNow)
.Returns(now.AddSeconds(2));
// Act
diff --git a/src/KafkaFlow.UnitTests/OffsetCommitterTests.cs b/src/KafkaFlow.UnitTests/OffsetCommitterTests.cs
index 4f2d71db2..71b8c44c8 100644
--- a/src/KafkaFlow.UnitTests/OffsetCommitterTests.cs
+++ b/src/KafkaFlow.UnitTests/OffsetCommitterTests.cs
@@ -4,6 +4,7 @@ namespace KafkaFlow.UnitTests
using System.Collections.Generic;
using System.Linq;
using System.Threading;
+ using System.Threading.Tasks;
using Confluent.Kafka;
using FluentAssertions;
using KafkaFlow.Consumers;
@@ -16,20 +17,15 @@ public class OffsetCommitterTests
private const int TestTimeout = 5000;
private Mock consumerMock;
- private Mock dependencyResolverMock;
-
- private Mock logHandlerMock;
private TopicPartition topicPartition;
private OffsetCommitter offsetCommitter;
[TestInitialize]
- public void Setup()
+ public async Task Setup()
{
this.consumerMock = new Mock();
- this.logHandlerMock = new Mock();
- this.dependencyResolverMock = new Mock();
this.topicPartition = new TopicPartition("topic-A", new Partition(1));
this.consumerMock
@@ -38,74 +34,83 @@ public void Setup()
this.offsetCommitter = new OffsetCommitter(
this.consumerMock.Object,
- this.dependencyResolverMock.Object,
- new List<(Action>, TimeSpan)>(),
- this.logHandlerMock.Object);
+ Mock.Of(),
+ Mock.Of());
+
+ await this.offsetCommitter.StartAsync();
}
[TestCleanup]
- public void Cleanup()
+ public async Task Cleanup()
{
- this.offsetCommitter.Dispose();
+ await this.offsetCommitter.StopAsync();
}
[TestMethod]
- public void MarkAsProcessed_ShouldCommit()
+ public async Task MarkAsProcessed_ShouldCommit()
{
// Arrange
- var offset = new TopicPartitionOffset(this.topicPartition, new Offset(1));
- var expectedOffsets = new[] { offset };
+ var expectedOffsets = new[] { new TopicPartitionOffset(this.topicPartition, new Offset(2)) };
- var ready = new ManualResetEvent(false);
+ var ready = new TaskCompletionSource().WithTimeout(TestTimeout);
this.consumerMock
- .Setup(c => c.Commit(It.Is>(l => l.SequenceEqual(expectedOffsets))))
- .Callback((IEnumerable _) => { ready.Set(); });
+ .Setup(c => c.Commit(It.Is>(l => l.SequenceEqual(expectedOffsets))))
+ .Callback((IEnumerable _) => ready.SetResult());
// Act
- this.offsetCommitter.MarkAsProcessed(offset);
- ready.WaitOne(TestTimeout);
+ this.offsetCommitter.MarkAsProcessed(
+ new KafkaFlow.TopicPartitionOffset(
+ this.topicPartition.Topic,
+ this.topicPartition.Partition,
+ 1));
+
+ await ready.Task;
// Assert
- this.consumerMock.Verify(
- c => c.Commit(It.Is>(l => l.SequenceEqual(expectedOffsets))),
- Times.Once);
+ this.consumerMock.VerifyAll();
}
[TestMethod]
- public void PendingOffsetsState_ShouldExecuteHandlers()
+ public async Task PendingOffsetsState_ShouldExecuteHandlers()
{
// Arrange
- var ready = new ManualResetEvent(false);
+ var ready = new TaskCompletionSource().WithTimeout(TestTimeout);
- using var committer = new OffsetCommitter(
+ var committer = new OffsetCommitter(
this.consumerMock.Object,
- this.dependencyResolverMock.Object,
- new List<(Action> handler, TimeSpan interval)>
- {
- ((_, _) => ready.Set(), TimeSpan.FromMilliseconds(100)),
- },
- this.logHandlerMock.Object);
+ Mock.Of(),
+ Mock.Of());
+
+ committer.PendingOffsetsStatisticsHandlers.Add(new((_, _) => ready.TrySetResult(), TimeSpan.FromMilliseconds(100)));
+
+ await committer.StartAsync();
// Act
- committer.MarkAsProcessed(new TopicPartitionOffset(this.topicPartition, new Offset(1)));
+ committer.MarkAsProcessed(
+ new KafkaFlow.TopicPartitionOffset(
+ this.topicPartition.Topic,
+ this.topicPartition.Partition,
+ 1));
// Assert
- ready.WaitOne(TestTimeout).Should().BeTrue();
+ await ready.Task;
+
+ // Cleanup
+ await committer.StopAsync();
}
[TestMethod]
- public void MarkAsProcessed_WithFailure_ShouldRequeueFailedOffsetAndCommit()
+ public async Task MarkAsProcessed_WithFailure_ShouldRequeueFailedOffsetAndCommit()
{
// Arrange
- var offset = new TopicPartitionOffset(this.topicPartition, new Offset(2));
- var expectedOffsets = new[] { offset };
+ var expectedOffsets = new[] { new TopicPartitionOffset(this.topicPartition, new Offset(2)) };
- var ready = new ManualResetEvent(false);
+ var ready = new TaskCompletionSource().WithTimeout(TestTimeout);
var hasThrown = false;
this.consumerMock
- .Setup(c => c.Commit(It.Is>(l => l.SequenceEqual(expectedOffsets))))
+ .Setup(c => c.Commit(It.Is>(l => l.SequenceEqual(expectedOffsets))))
.Callback(
(IEnumerable _) =>
{
@@ -115,16 +120,21 @@ public void MarkAsProcessed_WithFailure_ShouldRequeueFailedOffsetAndCommit()
throw new InvalidOperationException();
}
- ready.Set();
+ ready.TrySetResult();
});
// Act
- this.offsetCommitter.MarkAsProcessed(offset);
- ready.WaitOne(TestTimeout);
+ this.offsetCommitter.MarkAsProcessed(
+ new KafkaFlow.TopicPartitionOffset(
+ this.topicPartition.Topic,
+ this.topicPartition.Partition,
+ 1));
+
+ await ready.Task;
// Assert
this.consumerMock.Verify(
- c => c.Commit(It.Is>(l => l.SequenceEqual(expectedOffsets))),
+ c => c.Commit(It.Is>(l => l.SequenceEqual(expectedOffsets))),
Times.Exactly(2));
}
}
diff --git a/src/KafkaFlow.UnitTests/OffsetManagerTests.cs b/src/KafkaFlow.UnitTests/OffsetManagerTests.cs
index b557ff483..631de1336 100644
--- a/src/KafkaFlow.UnitTests/OffsetManagerTests.cs
+++ b/src/KafkaFlow.UnitTests/OffsetManagerTests.cs
@@ -1,17 +1,20 @@
namespace KafkaFlow.UnitTests
{
+ using System;
using System.Collections.Generic;
using Confluent.Kafka;
+ using FluentAssertions;
using KafkaFlow.Consumers;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
+ using TopicPartitionOffset = KafkaFlow.TopicPartitionOffset;
[TestClass]
public class OffsetManagerTests
{
private Mock committerMock;
- private TopicPartition topicPartition;
private OffsetManager target;
+ private TopicPartition topicPartition;
[TestInitialize]
public void Setup()
@@ -25,30 +28,27 @@ public void Setup()
}
[TestMethod]
- public void MarkAsProcessed_WithInvalidTopicPartition_ShouldDoNothing()
+ public void MarkAsProcessed_WithNotQueuedContext_ShouldThrowInvalidOperation()
{
- // Arrange
- this.target.Enqueue(new TopicPartitionOffset(this.topicPartition, new Offset(1)));
-
// Act
- this.target.MarkAsProcessed(new TopicPartitionOffset(new TopicPartition("topic-B", new Partition(1)), new Offset(1)));
+ Action act = () => this.target.MarkAsProcessed(this.MockConsumerContext(1));
// Assert
- this.committerMock.Verify(c => c.MarkAsProcessed(It.IsAny()), Times.Never());
+ act.Should().Throw();
}
[TestMethod]
public void MarkAsProcessed_WithGaps_ShouldStoreOffsetJustOnce()
{
// Arrange
- this.target.Enqueue(new TopicPartitionOffset(this.topicPartition, new Offset(1)));
- this.target.Enqueue(new TopicPartitionOffset(this.topicPartition, new Offset(2)));
- this.target.Enqueue(new TopicPartitionOffset(this.topicPartition, new Offset(3)));
+ this.target.Enqueue(this.MockConsumerContext(1));
+ this.target.Enqueue(this.MockConsumerContext(2));
+ this.target.Enqueue(this.MockConsumerContext(3));
// Act
- this.target.MarkAsProcessed(new TopicPartitionOffset(this.topicPartition, new Offset(3)));
- this.target.MarkAsProcessed(new TopicPartitionOffset(this.topicPartition, new Offset(2)));
- this.target.MarkAsProcessed(new TopicPartitionOffset(this.topicPartition, new Offset(1)));
+ this.target.MarkAsProcessed(this.MockConsumerContext(3));
+ this.target.MarkAsProcessed(this.MockConsumerContext(2));
+ this.target.MarkAsProcessed(this.MockConsumerContext(1));
// Assert
this.committerMock.Verify(
@@ -56,9 +56,33 @@ public void MarkAsProcessed_WithGaps_ShouldStoreOffsetJustOnce()
c.MarkAsProcessed(
It.Is(
p =>
- p.Partition.Equals(this.topicPartition.Partition) &&
- p.Offset.Value.Equals(4))),
+ p.Partition == this.topicPartition.Partition &&
+ p.Offset == 3)),
Times.Once);
}
+
+ private IConsumerContext MockConsumerContext(int offset)
+ {
+ var mock = new Mock();
+ var tpo = new TopicPartitionOffset(this.topicPartition.Topic, this.topicPartition.Partition, offset);
+
+ mock
+ .SetupGet(x => x.Offset)
+ .Returns(tpo.Offset);
+
+ mock
+ .SetupGet(x => x.Partition)
+ .Returns(tpo.Partition);
+
+ mock
+ .SetupGet(x => x.Topic)
+ .Returns(tpo.Topic);
+
+ mock
+ .SetupGet(x => x.TopicPartitionOffset)
+ .Returns(tpo);
+
+ return mock.Object;
+ }
}
}
diff --git a/src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs b/src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs
index 323f0088c..e3a4b168c 100644
--- a/src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs
+++ b/src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs
@@ -1,192 +1,169 @@
namespace KafkaFlow.UnitTests
{
using System;
- using System.Collections.Concurrent;
using System.Collections.Generic;
- using System.Linq;
- using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using KafkaFlow.Consumers;
using Microsoft.VisualStudio.TestTools.UnitTesting;
+ using Moq;
[TestClass]
public class PartitionOffsetsTests
{
[TestMethod]
- public void ShouldUpdate_WithoutAddingOffsets_ThrowsException()
+ public void TryDequeue_WithoutEnqueuing_ThrowsException()
{
// Arrange
var offsets = new PartitionOffsets();
// Act
- Func act = () => offsets.IsCommitAllowed(1, out _);
+ Func act = () => offsets.TryDequeue(MockConsumerContext(1));
// Assert
act.Should().Throw();
}
[TestMethod]
- public void ShouldUpdateOffset_NextOffset_ShouldUpdate()
+ public void TryDequeue_WithSequencedContext_SetDequeuedContext()
{
// Arrange
var offsets = new PartitionOffsets();
- offsets.Enqueue(1);
+
+ var context = MockConsumerContext(1);
+
+ offsets.Enqueue(context);
// Act
- var shouldUpdate = offsets.IsCommitAllowed(1, out var offset);
+ var isCommitAllowed = offsets.TryDequeue(context);
// Assert
- Assert.IsTrue(shouldUpdate);
- Assert.AreEqual(1, offset);
+ Assert.IsTrue(isCommitAllowed);
+ Assert.AreEqual(1, offsets.DequeuedContext.Offset);
}
[TestMethod]
- public void ShouldUpdateOffset_WithOneGap_ShouldNotUpdate()
+ public void TryDequeue_WithNonSequencedContexts_UnsetDequeuedContext()
{
// Arrange
var offsets = new PartitionOffsets();
- offsets.Enqueue(1);
+ offsets.Enqueue(MockConsumerContext(1));
// Act
- var shouldUpdate = offsets.IsCommitAllowed(2, out var offset);
+ var isCommitAllowed = offsets.TryDequeue(MockConsumerContext(2));
// Assert
- Assert.IsFalse(shouldUpdate);
- Assert.AreEqual(-1, offset);
+ Assert.IsFalse(isCommitAllowed);
+ Assert.IsNull(offsets.DequeuedContext);
}
[TestMethod]
- public void ShouldUpdateOffset_WithManyGaps_ShouldUpdate()
+ public void TryDequeue_WithUnorderedSequence_SetDequeuedContextWhenAllowed()
{
// Arrange
var offsets = new PartitionOffsets();
- offsets.Enqueue(1);
- offsets.Enqueue(2);
- offsets.Enqueue(4);
- offsets.Enqueue(5);
- offsets.Enqueue(7);
- offsets.Enqueue(8);
- offsets.Enqueue(15);
- offsets.Enqueue(20);
- offsets.Enqueue(50);
+
+ var context1 = MockConsumerContext(1);
+ var context2 = MockConsumerContext(2);
+ var context4 = MockConsumerContext(4);
+ var context5 = MockConsumerContext(5);
+
+ offsets.Enqueue(context1);
+ offsets.Enqueue(context2);
+ offsets.Enqueue(context4);
+ offsets.Enqueue(context5);
// Act
var results = new[]
{
new
{
- ShouldUpdateResult = offsets.IsCommitAllowed(7, out long offset),
- ShouldUpdateExpected = false,
- LastOffsetResult = offset,
- LastOffsetExpected = -1,
- },
- new
- {
- ShouldUpdateResult = offsets.IsCommitAllowed(1, out offset),
- ShouldUpdateExpected = true,
- LastOffsetResult = offset,
- LastOffsetExpected = 1,
- },
- new
- {
- ShouldUpdateResult = offsets.IsCommitAllowed(2, out offset),
- ShouldUpdateExpected = true,
- LastOffsetResult = offset,
- LastOffsetExpected = 2,
- },
- new
- {
- ShouldUpdateResult = offsets.IsCommitAllowed(20, out offset),
- ShouldUpdateExpected = false,
- LastOffsetResult = offset,
- LastOffsetExpected = -1,
- },
- new
- {
- ShouldUpdateResult = offsets.IsCommitAllowed(5, out offset),
- ShouldUpdateExpected = false,
- LastOffsetResult = offset,
- LastOffsetExpected = -1,
- },
- new
- {
- ShouldUpdateResult = offsets.IsCommitAllowed(8, out offset),
- ShouldUpdateExpected = false,
- LastOffsetResult = offset,
- LastOffsetExpected = -1,
+ Dequeued = offsets.TryDequeue(context5),
+ ExcpectedDequeuedResult = false,
+ DequeuedContextOffset = offsets.DequeuedContext?.Offset,
+ ExpectedDequeuedContextOffset = -1,
},
new
{
- ShouldUpdateResult = offsets.IsCommitAllowed(4, out offset),
- ShouldUpdateExpected = true,
- LastOffsetResult = offset,
- LastOffsetExpected = 8,
+ Dequeued = offsets.TryDequeue(context1),
+ ExcpectedDequeuedResult = true,
+ DequeuedContextOffset = offsets.DequeuedContext?.Offset,
+ ExpectedDequeuedContextOffset = 1,
},
new
{
- ShouldUpdateResult = offsets.IsCommitAllowed(15, out offset),
- ShouldUpdateExpected = true,
- LastOffsetResult = offset,
- LastOffsetExpected = 20,
+ Dequeued = offsets.TryDequeue(context2),
+ ExcpectedDequeuedResult = true,
+ DequeuedContextOffset = offsets.DequeuedContext?.Offset,
+ ExpectedDequeuedContextOffset = 2,
},
new
{
- ShouldUpdateResult = offsets.IsCommitAllowed(50, out offset),
- ShouldUpdateExpected = true,
- LastOffsetResult = offset,
- LastOffsetExpected = 50,
+ Dequeued = offsets.TryDequeue(context4),
+ ExcpectedDequeuedResult = true,
+ DequeuedContextOffset = offsets.DequeuedContext?.Offset,
+ ExpectedDequeuedContextOffset = 5,
},
};
// Assert
foreach (var result in results)
{
- Assert.AreEqual(result.ShouldUpdateExpected, result.ShouldUpdateResult);
- Assert.AreEqual(result.LastOffsetExpected, result.LastOffsetResult);
+ Assert.AreEqual(result.ExcpectedDequeuedResult, result.Dequeued);
+ Assert.AreEqual(result.ExpectedDequeuedContextOffset, result.DequeuedContextOffset ?? -1);
}
}
[TestMethod]
- public void ShouldUpdateOffset_WithManyConcurrentOffsets_ShouldUpdate()
+ public void TryDequeue_WithConcurrentCalls_RemainTheLastDequeuedContext()
{
// Arrange
- const int count = 1_000;
+ const int count = 100;
+ const int lastOffset = count - 1;
var target = new PartitionOffsets();
- var offsetsCommitted = new ConcurrentBag();
+ var contexts = new IConsumerContext[count];
- var waitHandle = new ManualResetEvent(false);
+ var taskCompletionSource = new TaskCompletionSource();
- for (var i = 0; i < count; i += 2)
+ for (var i = 0; i < count; i++)
{
- target.Enqueue(i);
+ contexts[i] = MockConsumerContext(i);
+ target.Enqueue(contexts[i]);
}
// Act
var tasks = new List();
- for (var i = 0; i < count; i += 2)
+ for (var i = 0; i < count; i++)
{
- var offset = i;
tasks.Add(
- Task.Run(
- () =>
+ Task.Factory.StartNew(
+ async index =>
{
- waitHandle.WaitOne();
+ await taskCompletionSource.Task;
- if (target.IsCommitAllowed(offset, out var lastProcessedOffset))
- {
- offsetsCommitted.Add(lastProcessedOffset);
- }
- }));
+ target.TryDequeue(contexts[(int)index]);
+ },
+ i));
}
- waitHandle.Set();
+ taskCompletionSource.SetResult();
Task.WaitAll(tasks.ToArray());
// Assert
- Assert.AreEqual(count - 2, offsetsCommitted.Max());
+ Assert.AreEqual(lastOffset, target.DequeuedContext.Offset);
+ }
+
+ private static IConsumerContext MockConsumerContext(int offset)
+ {
+ var mock = new Mock();
+
+ mock
+ .SetupGet(x => x.Offset)
+ .Returns(offset);
+
+ return mock.Object;
}
}
}
diff --git a/src/KafkaFlow/Configuration/ConsumerConfiguration.cs b/src/KafkaFlow/Configuration/ConsumerConfiguration.cs
index f3ae30c46..a74de9d43 100644
--- a/src/KafkaFlow/Configuration/ConsumerConfiguration.cs
+++ b/src/KafkaFlow/Configuration/ConsumerConfiguration.cs
@@ -29,8 +29,7 @@ public ConsumerConfiguration(
IReadOnlyList> statisticsHandlers,
IReadOnlyList>> partitionsAssignedHandlers,
IReadOnlyList>> partitionsRevokedHandlers,
- IReadOnlyList<(Action> handler, TimeSpan interval)>
- pendingOffsetsHandlers,
+ IReadOnlyList pendingOffsetsStatisticsHandlers,
ConsumerCustomFactory customFactory)
{
this.consumerConfig = consumerConfig ?? throw new ArgumentNullException(nameof(consumerConfig));
@@ -59,7 +58,7 @@ public ConsumerConfiguration(
this.StatisticsHandlers = statisticsHandlers;
this.PartitionsAssignedHandlers = partitionsAssignedHandlers;
this.PartitionsRevokedHandlers = partitionsRevokedHandlers;
- this.PendingOffsetsHandlers = pendingOffsetsHandlers;
+ this.PendingOffsetsStatisticsHandlers = pendingOffsetsStatisticsHandlers;
this.CustomFactory = customFactory;
this.BufferSize = bufferSize > 0 ?
@@ -108,8 +107,7 @@ public ConsumerConfiguration(
public IReadOnlyList>> PartitionsRevokedHandlers { get; }
- public IReadOnlyList<(Action> handler, TimeSpan interval)>
- PendingOffsetsHandlers { get; }
+ public IReadOnlyList PendingOffsetsStatisticsHandlers { get; }
public ConsumerCustomFactory CustomFactory { get; }
diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
index 784a02200..246cf30e2 100644
--- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
+++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
@@ -14,8 +14,7 @@ internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuild
private readonly List topicsPartitions = new();
private readonly List> statisticsHandlers = new();
- private readonly List<(Action>, TimeSpan interval)>
- pendingOffsetsStatisticsHandlers = new();
+ private readonly List pendingOffsetsStatisticsHandlers = new();
private readonly List>> partitionAssignedHandlers = new();
private readonly List>> partitionRevokedHandlers = new();
@@ -228,7 +227,7 @@ public IConsumerConfigurationBuilder WithPendingOffsetsStatisticsHandler(
Action> pendingOffsetsHandler,
TimeSpan interval)
{
- this.pendingOffsetsStatisticsHandlers.Add((pendingOffsetsHandler, interval));
+ this.pendingOffsetsStatisticsHandlers.Add(new(pendingOffsetsHandler, interval));
return this;
}
diff --git a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs
index a2b2acb32..4099867b9 100644
--- a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs
+++ b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs
@@ -104,10 +104,7 @@ public interface IConsumerConfiguration
///
/// Gets the handlers that will be called when there are pending offsets
///
- IReadOnlyList<(Action> handler, TimeSpan interval)> PendingOffsetsHandlers
- {
- get;
- }
+ IReadOnlyList PendingOffsetsStatisticsHandlers { get; }
///
/// Gets the custom factory used to create a new
diff --git a/src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs b/src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs
index 02e900c82..95cf0a893 100644
--- a/src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs
+++ b/src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs
@@ -50,7 +50,7 @@ private static InstanceLifetime ParseLifetime(MiddlewareLifetime lifetime)
{
return lifetime switch
{
- MiddlewareLifetime.Scoped or MiddlewareLifetime.Message => InstanceLifetime.Scoped,
+ MiddlewareLifetime.Message => InstanceLifetime.Scoped,
MiddlewareLifetime.Singleton => InstanceLifetime.Singleton,
MiddlewareLifetime.Transient or
MiddlewareLifetime.Worker or
diff --git a/src/KafkaFlow/Configuration/PendingOffsetsStatisticsHandler.cs b/src/KafkaFlow/Configuration/PendingOffsetsStatisticsHandler.cs
new file mode 100644
index 000000000..6df2d96dc
--- /dev/null
+++ b/src/KafkaFlow/Configuration/PendingOffsetsStatisticsHandler.cs
@@ -0,0 +1,33 @@
+namespace KafkaFlow.Configuration
+{
+ using System;
+ using System.Collections.Generic;
+ using Confluent.Kafka;
+
+ ///
+ /// Represents a handler for pending offsets statistics.
+ ///
+ public class PendingOffsetsStatisticsHandler
+ {
+ ///
+ /// Initializes a new instance of the class with the specified handler and interval.
+ ///
+ /// The action to handle pending offsets statistics.
+ /// The interval at which the handler should be executed.
+ public PendingOffsetsStatisticsHandler(Action> handler, TimeSpan interval)
+ {
+ this.Handler = handler;
+ this.Interval = interval;
+ }
+
+ ///
+ /// Gets the action that handles pending offsets statistics.
+ ///
+ public Action> Handler { get; }
+
+ ///
+ /// Gets the interval at which the handler should be executed.
+ ///
+ public TimeSpan Interval { get; }
+ }
+}
diff --git a/src/KafkaFlow/Consumers/ConsumerContext.cs b/src/KafkaFlow/Consumers/ConsumerContext.cs
index 0e6a7e397..06cac4c38 100644
--- a/src/KafkaFlow/Consumers/ConsumerContext.cs
+++ b/src/KafkaFlow/Consumers/ConsumerContext.cs
@@ -27,7 +27,7 @@ public ConsumerContext(
this.offsetManager = offsetManager;
this.worker = worker;
this.messageDependencyScope = messageDependencyScope;
- this.AutoCompleteMessage = this.consumer.Configuration.AutoMessageCompletion;
+ this.AutoMessageCompletion = this.consumer.Configuration.AutoMessageCompletion;
this.TopicPartitionOffset = new TopicPartitionOffset(
kafkaResult.Topic,
kafkaResult.Partition.Value,
@@ -55,7 +55,7 @@ public ConsumerContext(
public string GroupId => this.consumer.Configuration.GroupId;
- public bool AutoCompleteMessage { get; set; }
+ public bool AutoMessageCompletion { get; set; }
public DateTime MessageTimestamp { get; }
diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs
index 94e73fb47..d3af0701a 100644
--- a/src/KafkaFlow/Consumers/ConsumerWorker.cs
+++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs
@@ -152,7 +152,7 @@ await this.middlewareExecutor
});
}
- if (context.ConsumerContext.AutoCompleteMessage)
+ if (context.ConsumerContext.AutoMessageCompletion)
{
context.ConsumerContext.Complete();
}
diff --git a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
index 942974db9..779992b1e 100644
--- a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
+++ b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
@@ -45,8 +45,9 @@ public ConsumerWorkerPool(
new OffsetCommitter(
consumer,
consumerDependencyResolver,
- consumer.Configuration.PendingOffsetsHandlers,
logHandler);
+
+ this.offsetCommitter.PendingOffsetsStatisticsHandlers.AddRange(consumer.Configuration.PendingOffsetsStatisticsHandlers);
}
public int CurrentWorkersCount { get; private set; }
diff --git a/src/KafkaFlow/Consumers/IWorkerLifetimeContext.cs b/src/KafkaFlow/Consumers/IConsumerMiddlewareContext.cs
similarity index 100%
rename from src/KafkaFlow/Consumers/IWorkerLifetimeContext.cs
rename to src/KafkaFlow/Consumers/IConsumerMiddlewareContext.cs
diff --git a/src/KafkaFlow/Consumers/IOffsetCommitter.cs b/src/KafkaFlow/Consumers/IOffsetCommitter.cs
index 1cdbd06e5..2986b610e 100644
--- a/src/KafkaFlow/Consumers/IOffsetCommitter.cs
+++ b/src/KafkaFlow/Consumers/IOffsetCommitter.cs
@@ -1,9 +1,13 @@
namespace KafkaFlow.Consumers
{
+ using System.Collections.Generic;
using System.Threading.Tasks;
+ using KafkaFlow.Configuration;
internal interface IOffsetCommitter
{
+ List PendingOffsetsStatisticsHandlers { get; }
+
void MarkAsProcessed(TopicPartitionOffset tpo);
Task StartAsync();
diff --git a/src/KafkaFlow/Consumers/NullOffsetCommitter.cs b/src/KafkaFlow/Consumers/NullOffsetCommitter.cs
index 9d000c44b..27228989c 100644
--- a/src/KafkaFlow/Consumers/NullOffsetCommitter.cs
+++ b/src/KafkaFlow/Consumers/NullOffsetCommitter.cs
@@ -1,10 +1,14 @@
namespace KafkaFlow.Consumers
{
+ using System.Collections.Generic;
using System.Threading.Tasks;
using KafkaFlow;
+ using KafkaFlow.Configuration;
internal class NullOffsetCommitter : IOffsetCommitter
{
+ public List PendingOffsetsStatisticsHandlers { get; } = new();
+
public void Dispose()
{
// Do nothing
diff --git a/src/KafkaFlow/Consumers/OffsetCommitter.cs b/src/KafkaFlow/Consumers/OffsetCommitter.cs
index 8f58d08f4..f93e6ac99 100644
--- a/src/KafkaFlow/Consumers/OffsetCommitter.cs
+++ b/src/KafkaFlow/Consumers/OffsetCommitter.cs
@@ -6,16 +6,13 @@ namespace KafkaFlow.Consumers
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
+ using KafkaFlow.Configuration;
internal class OffsetCommitter : IOffsetCommitter
{
private readonly IConsumer consumer;
private readonly IDependencyResolver resolver;
- private readonly
- IReadOnlyList<(Action> handler, TimeSpan interval)>
- pendingOffsetsHandlers;
-
private readonly ILogHandler logHandler;
private readonly object commitSyncRoot = new();
@@ -28,16 +25,15 @@ private readonly
public OffsetCommitter(
IConsumer consumer,
IDependencyResolver resolver,
- IReadOnlyList<(Action> handler, TimeSpan interval)>
- pendingOffsetsHandlers,
ILogHandler logHandler)
{
this.consumer = consumer;
this.resolver = resolver;
- this.pendingOffsetsHandlers = pendingOffsetsHandlers;
this.logHandler = logHandler;
}
+ public List PendingOffsetsStatisticsHandlers { get; } = new();
+
public void MarkAsProcessed(TopicPartitionOffset tpo)
{
this.offsetsToCommit.AddOrUpdate(
@@ -54,13 +50,13 @@ public Task StartAsync()
this.consumer.Configuration.AutoCommitInterval,
this.consumer.Configuration.AutoCommitInterval);
- this.statisticsTimers = this.pendingOffsetsHandlers
+ this.statisticsTimers = this.PendingOffsetsStatisticsHandlers
.Select(
- s => new Timer(
- _ => this.PendingOffsetsHandler(this.resolver, s.handler),
+ handler => new Timer(
+ _ => this.PendingOffsetsHandler(handler),
null,
- TimeSpan.Zero,
- s.interval))
+ handler.Interval,
+ handler.Interval))
.ToList();
return Task.CompletedTask;
@@ -79,14 +75,12 @@ public Task StopAsync()
return Task.CompletedTask;
}
- private void PendingOffsetsHandler(
- IDependencyResolver resolver,
- Action> handler)
+ private void PendingOffsetsHandler(PendingOffsetsStatisticsHandler handler)
{
if (!this.offsetsToCommit.IsEmpty)
{
- handler(
- resolver,
+ handler.Handler(
+ this.resolver,
this.offsetsToCommit.Values.Select(
x =>
new Confluent.Kafka.TopicPartitionOffset(x.Topic, x.Partition, x.Offset)));
diff --git a/src/KafkaFlow/Consumers/OffsetManager.cs b/src/KafkaFlow/Consumers/OffsetManager.cs
index 42d4c8288..657091486 100644
--- a/src/KafkaFlow/Consumers/OffsetManager.cs
+++ b/src/KafkaFlow/Consumers/OffsetManager.cs
@@ -29,9 +29,9 @@ public void MarkAsProcessed(IConsumerContext context)
lock (offsets)
{
- if (offsets.IsCommitAllowed(context))
+ if (offsets.TryDequeue(context))
{
- this.committer.MarkAsProcessed(offsets.ReadyToCommitContext.TopicPartitionOffset);
+ this.committer.MarkAsProcessed(offsets.DequeuedContext.TopicPartitionOffset);
}
}
}
diff --git a/src/KafkaFlow/Consumers/PartitionOffsets.cs b/src/KafkaFlow/Consumers/PartitionOffsets.cs
index b39b60245..36fa60314 100644
--- a/src/KafkaFlow/Consumers/PartitionOffsets.cs
+++ b/src/KafkaFlow/Consumers/PartitionOffsets.cs
@@ -10,7 +10,7 @@ internal class PartitionOffsets
private readonly SortedDictionary processedContexts = new();
private readonly LinkedList receivedContexts = new();
- public IConsumerContext ReadyToCommitContext { get; private set; }
+ public IConsumerContext DequeuedContext { get; private set; }
public void Enqueue(IConsumerContext context)
{
@@ -20,9 +20,9 @@ public void Enqueue(IConsumerContext context)
}
}
- public bool IsCommitAllowed(IConsumerContext context)
+ public bool TryDequeue(IConsumerContext context)
{
- this.ReadyToCommitContext = null;
+ this.DequeuedContext = null;
lock (this.receivedContexts)
{
@@ -40,7 +40,7 @@ public bool IsCommitAllowed(IConsumerContext context)
do
{
- this.ReadyToCommitContext = this.receivedContexts.First.Value;
+ this.DequeuedContext = this.receivedContexts.First.Value;
this.receivedContexts.RemoveFirst();
} while (this.receivedContexts.Count > 0 && this.processedContexts.Remove(this.receivedContexts.First.Value.Offset));
}
diff --git a/src/KafkaFlow/MessageContext.cs b/src/KafkaFlow/MessageContext.cs
index 0b9dadec3..55ea1de16 100644
--- a/src/KafkaFlow/MessageContext.cs
+++ b/src/KafkaFlow/MessageContext.cs
@@ -32,7 +32,5 @@ public MessageContext(
this.DependencyResolver,
this.ConsumerContext,
this.ProducerContext);
-
- public IMessageContext TransformMessage(object message) => throw new System.NotImplementedException();
}
}