Skip to content

Commit

Permalink
Merge pull request #361 from Particular/hotfix-8.1.4
Browse files Browse the repository at this point in the history
Hotfix 8.1.4
  • Loading branch information
SeanFeldman authored Apr 16, 2019
2 parents 4e08b1f + fa20c92 commit 7e51a50
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 13 deletions.
8 changes: 6 additions & 2 deletions src/Transport/AtLeastOnceReceiveStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ namespace NServiceBus.Transport.AzureStorageQueues

class AtLeastOnceReceiveStrategy : ReceiveStrategy
{
public AtLeastOnceReceiveStrategy(Func<MessageContext, Task> pipeline, Func<ErrorContext, Task<ErrorHandleResult>> errorPipe)
public AtLeastOnceReceiveStrategy(Func<MessageContext, Task> pipeline, Func<ErrorContext, Task<ErrorHandleResult>> errorPipe, CriticalError criticalError)
{
this.pipeline = pipeline;
this.errorPipe = errorPipe;
this.criticalError = criticalError;
}

public override async Task Receive(MessageRetrieved retrieved, MessageWrapper message)
Expand Down Expand Up @@ -56,8 +57,10 @@ public override async Task Receive(MessageRetrieved retrieved, MessageWrapper me
}
catch (Exception e)
{
Logger.Warn("The error pipeline wasn't able to handle the exception.", e);
criticalError.Raise($"Failed to execute recoverability policy for message with native ID: `{message.Id}`", e);

await retrieved.Nack().ConfigureAwait(false);

return;
}

Expand All @@ -78,6 +81,7 @@ public override async Task Receive(MessageRetrieved retrieved, MessageWrapper me

readonly Func<MessageContext, Task> pipeline;
readonly Func<ErrorContext, Task<ErrorHandleResult>> errorPipe;
readonly CriticalError criticalError;

static readonly ILog Logger = LogManager.GetLogger<ReceiveStrategy>();
}
Expand Down
2 changes: 1 addition & 1 deletion src/Transport/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext, Task<E
circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker("AzureStorageQueue-MessagePump", TimeToWaitBeforeTriggering, ex => criticalError.Raise("Failed to receive message from Azure Storage Queue.", ex));
messageReceiver.PurgeOnStartup = settings.PurgeOnStartup;

receiveStrategy = ReceiveStrategy.BuildReceiveStrategy(onMessage, onError, settings.RequiredTransactionMode);
receiveStrategy = ReceiveStrategy.BuildReceiveStrategy(onMessage, onError, settings.RequiredTransactionMode, criticalError);

return messageReceiver.Init(settings.InputQueue, settings.ErrorQueue);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Transport/ReceiveStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ abstract class ReceiveStrategy
{
public abstract Task Receive(MessageRetrieved retrieved, MessageWrapper message);

public static ReceiveStrategy BuildReceiveStrategy(Func<MessageContext, Task> pipe, Func<ErrorContext, Task<ErrorHandleResult>> errorPipe, TransportTransactionMode transactionMode)
public static ReceiveStrategy BuildReceiveStrategy(Func<MessageContext, Task> pipe, Func<ErrorContext, Task<ErrorHandleResult>> errorPipe, TransportTransactionMode transactionMode, CriticalError criticalError)
{
// ReSharper disable once SwitchStatementMissingSomeCases
switch (transactionMode)
{
case TransportTransactionMode.None:
return new AtMostOnceReceiveStrategy(pipe, errorPipe);
case TransportTransactionMode.ReceiveOnly:
return new AtLeastOnceReceiveStrategy(pipe, errorPipe);
return new AtLeastOnceReceiveStrategy(pipe, errorPipe, criticalError);
default:
throw new NotSupportedException($"The TransportTransactionMode {transactionMode} is not supported");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using NServiceBus.Settings;
using NServiceBus.TransportTests;
using NServiceBus.Unicast.Messages;
using NUnit.Framework;

public class ConfigureAzureStorageQueueTransportInfrastructure : IConfigureTransportInfrastructure
{
Expand All @@ -27,12 +26,6 @@ public TransportConfigurationResult Configure(SettingsHolder settings, Transport
settings.Set(registry);
}

var methodName = TestContext.CurrentContext.Test.MethodName;
if (methodName == nameof(When_on_error_throws.Should_reinvoke_on_error_with_original_exception))
{
throw new IgnoreException("ASQ uses a circuit breaker that is triggered after specific period of time. Critical errors are not reported immediately");
}

settings.Set(AzureStorageQueueTransport.SerializerSettingsKey, Tuple.Create<SerializationDefinition, SettingsHolder>(new XmlSerializer(), settings));

var transportExtension = new TransportExtensions<AzureStorageQueueTransport>(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<PackageReference Include="NUnit3TestAdapter" Version="3.12.0" />
<PackageReference Include="NUnit" Version="3.11.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" />
<PackageReference Include="NServiceBus.TransportTests.Sources" Version="7.1.6" />
<PackageReference Include="NServiceBus.TransportTests.Sources" Version="7.2.0-beta0079" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit 7e51a50

Please sign in to comment.