Skip to content

Commit

Permalink
Merge pull request #410 from Particular/error-queue-8-1
Browse files Browse the repository at this point in the history
Move poisonous messages to the error queue
  • Loading branch information
danielmarbach authored Apr 29, 2020
2 parents 06c6029 + 235b3a5 commit cc757e1
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 6 deletions.
12 changes: 12 additions & 0 deletions src/AcceptanceTests/CustomEndpointConfigurationExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace NServiceBus.AcceptanceTests
{
using Configuration.AdvancedExtensibility;

public static class CustomEndpointConfigurationExtensions
{
public static TransportExtensions<AzureStorageQueueTransport> ConfigureAsqTransport(this EndpointConfiguration configuration)
{
return new TransportExtensions<AzureStorageQueueTransport>(configuration.GetSettings());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
namespace NServiceBus.Azure.Transports.WindowsAzureStorageQueues.AcceptanceTests.Receiving
{
using System;
using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting;
using NServiceBus.AcceptanceTests;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;

public class When_MessageReceived_fails_to_unwrap_a_message : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_move_message_to_the_error_queue()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<Receiver>(b =>
{
b.When(async (e, c) =>
{
var message = new MyMessage
{
Id = c.TestRunId
};

await e.SendLocal(message);
});
})
.WithEndpoint<ErrorSpy>()
.Done(c => c.MessageMovedToErrorQueue)
.Run();

var exceptionThrownByUnwrapper = context.Logs.Any(x => x.Message.StartsWith("Failed to deserialize message envelope for message with id"));
Assert.That(exceptionThrownByUnwrapper, "Exception thrown by MessageRetrieved.Unwrap() was expected but wasn't found");
}

class Context : ScenarioContext
{
public bool GotMessage { get; set; }
public bool MessageMovedToErrorQueue { get; set; }
}

class Receiver : EndpointConfigurationBuilder
{
public Receiver()
{
EndpointSetup<DefaultServer>(config =>
{
config.SendFailedMessagesTo(AcceptanceTesting.Customization.Conventions.EndpointNamingConvention(typeof(ErrorSpy)));
config.UseSerialization<NewtonsoftSerializer>();
config.LimitMessageProcessingConcurrencyTo(1);
var transport = config.ConfigureAsqTransport();
transport.UnwrapMessagesWith(message => throw new Exception("Custom unwrapper failed"));
transport.DelayedDelivery().DisableDelayedDelivery();
});
}
}

class ErrorSpy : EndpointConfigurationBuilder
{
public ErrorSpy()
{
EndpointSetup<DefaultServer>(config =>
{
var transport = config.ConfigureAsqTransport();
config.UseSerialization<NewtonsoftSerializer>();
transport.DelayedDelivery().DisableDelayedDelivery();
});
}

class MyMessageHandler : IHandleMessages<MyMessage>
{
readonly Context testContext;

public MyMessageHandler(Context testContext)
{
this.testContext = testContext;
}

public Task Handle(MyMessage message, IMessageHandlerContext context)
{
if (message.Id == testContext.TestRunId)
{
testContext.MessageMovedToErrorQueue = true;
}

return Task.FromResult(0);
}
}
}

public class MyMessage : IMessage
{
public Guid Id { get; set; }
}
}
}
2 changes: 1 addition & 1 deletion src/AcceptanceTests/Receiving/When_receiving_a_message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public Receiver()
{
EndpointSetup<DefaultServer>(config =>
{
config.UseTransport<AzureStorageQueueTransport>()
config.ConfigureAsqTransport()
.Transactions(TransportTransactionMode.ReceiveOnly)
.MessageInvisibleTime(VisibilityTimeout);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public async Task ShouldConsumeIt()
b.CustomConfig((cfg, context) =>
{
cfg.UseSerialization<NewtonsoftSerializer>();
cfg.UseTransport<AzureStorageQueueTransport>()
cfg.ConfigureAsqTransport()
.UnwrapMessagesWith(message => MyCustomUnwrapper(message, context.TestRunId));
});

Expand Down
2 changes: 1 addition & 1 deletion src/Transport/FodyWeavers.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<Weavers>
<Weavers GenerateXsd="false">
<Obsolete />
<Janitor />
</Weavers>
12 changes: 9 additions & 3 deletions src/Transport/MessageRetrieved.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,16 @@ public async Task<MessageWrapper> Unwrap()
}
catch (Exception ex)
{
// When a CloudQueueMessage is retrieved and is en-queued directly, message's ID and PopReceipt are mutated.
// To be able to delete the original message, original message ID and PopReceipt have to be stored aside.

var messageId = rawMessage.Id;
var messagePopReceipt = rawMessage.PopReceipt;

await errorQueue.AddMessageAsync(rawMessage).ConfigureAwait(false);
await inputQueue.DeleteMessageAsync(rawMessage).ConfigureAwait(false);
await inputQueue.DeleteMessageAsync(messageId, messagePopReceipt).ConfigureAwait(false);

throw new SerializationException($"Failed to deserialize message envelope for message with id {rawMessage.Id}. Make sure the configured serializer is used across all endpoints or configure the message wrapper serializer for this endpoint using the `SerializeMessageWrapperWith` extension on the transport configuration. Please refer to the Azure Storage Queue Transport configuration documentation for more details.", ex);
throw new SerializationException($"Failed to deserialize message envelope for message with id {messageId}. Make sure the configured serializer is used across all endpoints or configure the message wrapper serializer for this endpoint using the `SerializeMessageWrapperWith` extension on the transport configuration. Please refer to the Azure Storage Queue Transport configuration documentation for more details.", ex);
}
}

Expand Down Expand Up @@ -96,4 +102,4 @@ public LeaseTimeoutException(CloudQueueMessage rawMessage, TimeSpan visibilityTi
{
}
}
}
}

0 comments on commit cc757e1

Please sign in to comment.