diff --git a/src/Transport/AtLeastOnceReceiveStrategy.cs b/src/Transport/AtLeastOnceReceiveStrategy.cs index 304fbb3d..dafd87d4 100644 --- a/src/Transport/AtLeastOnceReceiveStrategy.cs +++ b/src/Transport/AtLeastOnceReceiveStrategy.cs @@ -11,10 +11,11 @@ namespace NServiceBus.Transport.AzureStorageQueues class AtLeastOnceReceiveStrategy : ReceiveStrategy { - public AtLeastOnceReceiveStrategy(Func pipeline, Func> errorPipe) + public AtLeastOnceReceiveStrategy(Func pipeline, Func> errorPipe, CriticalError criticalError) { this.pipeline = pipeline; this.errorPipe = errorPipe; + this.criticalError = criticalError; } public override async Task Receive(MessageRetrieved retrieved, MessageWrapper message) @@ -56,8 +57,10 @@ public override async Task Receive(MessageRetrieved retrieved, MessageWrapper me } catch (Exception e) { - Logger.Warn("The error pipeline wasn't able to handle the exception.", e); + criticalError.Raise($"Failed to execute recoverability policy for message with native ID: `{message.Id}`", e); + await retrieved.Nack().ConfigureAwait(false); + return; } @@ -78,6 +81,7 @@ public override async Task Receive(MessageRetrieved retrieved, MessageWrapper me readonly Func pipeline; readonly Func> errorPipe; + readonly CriticalError criticalError; static readonly ILog Logger = LogManager.GetLogger(); } diff --git a/src/Transport/MessagePump.cs b/src/Transport/MessagePump.cs index 774fbdcb..b916581d 100644 --- a/src/Transport/MessagePump.cs +++ b/src/Transport/MessagePump.cs @@ -30,7 +30,7 @@ public Task Init(Func onMessage, Func criticalError.Raise("Failed to receive message from Azure Storage Queue.", ex)); messageReceiver.PurgeOnStartup = settings.PurgeOnStartup; - receiveStrategy = ReceiveStrategy.BuildReceiveStrategy(onMessage, onError, settings.RequiredTransactionMode); + receiveStrategy = ReceiveStrategy.BuildReceiveStrategy(onMessage, onError, settings.RequiredTransactionMode, criticalError); return messageReceiver.Init(settings.InputQueue, settings.ErrorQueue); } diff --git a/src/Transport/ReceiveStrategy.cs b/src/Transport/ReceiveStrategy.cs index d7f7d121..b31cbdb7 100644 --- a/src/Transport/ReceiveStrategy.cs +++ b/src/Transport/ReceiveStrategy.cs @@ -9,7 +9,7 @@ abstract class ReceiveStrategy { public abstract Task Receive(MessageRetrieved retrieved, MessageWrapper message); - public static ReceiveStrategy BuildReceiveStrategy(Func pipe, Func> errorPipe, TransportTransactionMode transactionMode) + public static ReceiveStrategy BuildReceiveStrategy(Func pipe, Func> errorPipe, TransportTransactionMode transactionMode, CriticalError criticalError) { // ReSharper disable once SwitchStatementMissingSomeCases switch (transactionMode) @@ -17,7 +17,7 @@ public static ReceiveStrategy BuildReceiveStrategy(Func pi case TransportTransactionMode.None: return new AtMostOnceReceiveStrategy(pipe, errorPipe); case TransportTransactionMode.ReceiveOnly: - return new AtLeastOnceReceiveStrategy(pipe, errorPipe); + return new AtLeastOnceReceiveStrategy(pipe, errorPipe, criticalError); default: throw new NotSupportedException($"The TransportTransactionMode {transactionMode} is not supported"); } diff --git a/src/TransportTests/ConfigureAzureStorageQueueTransportInfrastructure.cs b/src/TransportTests/ConfigureAzureStorageQueueTransportInfrastructure.cs index 0b09bce8..2b3ebebd 100644 --- a/src/TransportTests/ConfigureAzureStorageQueueTransportInfrastructure.cs +++ b/src/TransportTests/ConfigureAzureStorageQueueTransportInfrastructure.cs @@ -9,7 +9,6 @@ using NServiceBus.Settings; using NServiceBus.TransportTests; using NServiceBus.Unicast.Messages; -using NUnit.Framework; public class ConfigureAzureStorageQueueTransportInfrastructure : IConfigureTransportInfrastructure { @@ -27,12 +26,6 @@ public TransportConfigurationResult Configure(SettingsHolder settings, Transport settings.Set(registry); } - var methodName = TestContext.CurrentContext.Test.MethodName; - if (methodName == nameof(When_on_error_throws.Should_reinvoke_on_error_with_original_exception)) - { - throw new IgnoreException("ASQ uses a circuit breaker that is triggered after specific period of time. Critical errors are not reported immediately"); - } - settings.Set(AzureStorageQueueTransport.SerializerSettingsKey, Tuple.Create(new XmlSerializer(), settings)); var transportExtension = new TransportExtensions(settings); diff --git a/src/TransportTests/NServiceBus.AzureStorageQueues.TransportTests.csproj b/src/TransportTests/NServiceBus.AzureStorageQueues.TransportTests.csproj index 75280f4d..13f928cf 100644 --- a/src/TransportTests/NServiceBus.AzureStorageQueues.TransportTests.csproj +++ b/src/TransportTests/NServiceBus.AzureStorageQueues.TransportTests.csproj @@ -17,7 +17,7 @@ - +