Skip to content

Commit

Permalink
Merge pull request #1278 from Particular/1005_delayed_retry_r12
Browse files Browse the repository at this point in the history
Fix for custom unwrapper not handling delayed retries - 12.0
  • Loading branch information
jasontaylordev authored Jan 14, 2025
2 parents 9e98907 + 9b0c439 commit e46c4f6
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 9 deletions.
104 changes: 104 additions & 0 deletions src/AcceptanceTests/DelayedDelivery/Native_message_delayed_retry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
namespace NServiceBus.Transport.AzureStorageQueues.AcceptanceTests.DelayedDelivery
{
using System;
using System.Buffers.Text;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using global::Azure.Storage.Queues;
using NServiceBus.AcceptanceTests;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NServiceBus.Azure.Transports.WindowsAzureStorageQueues;
using NUnit.Framework;
using Testing;

class Native_message_delayed_retry : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_handle_delayed_delivery_of_native_message()
{
var ctx = await RunScenario();

Assert.That(ctx.IsDone, Is.True);
}

Task<MyContext> RunScenario(CancellationToken cancellationToken = default) => Scenario.Define<MyContext>()
.WithEndpoint<SampleEndpoint>(endpoint => endpoint
.DoNotFailOnErrorMessages()
.When(async session =>
{
var nativeMessage = new NativeMessage
{
Content = $"Hello from native sender @ {DateTimeOffset.UtcNow}"
};

var queueClient = new QueueClient(Utilities.GetEnvConfiguredConnectionString(), "native-integration-asq");
await queueClient.CreateIfNotExistsAsync();

var serializedMessage = JsonSerializer.Serialize(nativeMessage);
await queueClient.SendMessageAsync(serializedMessage);
})
)
.Done(context => !cancellationToken.IsCancellationRequested && context.IsDone)
.Run();

class SampleEndpoint : EndpointConfigurationBuilder
{
public SampleEndpoint() =>
EndpointSetup<DefaultServer>(
cfg =>
{
cfg.Recoverability()
.Delayed(delayed => delayed.NumberOfRetries(1).TimeIncrease(TimeSpan.FromSeconds(1)))
.Immediate(immediate => immediate.NumberOfRetries(0));
cfg.UseSerialization<SystemJsonSerializer>();
var transport = cfg.ConfigureTransport<AzureStorageQueueTransport>();
transport.MessageUnwrapper = message =>
{
return Base64.DecodeFromUtf8InPlace(Encoding.UTF8.GetBytes(message.MessageText), out int _) == System.Buffers.OperationStatus.Done
? null
: new MessageWrapper
{
Id = message.MessageId,
Body = message.Body.ToArray(),
Headers = new Dictionary<string, string>
{
{ Headers.EnclosedMessageTypes, typeof(NativeMessage).FullName },
{ TestIndependence.HeaderName, ScenarioContext.TestRunId.ToString() }
}
};
};
})
.CustomEndpointName("native-integration-asq");

class MyMessageHandler : IHandleMessages<NativeMessage>
{
MyContext scenarioContext;

public MyMessageHandler(MyContext scenarioContext) => this.scenarioContext = scenarioContext;

public Task Handle(NativeMessage message, IMessageHandlerContext context)
{
if (context.MessageHeaders.TryGetValue(Headers.DelayedRetries, out string value) && value == "1")
{
scenarioContext.IsDone = true;
}
throw new Exception("Failing over to delay retry");
}
}
}

class NativeMessage : IMessage
{
public string Content { get; set; }
}

class MyContext : ScenarioContext
{
public bool IsDone { get; set; }
}
}
}
5 changes: 3 additions & 2 deletions src/Transport/AzureStorageQueueTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -381,9 +381,10 @@ internal static IMessageSerializer GetMainSerializerHack(IMessageMapper mapper,
MessageWrapperSerializer serializer,
Action<string, Exception, CancellationToken> criticalErrorAction, ISubscriptionStore subscriptionStore)
{
var defaultUnwrapper = new DefaultMessageEnvelopeUnwrapper(serializer);
var unwrapper = MessageUnwrapper != null
? (IMessageEnvelopeUnwrapper)new UserProvidedEnvelopeUnwrapper(MessageUnwrapper)
: new DefaultMessageEnvelopeUnwrapper(serializer);
? (IMessageEnvelopeUnwrapper)new UserProvidedEnvelopeUnwrapper(MessageUnwrapper, defaultUnwrapper)
: defaultUnwrapper;

var receiveAddress = AzureStorageQueueInfrastructure.TranslateAddress(receiveSettings.ReceiveAddress, QueueAddressGenerator);

Expand Down
12 changes: 5 additions & 7 deletions src/Transport/UserProvidedEnvelopeUnwrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@ namespace NServiceBus.Transport.AzureStorageQueues

class UserProvidedEnvelopeUnwrapper : IMessageEnvelopeUnwrapper
{
public UserProvidedEnvelopeUnwrapper(Func<QueueMessage, MessageWrapper> unwrapper)
public UserProvidedEnvelopeUnwrapper(Func<QueueMessage, MessageWrapper> unwrapper, DefaultMessageEnvelopeUnwrapper defaultUnwrapper)
{
this.unwrapper = unwrapper;
this.defaultUnwrapper = defaultUnwrapper;
}

public MessageWrapper Unwrap(QueueMessage rawMessage)
{

return unwrapper(rawMessage);
}
public MessageWrapper Unwrap(QueueMessage rawMessage) => unwrapper(rawMessage) ?? defaultUnwrapper.Unwrap(rawMessage);

Func<QueueMessage, MessageWrapper> unwrapper;
readonly Func<QueueMessage, MessageWrapper> unwrapper;
readonly DefaultMessageEnvelopeUnwrapper defaultUnwrapper;
}
}

0 comments on commit e46c4f6

Please sign in to comment.