Skip to content

Commit

Permalink
Backport msg size limit 11.0.1 (#1022)
Browse files Browse the repository at this point in the history
* Mention Azure Storage Explorer

* Add Azurite gitignore to src

* Make sure messages close to the size limit can be moved as is to the error queue


---------

Co-authored-by: Bob Langley <[email protected]>
  • Loading branch information
soujay and boblangley authored Jun 5, 2023
1 parent 58638b7 commit e912dcd
Show file tree
Hide file tree
Showing 10 changed files with 303 additions and 30 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ The Azure Storage Queues transport for NServiceBus enables the use of the Azure
## How to test locally

To run the tests locally, add a new environment variable `AzureStorageQueueTransport_ConnectionString` containing a connection string to your Azure storage account or the connection string `UseDevelopmentStorage=true` to use the [Azurite emulator](https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azurite) (ensure it is started before you run the tests).

Additionally, [Microsoft Azure Storage Explorer](https://azure.microsoft.com/en-us/products/storage/storage-explorer) is an useful free tool that can allow you to view and manage the contents of the Azurite emulator as well as Azure Storage accounts in the cloud.
13 changes: 13 additions & 0 deletions src/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Azurite queue
__queuestorage__
__azurite_db_queue__.json
__azurite_db_queue_extent__.json

# Azurite blob
__blobstorage__
__azurite_db_blob__.json
__azurite_db_blob_extent__.json

# Azurite table
__azurite_db_table__.json
__azurite_db_table_extent__.json
179 changes: 179 additions & 0 deletions src/AcceptanceTests/Receiving/When_receiving_large_message.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
namespace NServiceBus.Transport.AzureStorageQueues.AcceptanceTests
{
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using Azure.Transports.WindowsAzureStorageQueues;
using global::Azure.Storage.Queues;
using global::Newtonsoft.Json;
using NServiceBus.AcceptanceTests;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NServiceBus.Faults;
using NUnit.Framework;

public class When_receiving_large_message : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_consume_it_without_the_error_headers_when_message_size_very_close_to_limit()
{
var ctx = await Scenario.Define<Context>()
.WithEndpoint<Receiver>(b =>
{
b.When((bus, c) =>
{
var connectionString = Testing.Utilities.GetEnvConfiguredConnectionString();
var queueClient = new QueueClient(connectionString, "receivinglargemessage-receiver");

//This value is fine tuned to ensure adding the 2 error headers make the message too large
string contentCloseToLimits = new string('x', (35 * 1024) + 425);

var message = new MyMessage { SomeProperty = contentCloseToLimits, };

var messageSerialized = JsonConvert.SerializeObject(message, typeof(MyMessage), Formatting.Indented, new JsonSerializerSettings());

string id = Guid.NewGuid().ToString();
var wrapper = new MessageWrapper
{
Id = id,
Body = Encoding.UTF8.GetBytes(messageSerialized),
Headers = new Dictionary<string, string>
{
{ Headers.EnclosedMessageTypes, $"{typeof(MyMessage).AssemblyQualifiedName}" },
{ Headers.MessageId, id },
{ Headers.CorrelationId, id },
{TestIndependence.HeaderName, c.TestRunId.ToString()}
}
};

var wrapperSerialized = JsonConvert.SerializeObject(wrapper, typeof(MessageWrapper), Formatting.Indented, new JsonSerializerSettings());

var base64Encoded = Convert.ToBase64String(Encoding.UTF8.GetBytes(wrapperSerialized));

return queueClient.SendMessageAsync(base64Encoded);
}).DoNotFailOnErrorMessages();
})
.WithEndpoint<ErrorSpy>()
.Done(c => c.MessageMovedToTheErrorQueue)
.Run();

Assert.IsFalse(ctx.IsFailedQHeaderPresent, "IsFailedQHeaderPresent");
Assert.IsFalse(ctx.IsExceptionTypeHeaderPresent, "IsExceptionTypeHeaderPresent");
}

[Test]
public async Task Should_consume_it_with_only_two_error_headers_when_message_size_close_to_limit()
{
var ctx = await Scenario.Define<Context>()
.WithEndpoint<Receiver>(b =>
{
b.When((bus, c) =>
{
var connectionString = Testing.Utilities.GetEnvConfiguredConnectionString();
var queueClient = new QueueClient(connectionString, "receivinglargemessage-receiver");

string contentCloseToLimits = new string('x', (35 * 1024) + 400);

var message = new MyMessage { SomeProperty = contentCloseToLimits, };

var messageSerialized = JsonConvert.SerializeObject(message, typeof(MyMessage), Formatting.Indented, new JsonSerializerSettings());

string id = Guid.NewGuid().ToString();
var wrapper = new MessageWrapper
{
Id = id,
Body = Encoding.UTF8.GetBytes(messageSerialized),
Headers = new Dictionary<string, string>
{
{ Headers.EnclosedMessageTypes, $"{typeof(MyMessage).AssemblyQualifiedName}" },
{ Headers.MessageId, id },
{ Headers.CorrelationId, id },
{TestIndependence.HeaderName, c.TestRunId.ToString()}
}
};

var wrapperSerialized = JsonConvert.SerializeObject(wrapper, typeof(MessageWrapper), Formatting.Indented, new JsonSerializerSettings());

var base64Encoded = Convert.ToBase64String(Encoding.UTF8.GetBytes(wrapperSerialized));

return queueClient.SendMessageAsync(base64Encoded);
}).DoNotFailOnErrorMessages();
})
.WithEndpoint<ErrorSpy>()
.Done(c => c.MessageMovedToTheErrorQueue)
.Run();

Assert.IsTrue(ctx.IsFailedQHeaderPresent, "IsFailedQHeaderPresent");
Assert.IsTrue(ctx.IsExceptionTypeHeaderPresent, "IsExceptionTypeHeaderPresent");
}

class Context : ScenarioContext
{
public bool MessageMovedToTheErrorQueue { get; set; }
public bool IsFailedQHeaderPresent { get; set; }
public bool IsExceptionTypeHeaderPresent { get; set; }

}

class Receiver : EndpointConfigurationBuilder
{
public Receiver() => EndpointSetup<DefaultServer>(c =>
{
c.UseSerialization<NewtonsoftJsonSerializer>();
c.SendFailedMessagesTo(Conventions.EndpointNamingConvention(typeof(ErrorSpy)));
});

public class MyHandler : IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, IMessageHandlerContext context)
{
throw new InvalidOperationException();
}
}
}

class ErrorSpy : EndpointConfigurationBuilder
{
public ErrorSpy() => EndpointSetup<DefaultServer>(config =>
{
config.UseSerialization<NewtonsoftJsonSerializer>();
config.LimitMessageProcessingConcurrencyTo(1);
});

class MyMessageHandler : IHandleMessages<MyMessage>
{
public MyMessageHandler(Context testContext) => this.testContext = testContext;

public Task Handle(MyMessage message, IMessageHandlerContext context)
{
if (context.MessageHeaders.TryGetValue(TestIndependence.HeaderName, out var testRunId)
&& testRunId == testContext.TestRunId.ToString())
{
testContext.MessageMovedToTheErrorQueue = true;
}
if (context.MessageHeaders.ContainsKey(FaultsHeaderKeys.FailedQ)
&& testRunId == testContext.TestRunId.ToString())
{
testContext.IsFailedQHeaderPresent = true;
}
if (context.MessageHeaders.ContainsKey("NServiceBus.ExceptionInfo.ExceptionType")
&& testRunId == testContext.TestRunId.ToString())
{
testContext.IsExceptionTypeHeaderPresent = true;
}

return Task.CompletedTask;
}

readonly Context testContext;
}
}

public class MyMessage : IMessage
{
public string SomeProperty { get; set; }
}
}
}
12 changes: 11 additions & 1 deletion src/Transport/AtLeastOnceReceiveStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ namespace NServiceBus.Transport.AzureStorageQueues
using System.Threading.Tasks;
using Azure.Transports.WindowsAzureStorageQueues;
using Extensibility;
using global::Azure;
using Logging;
using Transport;

