diff --git a/examples/ConsumeAndPublishNATS/Model/IncomingMessage.cs b/examples/ConsumeAndPublishNATS/Model/IncomingMessage.cs index d46f6f6f..806b4e1b 100644 --- a/examples/ConsumeAndPublishNATS/Model/IncomingMessage.cs +++ b/examples/ConsumeAndPublishNATS/Model/IncomingMessage.cs @@ -1,5 +1,4 @@ using System; -using System.Text.Json.Serialization; namespace ConsumeAndPublishNATS.Model; diff --git a/examples/ConsumeAndPublishNATS/Model/OutgoingMessage.cs b/examples/ConsumeAndPublishNATS/Model/OutgoingMessage.cs index 95f09331..5c968f14 100644 --- a/examples/ConsumeAndPublishNATS/Model/OutgoingMessage.cs +++ b/examples/ConsumeAndPublishNATS/Model/OutgoingMessage.cs @@ -1,5 +1,4 @@ using System; -using System.Text.Json.Serialization; namespace ConsumeAndPublishNATS.Model; diff --git a/examples/OpenTelemetryExample/SingleOutputService.cs b/examples/OpenTelemetryExample/SingleOutputService.cs index 32964452..e976a3f1 100644 --- a/examples/OpenTelemetryExample/SingleOutputService.cs +++ b/examples/OpenTelemetryExample/SingleOutputService.cs @@ -3,9 +3,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using CloudNative.CloudEvents.Extensions; using Motor.Extensions.Diagnostics.Telemetry; -using Motor.Extensions.Diagnostics.Tracing; using OpenTelemetryExample.Model; using Motor.Extensions.Hosting.Abstractions; using Motor.Extensions.Hosting.CloudEvents; diff --git a/shared.csproj b/shared.csproj index 9215485f..57b36a6f 100644 --- a/shared.csproj +++ b/shared.csproj @@ -1,7 +1,7 @@ - 0.11.0 + 0.11.1 11 enable CS8600;CS8602;CS8625;CS8618;CS8604;CS8601 diff --git a/src/Motor.Extensions.Diagnostics.Metrics/MotorMetricsFactory.cs b/src/Motor.Extensions.Diagnostics.Metrics/MotorMetricsFactory.cs index 6c7a7a16..cc1f517f 100644 --- a/src/Motor.Extensions.Diagnostics.Metrics/MotorMetricsFactory.cs +++ b/src/Motor.Extensions.Diagnostics.Metrics/MotorMetricsFactory.cs @@ -1,7 +1,6 @@ using System; using System.Runtime.InteropServices; using Microsoft.Extensions.Options; -using Motor.Extensions.Hosting.Abstractions; using Motor.Extensions.Hosting.CloudEvents; using Prometheus.Client; using Prometheus.Client.AspNetCore; diff --git a/src/Motor.Extensions.Hosting.Abstractions/IMultiOutputService.cs b/src/Motor.Extensions.Hosting.Abstractions/IMultiOutputService.cs index f5c17148..d214a780 100644 --- a/src/Motor.Extensions.Hosting.Abstractions/IMultiOutputService.cs +++ b/src/Motor.Extensions.Hosting.Abstractions/IMultiOutputService.cs @@ -1,7 +1,5 @@ using System.Collections.Generic; -using System.Runtime.CompilerServices; using System.Threading; -using System.Threading.Tasks; using Motor.Extensions.Hosting.CloudEvents; namespace Motor.Extensions.Hosting.Abstractions; diff --git a/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs b/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs index a1c02c16..a6c132e0 100644 --- a/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs +++ b/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs @@ -6,12 +6,14 @@ using System.Threading.Tasks; using CloudNative.CloudEvents; using Confluent.Kafka; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Motor.Extensions.Diagnostics.Metrics.Abstractions; using Motor.Extensions.Hosting.Abstractions; using Motor.Extensions.Hosting.CloudEvents; using Motor.Extensions.Hosting.Kafka.Options; +using Polly; using Prometheus.Client; namespace Motor.Extensions.Hosting.Kafka; @@ -24,16 +26,19 @@ public sealed class KafkaMessageConsumer : IMessageConsumer, IDisp private readonly IMetricFamily? _consumerLagGauge; private readonly IMetricFamily? _consumerLagSummary; private readonly ILogger> _logger; + private readonly IHostApplicationLifetime _applicationLifetime; private IConsumer? _consumer; private readonly SemaphoreSlim _messageSemaphore; public KafkaMessageConsumer(ILogger> logger, IOptions> config, + IHostApplicationLifetime applicationLifetime, IMetricsFactory>? metricsFactory, IApplicationNameService applicationNameService, CloudEventFormatter cloudEventFormatter) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _applicationLifetime = applicationLifetime; _applicationNameService = applicationNameService ?? throw new ArgumentNullException(nameof(config)); _cloudEventFormatter = cloudEventFormatter; _options = config.Value ?? throw new ArgumentNullException(nameof(config)); @@ -76,23 +81,23 @@ await Task.Run(async () => try { var msg = _consumer?.Consume(token); - if (msg != null && !msg.IsPartitionEOF) + if (msg is { IsPartitionEOF: false }) { - SingleMessageHandling(token, msg); + SingleMessageHandlingAsync(msg, token); } else { - _logger.LogDebug("No messages received"); + _logger.LogDebug(LogEvents.NoMessageReceived, "No messages received"); } } catch (OperationCanceledException) { - _logger.LogInformation("Terminating Kafka listener..."); + _logger.LogInformation(LogEvents.TerminatingKafkaListener, "Terminating Kafka listener..."); break; } catch (Exception e) { - _logger.LogError(e, "Failed to receive message.", e); + _logger.LogError(LogEvents.MessageReceivedFailure, e, "Failed to receive message."); } } }, token).ConfigureAwait(false); @@ -162,47 +167,63 @@ private void WriteStatistics(string json) } } - private void SingleMessageHandling(CancellationToken token, ConsumeResult msg) + private async Task SingleMessageHandlingAsync(ConsumeResult msg, CancellationToken token) { - _logger.LogDebug( - $"Received message from topic '{msg.Topic}:{msg.Partition}' with offset: '{msg.Offset}[{msg.TopicPartitionOffset}]'"); - var cloudEvent = KafkaMessageToCloudEvent(msg.Message); - - var taskAwaiter = ConsumeCallbackAsync?.Invoke(cloudEvent, token).GetAwaiter(); - taskAwaiter?.OnCompleted(() => + try { - var processedMessageStatus = taskAwaiter?.GetResult(); - _messageSemaphore.Release(); - switch (processedMessageStatus) - { - case ProcessedMessageStatus.Success: - break; - case ProcessedMessageStatus.TemporaryFailure: - break; - case ProcessedMessageStatus.InvalidInput: - break; - case ProcessedMessageStatus.CriticalFailure: - break; - case ProcessedMessageStatus.Failure: - break; - default: - throw new ArgumentOutOfRangeException(); - } - - if (msg.Offset % _options.CommitPeriod != 0) - { - return; - } + _logger.LogDebug(LogEvents.ReceivedMessage, + "Received message from topic '{Topic}:{Partition}' with offset: '{Offset}[{TopicPartitionOffset}]'", + msg.Topic, msg.Partition, msg.Offset, msg.TopicPartitionOffset); + var cloudEvent = KafkaMessageToCloudEvent(msg.Message); + + var retryPolicy = Policy + .HandleResult(status => status == ProcessedMessageStatus.TemporaryFailure) + .WaitAndRetryAsync(_options.RetriesOnTemporaryFailure, + retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))); + var status = await retryPolicy.ExecuteAsync(() => ConsumeCallbackAsync!.Invoke(cloudEvent, token)); + HandleMessageStatus(msg, status); + } + catch (Exception e) + { + _logger.LogCritical(LogEvents.MessageHandlingUnexpectedException, e, + "Unexpected exception in message handling"); + _applicationLifetime.StopApplication(); + } + } - try - { - _consumer?.Commit(msg); - } - catch (KafkaException e) - { - _logger.LogError($"Commit error: {e.Error.Reason}"); - } - }); + private void HandleMessageStatus(ConsumeResult msg, ProcessedMessageStatus? status) + { + switch (status) + { + case ProcessedMessageStatus.Success: + case ProcessedMessageStatus.InvalidInput: + case ProcessedMessageStatus.Failure: + if (msg.Offset.Value % _options.CommitPeriod == 0) + { + try + { + _consumer?.Commit(msg); + } + catch (KafkaException e) + { + _logger.LogError(LogEvents.CommitError, e, "Commit error: {Reason}", e.Error.Reason); + } + } + _messageSemaphore.Release(); + break; + case ProcessedMessageStatus.TemporaryFailure: + _logger.LogWarning(LogEvents.FailureDespiteRetrying, + "Message consume fails despite retrying"); + _applicationLifetime.StopApplication(); + break; + case ProcessedMessageStatus.CriticalFailure: + _logger.LogWarning(LogEvents.CriticalFailureOnConsume, + "Message consume fails with critical failure"); + _applicationLifetime.StopApplication(); + break; + default: + throw new ArgumentOutOfRangeException(nameof(status), status, "Unhandled ProcessedMessageStatus"); + } } public MotorCloudEvent KafkaMessageToCloudEvent(Message msg) diff --git a/src/Motor.Extensions.Hosting.Kafka/LogEvents.cs b/src/Motor.Extensions.Hosting.Kafka/LogEvents.cs new file mode 100644 index 00000000..55f3b37f --- /dev/null +++ b/src/Motor.Extensions.Hosting.Kafka/LogEvents.cs @@ -0,0 +1,17 @@ +using Microsoft.Extensions.Logging; + +namespace Motor.Extensions.Hosting.Kafka; + +public static class LogEvents +{ + public static readonly EventId CriticalFailureOnConsume = new(0, nameof(CriticalFailureOnConsume)); + public static readonly EventId FailureDespiteRetrying = new(1, nameof(FailureDespiteRetrying)); + public static readonly EventId CommitError = new(2, nameof(CommitError)); + public static readonly EventId NoMessageReceived = new(3, nameof(NoMessageReceived)); + public static readonly EventId TerminatingKafkaListener = new(4, nameof(TerminatingKafkaListener)); + public static readonly EventId MessageReceivedFailure = new(5, nameof(MessageReceivedFailure)); + public static readonly EventId ReceivedMessage = new(6, nameof(ReceivedMessage)); + + public static readonly EventId MessageHandlingUnexpectedException = + new(7, nameof(MessageHandlingUnexpectedException)); +} diff --git a/src/Motor.Extensions.Hosting.Kafka/Motor.Extensions.Hosting.Kafka.csproj b/src/Motor.Extensions.Hosting.Kafka/Motor.Extensions.Hosting.Kafka.csproj index ce11e9c8..89ea510a 100644 --- a/src/Motor.Extensions.Hosting.Kafka/Motor.Extensions.Hosting.Kafka.csproj +++ b/src/Motor.Extensions.Hosting.Kafka/Motor.Extensions.Hosting.Kafka.csproj @@ -10,6 +10,7 @@ + diff --git a/src/Motor.Extensions.Hosting.Kafka/Options/KafkaConsumerOptions.cs b/src/Motor.Extensions.Hosting.Kafka/Options/KafkaConsumerOptions.cs index 2fb701bd..3a09a007 100644 --- a/src/Motor.Extensions.Hosting.Kafka/Options/KafkaConsumerOptions.cs +++ b/src/Motor.Extensions.Hosting.Kafka/Options/KafkaConsumerOptions.cs @@ -12,4 +12,5 @@ public KafkaConsumerOptions() public string? Topic { get; set; } public int CommitPeriod { get; set; } = 1000; public int MaxConcurrentMessages { get; set; } = 1000; + public int RetriesOnTemporaryFailure { get; set; } = 10; } diff --git a/src/Motor.Extensions.Hosting.RabbitMQ/BasicDeliverEventArgsExtensions.cs b/src/Motor.Extensions.Hosting.RabbitMQ/BasicDeliverEventArgsExtensions.cs index 0bd22f33..a274bb23 100644 --- a/src/Motor.Extensions.Hosting.RabbitMQ/BasicDeliverEventArgsExtensions.cs +++ b/src/Motor.Extensions.Hosting.RabbitMQ/BasicDeliverEventArgsExtensions.cs @@ -1,7 +1,5 @@ using System; -using Motor.Extensions.ContentEncoding.Abstractions; using Motor.Extensions.Hosting.CloudEvents; -using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace Motor.Extensions.Hosting.RabbitMQ; diff --git a/src/Motor.Extensions.Hosting.RabbitMQ/BasicPropertiesExtensions.cs b/src/Motor.Extensions.Hosting.RabbitMQ/BasicPropertiesExtensions.cs index 25f02438..22873844 100644 --- a/src/Motor.Extensions.Hosting.RabbitMQ/BasicPropertiesExtensions.cs +++ b/src/Motor.Extensions.Hosting.RabbitMQ/BasicPropertiesExtensions.cs @@ -4,7 +4,6 @@ using System.Text; using CloudNative.CloudEvents; using Motor.Extensions.ContentEncoding.Abstractions; -using Motor.Extensions.Hosting.Abstractions; using Motor.Extensions.Hosting.CloudEvents; using Motor.Extensions.Hosting.RabbitMQ.Options; using RabbitMQ.Client; diff --git a/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQConnectionFactory.cs b/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQConnectionFactory.cs index 7b8b31a9..e7e070a8 100644 --- a/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQConnectionFactory.cs +++ b/src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQConnectionFactory.cs @@ -1,6 +1,5 @@ using System; using System.ComponentModel.DataAnnotations; -using System.Net.Security; using System.Security.Authentication; using System.Threading; using Motor.Extensions.Hosting.RabbitMQ.Options; diff --git a/src/Motor.Extensions.TestUtilities/MotorCloudEvent.cs b/src/Motor.Extensions.TestUtilities/MotorCloudEvent.cs index 139f7b74..bef1a8b3 100644 --- a/src/Motor.Extensions.TestUtilities/MotorCloudEvent.cs +++ b/src/Motor.Extensions.TestUtilities/MotorCloudEvent.cs @@ -1,8 +1,4 @@ using System; -using System.Collections.Generic; -using System.Linq; -using CloudNative.CloudEvents; -using Motor.Extensions.Hosting.Abstractions; using Motor.Extensions.Hosting.CloudEvents; namespace Motor.Extensions.TestUtilities; diff --git a/src/Motor.Extensions.Utilities/MotorHost.cs b/src/Motor.Extensions.Utilities/MotorHost.cs index 73bd3a5f..58ac3331 100644 --- a/src/Motor.Extensions.Utilities/MotorHost.cs +++ b/src/Motor.Extensions.Utilities/MotorHost.cs @@ -7,7 +7,6 @@ using Motor.Extensions.Diagnostics.Metrics; using Motor.Extensions.Diagnostics.Telemetry; using Motor.Extensions.Hosting; -using Motor.Extensions.Hosting.Abstractions; using Motor.Extensions.Hosting.CloudEvents; using Motor.Extensions.Http; using Motor.Extensions.Utilities.Abstractions; diff --git a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs index 92e35182..a27bfe5b 100644 --- a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs +++ b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs @@ -1,9 +1,12 @@ using System; +using System.Collections.Generic; +using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using CloudNative.CloudEvents.SystemTextJson; using Confluent.Kafka; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Moq; @@ -57,7 +60,7 @@ public async Task Consume_RawPublishIntoKafkaAndConsumeCreateCloudEvent_Consumed public async Task Consume_PublishIntoKafkaAndConsumeWithCloudEvent_ConsumedEqualsPublished() { var topic = _randomizerString.Generate(); - var message = "testMessage"; + const string message = "testMessage"; var publisher = GetPublisher(topic); var motorCloudEvent = MotorCloudEvent.CreateTestCloudEvent(message).CreateNew(Encoding.UTF8.GetBytes(message)); @@ -83,7 +86,7 @@ public async Task Consume_PublishIntoKafkaAndConsumeWithCloudEvent_ConsumedEqual public async Task Consume_PublishIntoExtensionDefinedTopic_ConsumedEqualsPublished() { var topic = _randomizerString.Generate(); - var message = "testMessage"; + const string message = "testMessage"; var publisher = GetPublisher("wrong_topic"); var motorCloudEvent = MotorCloudEvent.CreateTestCloudEvent(message).CreateNew(Encoding.UTF8.GetBytes(message)); @@ -117,7 +120,8 @@ public async Task Consume_LimitMaxConcurrentMessages_StartProcessingLimitedNumbe { await PublishMessage(topic, "someKey", message); } - var consumer = GetConsumer(topic, maxConcurrentMessages); + var config = GetConsumerConfig(topic, maxConcurrentMessages); + var consumer = GetConsumer(topic, config); var numberOfStartedMessages = 0; consumer.ConsumeCallbackAsync = async (_, cancellationToken) => { @@ -137,6 +141,147 @@ public async Task Consume_LimitMaxConcurrentMessages_StartProcessingLimitedNumbe Assert.Equal(maxConcurrentMessages, numberOfStartedMessages); } + [Theory(Timeout = 50000)] + [InlineData(ProcessedMessageStatus.TemporaryFailure)] + [InlineData(ProcessedMessageStatus.CriticalFailure)] + public async Task Consume_SynchronousMessageHandlingWhereProcessingFailed_DoesNotProcessSecondMessage(ProcessedMessageStatus returnStatus) + { + var taskCompletionSource = new TaskCompletionSource(); + var topic = _randomizerString.Generate(); + await PublishMessage(topic, "someKey", "1"); + await PublishMessage(topic, "someKey", "2"); + var config = GetConsumerConfig(topic, maxConcurrentMessages: 1); + var consumer = GetConsumer(topic, config); + var distinctHandledMessages = new HashSet(); + consumer.ConsumeCallbackAsync = async (data, _) => + { + taskCompletionSource.TrySetResult(); + distinctHandledMessages.Add(Encoding.UTF8.GetString(data.TypedData)); + return await Task.FromResult(returnStatus); + }; + + await consumer.StartAsync(); + consumer.ExecuteAsync(); + + // Wait until processing begins + await taskCompletionSource.Task; + // Give consumer enough time to process further messages + await Task.Delay(TimeSpan.FromSeconds(1)); + Assert.Single(distinctHandledMessages); + } + + [Theory(Timeout = 50000)] + [InlineData(ProcessedMessageStatus.Success)] + [InlineData(ProcessedMessageStatus.Failure)] + [InlineData(ProcessedMessageStatus.InvalidInput)] + public async Task Consume_SynchronousMessageHandlingWithMultipleMessages_AllMessagesProcessed(ProcessedMessageStatus processedMessageStatus) + { + const int numMessages = 10; + var taskCompletionSource = new TaskCompletionSource(); + var topic = _randomizerString.Generate(); + var messages = Enumerable.Range(1, numMessages).Select(i => $"{i}").ToHashSet(); + foreach (var message in messages) + { + await PublishMessage(topic, "someKey", message); + } + var config = GetConsumerConfig(topic, maxConcurrentMessages: 1); + var consumer = GetConsumer(topic, config); + var distinctHandledMessages = new HashSet(); + consumer.ConsumeCallbackAsync = async (data, _) => + { + taskCompletionSource.TrySetResult(); + distinctHandledMessages.Add(Encoding.UTF8.GetString(data.TypedData)); + return await Task.FromResult(processedMessageStatus); + }; + + await consumer.StartAsync(); + consumer.ExecuteAsync(); + + // Wait until processing begins + await taskCompletionSource.Task; + // Give consumer enough time to process further messages + await Task.Delay(TimeSpan.FromSeconds(1)); + Assert.True(messages.SetEquals(distinctHandledMessages)); + } + + [Fact(Timeout = 50000)] + public async Task Consume_TemporaryFailure_ExecuteTheConfiguredNumberOfRetries() + { + const int expectedNumberOfRetries = 2; + var taskCompletionSource = new TaskCompletionSource(); + var topic = _randomizerString.Generate(); + await PublishMessage(topic, "someKey", "message"); + var config = GetConsumerConfig(topic, retriesOnTemporaryFailure: expectedNumberOfRetries); + var consumer = GetConsumer(topic, config); + var actualNumberOfTries = 0; + consumer.ConsumeCallbackAsync = async (data, _) => + { + taskCompletionSource.TrySetResult(); + actualNumberOfTries += 1; + return await Task.FromResult(ProcessedMessageStatus.TemporaryFailure); + }; + + await consumer.StartAsync(); + consumer.ExecuteAsync(); + + // Wait until processing begins + await taskCompletionSource.Task; + // Give consumer enough time to handle returned ProcessedMessageStatus + await Task.Delay(TimeSpan.FromSeconds(2 * Math.Pow(2, expectedNumberOfRetries))); + Assert.Equal(expectedNumberOfRetries + 1, actualNumberOfTries); + } + + [Fact(Timeout = 50000)] + public async Task Consume_TemporaryFailureEvenAfterRetries_ApplicationIsStopped() + { + const int numberOfRetries = 2; + var fakeLifetimeMock = new Mock(); + var taskCompletionSource = new TaskCompletionSource(); + var topic = _randomizerString.Generate(); + await PublishMessage(topic, "someKey", "message"); + var config = GetConsumerConfig(topic, retriesOnTemporaryFailure: numberOfRetries); + var consumer = GetConsumer(topic, config, fakeLifetimeMock.Object); + consumer.ConsumeCallbackAsync = async (data, _) => + { + taskCompletionSource.TrySetResult(); + return await Task.FromResult(ProcessedMessageStatus.TemporaryFailure); + }; + + await consumer.StartAsync(); + consumer.ExecuteAsync(); + + // Wait until processing begins + await taskCompletionSource.Task; + // Give consumer enough time to handle returned ProcessedMessageStatus + await Task.Delay(TimeSpan.FromSeconds(2 * Math.Pow(2, numberOfRetries))); + fakeLifetimeMock.Verify(mock => mock.StopApplication(), Times.Once); + } + + [Fact(Timeout = 50000)] + public async Task Consume_CriticalFailure_ApplicationIsStopped() + { + var fakeLifetimeMock = new Mock(); + var taskCompletionSource = new TaskCompletionSource(); + var topic = _randomizerString.Generate(); + await PublishMessage(topic, "someKey", "message"); + var config = GetConsumerConfig(topic); + var consumer = GetConsumer(topic, config, fakeLifetimeMock.Object); + consumer.ConsumeCallbackAsync = async (data, _) => + { + taskCompletionSource.TrySetResult(); + return await Task.FromResult(ProcessedMessageStatus.CriticalFailure); + }; + + await consumer.StartAsync(); + consumer.ExecuteAsync(); + + // Wait until processing begins + await taskCompletionSource.Task; + // Give consumer enough time to handle returned ProcessedMessageStatus + await Task.Delay(TimeSpan.FromSeconds(1)); + fakeLifetimeMock.Verify(mock => mock.StopApplication(), Times.Once); + } + private async Task PublishMessage(string topic, string key, string value) { using var producer = new ProducerBuilder(GetPublisherConfig(topic)).Build(); @@ -145,11 +290,12 @@ await producer.ProduceAsync(topic, producer.Flush(); } - private KafkaMessageConsumer GetConsumer(string topic, int maxConcurrentMessages = 1000) + private KafkaMessageConsumer GetConsumer(string topic, KafkaConsumerOptions config = null, IHostApplicationLifetime fakeLifetimeMock = null) { - var options = Options.Create(GetConsumerConfig(topic, maxConcurrentMessages)); + var options = Options.Create(config ?? GetConsumerConfig(topic)); var fakeLoggerMock = Mock.Of>>(); - return new KafkaMessageConsumer(fakeLoggerMock, options, null, GetApplicationNameService(), + fakeLifetimeMock ??= Mock.Of(); + return new KafkaMessageConsumer(fakeLoggerMock, options, fakeLifetimeMock, null, GetApplicationNameService(), new JsonEventFormatter()); } @@ -176,19 +322,21 @@ private IApplicationNameService GetApplicationNameService(string source = "test: return mock.Object; } - private KafkaConsumerOptions GetConsumerConfig(string topic, int maxConcurrentMessages, string groupId = "group_id") + private KafkaConsumerOptions GetConsumerConfig(string topic, int maxConcurrentMessages = 1000, + string groupId = "group_id", int retriesOnTemporaryFailure = 10) { return new() { Topic = topic, GroupId = groupId, - CommitPeriod = 1, + CommitPeriod = 1000, BootstrapServers = _fixture.BootstrapServers, EnableAutoCommit = false, StatisticsIntervalMs = 5000, SessionTimeoutMs = 6000, AutoOffsetReset = AutoOffsetReset.Earliest, - MaxConcurrentMessages = maxConcurrentMessages + MaxConcurrentMessages = maxConcurrentMessages, + RetriesOnTemporaryFailure = retriesOnTemporaryFailure }; } } diff --git a/test/Motor.Extensions.Hosting.Kafka_UnitTest/KafkaMessageTests.cs b/test/Motor.Extensions.Hosting.Kafka_UnitTest/KafkaMessageTests.cs index 65409c21..25a99662 100644 --- a/test/Motor.Extensions.Hosting.Kafka_UnitTest/KafkaMessageTests.cs +++ b/test/Motor.Extensions.Hosting.Kafka_UnitTest/KafkaMessageTests.cs @@ -2,6 +2,7 @@ using System.Linq; using CloudNative.CloudEvents.SystemTextJson; using Confluent.Kafka; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Moq; @@ -96,7 +97,8 @@ private KafkaMessageConsumer GetKafkaConsumer() { var options = Options.Create(GetConsumerConfig(KafkaTopic)); var fakeLoggerMock = Mock.Of>>(); - return new KafkaMessageConsumer(fakeLoggerMock, options, null, GetApplicationNameService(), + var fakeLifetimeMock = Mock.Of(); + return new KafkaMessageConsumer(fakeLoggerMock, options, fakeLifetimeMock, null, GetApplicationNameService(), new JsonEventFormatter()); } diff --git a/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/BasicPropertiesExtensionsTest.cs b/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/BasicPropertiesExtensionsTest.cs index 603fff07..ca84b27c 100644 --- a/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/BasicPropertiesExtensionsTest.cs +++ b/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/BasicPropertiesExtensionsTest.cs @@ -4,7 +4,6 @@ using CloudNative.CloudEvents; using Moq; using Motor.Extensions.ContentEncoding.Abstractions; -using Motor.Extensions.Hosting.Abstractions; using Motor.Extensions.Hosting.CloudEvents; using Motor.Extensions.Hosting.RabbitMQ; using Motor.Extensions.Hosting.RabbitMQ.Options; diff --git a/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/RabbitMQFixture.cs b/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/RabbitMQFixture.cs index b8754d5d..0ec3b1f1 100644 --- a/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/RabbitMQFixture.cs +++ b/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/RabbitMQFixture.cs @@ -1,5 +1,4 @@ using System; -using System.Collections.Generic; using System.Threading.Tasks; using DotNet.Testcontainers.Builders; using DotNet.Testcontainers.Configurations; diff --git a/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/RabbitMQTests.cs b/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/RabbitMQTests.cs index eb00e714..b289c07c 100644 --- a/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/RabbitMQTests.cs +++ b/test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/RabbitMQTests.cs @@ -203,6 +203,9 @@ public async Task ConsumerStartAsync_CheckParallelProcessing_EnsureAllMessagesAr { await Task.Delay(TimeSpan.FromMilliseconds(10)); } + // Give RabbitMQConsumer enough time to acknowledge message + await Task.Delay(TimeSpan.FromSeconds(1)); + Assert.Equal((uint)0, builder.MessagesInQueue(builder.QueueName)); } diff --git a/test/Motor.Extensions.Http_UnitTest/PrometheusDelegatingHandlerTests.cs b/test/Motor.Extensions.Http_UnitTest/PrometheusDelegatingHandlerTests.cs index 3dbc3d90..1d10e7cb 100644 --- a/test/Motor.Extensions.Http_UnitTest/PrometheusDelegatingHandlerTests.cs +++ b/test/Motor.Extensions.Http_UnitTest/PrometheusDelegatingHandlerTests.cs @@ -3,7 +3,6 @@ using Microsoft.Extensions.Hosting; using Moq; using Motor.Extensions.Diagnostics.Metrics; -using Motor.Extensions.Hosting.Abstractions; using Motor.Extensions.Hosting.CloudEvents; using Motor.Extensions.Http; using Motor.Extensions.Utilities;