/// <summary>
/// This corresponds to the RecieveOnly transport transaction mode
/// </summary>
class AtLeastOnceReceiveStrategy : ReceiveStrategy
{
public AtLeastOnceReceiveStrategy(OnMessage onMessage, OnError onError, Action<string, Exception, CancellationToken> criticalErrorAction)
Expand All @@ -21,7 +25,7 @@ public AtLeastOnceReceiveStrategy(OnMessage onMessage, OnError onError, Action<s
public override async Task Receive(MessageRetrieved retrieved, MessageWrapper message, string receiveAddress, CancellationToken cancellationToken = default)
{
Logger.DebugFormat("Pushing received message (ID: '{0}') through pipeline.", message.Id);
var body = message.Body ?? new byte[0];
var body = message.Body ?? Array.Empty<byte>();
var contextBag = new ContextBag();
try
{
Expand Down Expand Up @@ -57,6 +61,12 @@ public override async Task Receive(MessageRetrieved retrieved, MessageWrapper me
await retrieved.Ack(cancellationToken).ConfigureAwait(false);
}
}
catch (RequestFailedException e) when (e.Status == 413 && e.ErrorCode == "RequestBodyTooLarge")
{
Logger.WarnFormat($"Message with native ID `{message.Id}` could not be moved to the error queue with additional headers because it was too large. Moving to the error queue as is.", e);

await retrieved.MoveToErrorQueueWithMinimalFaultHeaders(context, cancellationToken).ConfigureAwait(false);
}
catch (Exception onErrorEx) when (!onErrorEx.IsCausedBy(cancellationToken))
{
criticalErrorAction($"Failed to execute recoverability policy for message with native ID: `{message.Id}`", onErrorEx, cancellationToken);
Expand Down
12 changes: 9 additions & 3 deletions src/Transport/AtMostOnceReceiveStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace NServiceBus.Transport.AzureStorageQueues
using System.Threading.Tasks;
using Azure.Transports.WindowsAzureStorageQueues;
using Extensibility;
using global::Azure;
using Logging;
using Transport;

Expand All @@ -26,7 +27,7 @@ public override async Task Receive(MessageRetrieved retrieved, MessageWrapper me
{
Logger.DebugFormat("Pushing received message (ID: '{0}') through pipeline.", message.Id);
await retrieved.Ack(cancellationToken).ConfigureAwait(false);
var body = message.Body ?? new byte[0];
var body = message.Body ?? Array.Empty<byte>();
var contextBag = new ContextBag();
try
{
Expand All @@ -36,14 +37,19 @@ public override async Task Receive(MessageRetrieved retrieved, MessageWrapper me
catch (Exception ex) when (!ex.IsCausedBy(cancellationToken))
{
Logger.Warn("Azure Storage Queue transport failed pushing a message through pipeline", ex);

var context = CreateErrorContext(retrieved, message, ex, body, receiveAddress, contextBag);
try
{
var context = CreateErrorContext(retrieved, message, ex, body, receiveAddress, contextBag);
// Since this is TransportTransactionMode.None, we really don't care what the result is,
// we only need to know whether to call criticalErrorAction or not
_ = await onError(context, cancellationToken).ConfigureAwait(false);
}
catch (RequestFailedException e) when (e.Status == 413 && e.ErrorCode == "RequestBodyTooLarge")
{
Logger.WarnFormat($"Message with native ID `{message.Id}` could not be moved to the error queue with additional headers because it was too large. Moving to the error queue as is.", e);

await retrieved.MoveToErrorQueueWithMinimalFaultHeaders(context, cancellationToken).ConfigureAwait(false);
}
catch (Exception onErrorEx) when (!onErrorEx.IsCausedBy(cancellationToken))
{
criticalError($"Failed to execute recoverability policy for message with native ID: `{message.Id}`", onErrorEx, cancellationToken);
Expand Down
7 changes: 4 additions & 3 deletions src/Transport/AzureMessageQueueReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ namespace NServiceBus.Transport.AzureStorageQueues

class AzureMessageQueueReceiver
{
public AzureMessageQueueReceiver(IMessageEnvelopeUnwrapper unwrapper, IQueueServiceClientProvider queueServiceClientProvider, QueueAddressGenerator addressGenerator, bool purgeOnStartup, TimeSpan messageInvisibleTime)
public AzureMessageQueueReceiver(IMessageEnvelopeUnwrapper unwrapper, IQueueServiceClientProvider queueServiceClientProvider, QueueAddressGenerator addressGenerator, MessageWrapperSerializer serializer, bool purgeOnStartup, TimeSpan messageInvisibleTime)
{
this.unwrapper = unwrapper;
queueServiceClient = queueServiceClientProvider.Client;
this.addressGenerator = addressGenerator;
this.serializer = serializer;
PurgeOnStartup = purgeOnStartup;
MessageInvisibleTime = messageInvisibleTime;
}
Expand Down Expand Up @@ -56,7 +57,7 @@ internal async Task Receive(int batchSize, List<MessageRetrieved> receivedMessag

foreach (var rawMessage in rawMessages)
{
receivedMessages.Add(new MessageRetrieved(unwrapper, rawMessage, inputQueue, errorQueue));
receivedMessages.Add(new MessageRetrieved(unwrapper, serializer, rawMessage, inputQueue, errorQueue));
}

await backoffStrategy.OnBatch(receivedMessages.Count, cancellationToken).ConfigureAwait(false);
Expand All @@ -65,7 +66,7 @@ internal async Task Receive(int batchSize, List<MessageRetrieved> receivedMessag
IMessageEnvelopeUnwrapper unwrapper;

QueueAddressGenerator addressGenerator;

MessageWrapperSerializer serializer;
QueueClient inputQueue;
QueueClient errorQueue;
QueueServiceClient queueServiceClient;
Expand Down
2 changes: 1 addition & 1 deletion src/Transport/AzureStorageQueueTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ internal static IMessageSerializer GetMainSerializerHack(IMessageMapper mapper,

var subscriptionManager = new SubscriptionManager(subscriptionStore, hostSettings.Name, receiveAddress);

var receiver = new AzureMessageQueueReceiver(unwrapper, queueServiceClientProvider, GetQueueAddressGenerator(), receiveSettings.PurgeOnStartup, MessageInvisibleTime);
var receiver = new AzureMessageQueueReceiver(unwrapper, queueServiceClientProvider, GetQueueAddressGenerator(), serializer, receiveSettings.PurgeOnStartup, MessageInvisibleTime);

return (receiveSettings.Id, new MessageReceiver(
receiveSettings.Id,
Expand Down
12 changes: 2 additions & 10 deletions src/Transport/Dispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ namespace NServiceBus.Transport.AzureStorageQueues
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Transports.WindowsAzureStorageQueues;
using global::Azure.Storage.Queues;
using Logging;
using NServiceBus.AzureStorageQueues;
using NServiceBus.Transport.AzureStorageQueues.Utils;
using Transport;
using Unicast.Queuing;

Expand Down Expand Up @@ -124,15 +124,7 @@ public async Task Send(UnicastTransportOperation operation, CancellationToken ca

Task Send(MessageWrapper wrapper, QueueClient sendQueue, TimeSpan timeToBeReceived, CancellationToken cancellationToken)
{
string base64String;

using (var stream = new MemoryStream())
{
serializer.Serialize(wrapper, stream);

var bytes = stream.ToArray();
base64String = Convert.ToBase64String(bytes);
}
string base64String = MessageWrapperHelper.ConvertToBase64String(wrapper, serializer);

return sendQueue.SendMessageAsync(base64String, timeToLive: timeToBeReceived, cancellationToken: cancellationToken);
}
Expand Down
Loading

0 comments on commit e912dcd

Please sign in to comment.