diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/Sending/When_sending_control_messages_without_body.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/Sending/When_sending_control_messages_without_body.cs index 2a5fc50e9..6b4807493 100644 --- a/src/NServiceBus.Transport.SQS.AcceptanceTests/Sending/When_sending_control_messages_without_body.cs +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/Sending/When_sending_control_messages_without_body.cs @@ -33,39 +33,22 @@ public async Task Can_be_sent_and_processed() class Sender : EndpointConfigurationBuilder { - public Sender() - { - EndpointSetup(cfg => cfg.ConfigureSqsTransport().DoNotWrapOutgoingMessages = true); - } + public Sender() => EndpointSetup(cfg => cfg.ConfigureSqsTransport().DoNotWrapOutgoingMessages = true); class DispatchControlMessageAtStartup : Feature { - public DispatchControlMessageAtStartup() - { - EnableByDefault(); - } + public DispatchControlMessageAtStartup() => EnableByDefault(); - protected override void Setup(FeatureConfigurationContext context) - { + protected override void Setup(FeatureConfigurationContext context) => context.RegisterStartupTask(sp => new Startup( sp.GetRequiredService(), sp.GetRequiredService()) ); - } - class Startup : FeatureStartupTask + class Startup(IMessageDispatcher dispatcher, MyContext context) : FeatureStartupTask { - readonly IMessageDispatcher dispatcher; - readonly MyContext context; - - public Startup(IMessageDispatcher dispatcher, MyContext context) - { - this.dispatcher = dispatcher; - this.context = context; - } - protected override Task OnStart(IMessageSession session, - CancellationToken cancellationToken = new CancellationToken()) + CancellationToken cancellationToken = default) { var transportOperations = new TransportOperations( new TransportOperation( @@ -86,57 +69,27 @@ protected override Task OnStart(IMessageSession session, } protected override Task OnStop(IMessageSession session, - CancellationToken cancellationToken = new CancellationToken()) => Task.CompletedTask; + CancellationToken cancellationToken = default) => Task.CompletedTask; } } } class Receiver : EndpointConfigurationBuilder { - public Receiver() - { - EndpointSetup(); - } + public Receiver() => EndpointSetup(c => c.Pipeline.Register("CatchControlMessage", typeof(CatchControlMessageBehavior), "Catches control message")); - public class DoTheThing : Feature + class CatchControlMessageBehavior(MyContext myContext) : Behavior { - public DoTheThing() + public override Task Invoke(IIncomingPhysicalMessageContext context, Func next) { - EnableByDefault(); - } - - protected override void Setup(FeatureConfigurationContext context) - { - context.Pipeline.Register(); - } - - class PipelineBehavior : Behavior - { - readonly MyContext myContext; - - public PipelineBehavior(MyContext myContext) + if (context.MessageHeaders.ContainsKey("MyControlMessage")) { - this.myContext = myContext; + myContext.ControlMessageBodyLength = context.Message.Body.Length; + myContext.ControlMessageReceived = true; + return Task.CompletedTask; } - public override Task Invoke(IIncomingPhysicalMessageContext context, Func next) - { - if (context.MessageHeaders.ContainsKey("MyControlMessage")) - { - myContext.ControlMessageBodyLength = context.Message.Body.Length; - myContext.ControlMessageReceived = true; - return Task.CompletedTask; - } - - return next(); - } - - public class Registration : RegisterStep - { - public Registration() : base("CatchControlMessage", typeof(PipelineBehavior), "Catch control message") - { - } - } + return next(); } } } diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/Sending/When_sending_messages_with_invalid_sqs_chars.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/Sending/When_sending_messages_with_invalid_sqs_chars.cs new file mode 100644 index 000000000..403eec5f3 --- /dev/null +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/Sending/When_sending_messages_with_invalid_sqs_chars.cs @@ -0,0 +1,143 @@ +namespace NServiceBus.AcceptanceTests.Sending +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + using AcceptanceTesting; + using EndpointTemplates; + using Features; + using Microsoft.Extensions.DependencyInjection; + using NServiceBus.Pipeline; + using NServiceBus.Routing; + using NUnit.Framework; + using Transport; + + class When_sending_messages_with_invalid_sqs_chars : NServiceBusAcceptanceTest + { + [Test] + public async Task Can_be_sent_and_processed() + { + var context = await Scenario.Define(ctx => + { + ctx.DestinationQueueName = TestNameHelper.GetSqsQueueName("SendingMessagesWithInvalidSqsChars.Receiver", SetupFixture.NamePrefix); + ctx.ControlMessageId = Guid.NewGuid().ToString(); + }) + .WithEndpoint() + .WithEndpoint() + .Done(ctx => ctx.ControlMessageReceived) + .Run(); + + Assert.That(context.ControlMessageBody, Is.Not.Empty); + } + + class Sender : EndpointConfigurationBuilder + { + public Sender() => EndpointSetup(cfg => cfg.ConfigureSqsTransport().DoNotWrapOutgoingMessages = true); + + class DispatchControlMessageAtStartup : Feature + { + public DispatchControlMessageAtStartup() => EnableByDefault(); + + protected override void Setup(FeatureConfigurationContext context) => + context.RegisterStartupTask(sp => new Startup( + sp.GetRequiredService(), + sp.GetRequiredService()) + ); + + class Startup(IMessageDispatcher dispatcher, MyContext context) : FeatureStartupTask + { + protected override Task OnStart(IMessageSession session, + CancellationToken cancellationToken = default) + { + var transportOperations = new TransportOperations( + new TransportOperation( + new OutgoingMessage( + context.ControlMessageId, + new Dictionary + { + [Headers.MessageId] = context.ControlMessageId + }, + CreateBodyWithDisallowedCharacters() + ), + new UnicastAddressTag(context.DestinationQueueName) + ) + ); + var transportTransaction = new TransportTransaction(); + return dispatcher.Dispatch(transportOperations, transportTransaction, cancellationToken); + } + + protected override Task OnStop(IMessageSession session, + CancellationToken cancellationToken = default) => Task.CompletedTask; + } + } + + // See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html + static byte[] CreateBodyWithDisallowedCharacters() + { + var disallowed = new List(16559); + + // Characters below #x9 + disallowed.AddRange(Enumerable.Range(0x0, 0x9)); + + // Characters between #xB and #xC + disallowed.AddRange(Enumerable.Range(0xB, 2)); // #xB, #xC + + // Characters between #xE and #x1F + disallowed.AddRange(Enumerable.Range(0xE, 0x20 - 0xE)); + + // Surrogate pairs (from #xD800 to #xDFFF) cannot be added because ConvertFromUtf32 throws + // disallowed.AddRange(Enumerable.Range(0xD800, 0xE000 - 0xD800)); + + // Characters greater than #x10FFFF + for (int i = 0x110000; i <= 0x10FFFF; i++) + { + disallowed.Add(i); + } + + var byteList = new List(disallowed.Count * 4); + foreach (var codePoint in disallowed) + { + if (codePoint <= 0x10FFFF) + { + string charAsString = char.ConvertFromUtf32(codePoint); + byte[] utf8Bytes = Encoding.UTF8.GetBytes(charAsString); + byteList.AddRange(utf8Bytes); + } + } + + return [.. byteList]; + } + } + + class Receiver : EndpointConfigurationBuilder + { + public Receiver() => EndpointSetup(c => c.Pipeline.Register("CatchControlMessage", typeof(CatchControlMessageBehavior), "Catches control message")); + + class CatchControlMessageBehavior(MyContext myContext) : Behavior + { + public override Task Invoke(IIncomingPhysicalMessageContext context, Func next) + { + if (context.MessageId == myContext.ControlMessageId) + { + myContext.ControlMessageBody = context.Message.Body.ToString(); + myContext.ControlMessageReceived = true; + return Task.CompletedTask; + } + + return next(); + } + } + } + + class MyContext : ScenarioContext + { + public string DestinationQueueName { get; set; } + public string ControlMessageId { get; set; } + public bool ControlMessageReceived { get; set; } + public string ControlMessageBody { get; set; } + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.SQS.TransportTests.DoNotWrapOutgoingMessages/NServiceBus.Transport.SQS.TransportTests.DoNotWrapOutgoingMessages.csproj b/src/NServiceBus.Transport.SQS.TransportTests.DoNotWrapOutgoingMessages/NServiceBus.Transport.SQS.TransportTests.DoNotWrapOutgoingMessages.csproj index 22e3a6195..76b2881ab 100644 --- a/src/NServiceBus.Transport.SQS.TransportTests.DoNotWrapOutgoingMessages/NServiceBus.Transport.SQS.TransportTests.DoNotWrapOutgoingMessages.csproj +++ b/src/NServiceBus.Transport.SQS.TransportTests.DoNotWrapOutgoingMessages/NServiceBus.Transport.SQS.TransportTests.DoNotWrapOutgoingMessages.csproj @@ -50,10 +50,4 @@ - - - - - - diff --git a/src/NServiceBus.Transport.SQS.TransportTests.DoNotWrapOutgoingMessages/Receiving_metrics_messages.cs b/src/NServiceBus.Transport.SQS.TransportTests.DoNotWrapOutgoingMessages/Receiving_metrics_messages.cs deleted file mode 100644 index df2a39dc3..000000000 --- a/src/NServiceBus.Transport.SQS.TransportTests.DoNotWrapOutgoingMessages/Receiving_metrics_messages.cs +++ /dev/null @@ -1,36 +0,0 @@ -namespace NServiceBus.TransportTests; - -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using NServiceBus; -using Transport; -using NUnit.Framework; - -public class Receiving_metrics_messages : NServiceBusTransportTest -{ - [TestCase(TransportTransactionMode.None)] - [TestCase(TransportTransactionMode.ReceiveOnly)] - public async Task Should_expose_receiving_address(TransportTransactionMode transactionMode) - { - var onError = CreateTaskCompletionSource(); - - await StartPump( - (_, _) => Task.CompletedTask, - (context, _) => - { - onError.SetResult(context); - return Task.FromResult(ErrorHandleResult.Handled); - }, - transactionMode); - - var headers = new Dictionary - { - { Transport.SQS.Constants.MetricsMessageMetricTypeHeaderKey, "doesn't matter" }, - { Headers.ContentType, Transport.SQS.Constants.MetricsMessageContentTypeHeaderValue } - }; - var body = Guid.NewGuid().ToByteArray(); - - await SendMessage(InputQueueName, headers, body: body); - } -} \ No newline at end of file diff --git a/src/NServiceBus.Transport.SQS.TransportTests.DoNotWrapOutgoingMessages/Sending_metrics_messages.cs b/src/NServiceBus.Transport.SQS.TransportTests.DoNotWrapOutgoingMessages/Sending_messages_with_invalid_sqs_chars.cs similarity index 51% rename from src/NServiceBus.Transport.SQS.TransportTests.DoNotWrapOutgoingMessages/Sending_metrics_messages.cs rename to src/NServiceBus.Transport.SQS.TransportTests.DoNotWrapOutgoingMessages/Sending_messages_with_invalid_sqs_chars.cs index bc1ee3cc0..37ece213f 100644 --- a/src/NServiceBus.Transport.SQS.TransportTests.DoNotWrapOutgoingMessages/Sending_metrics_messages.cs +++ b/src/NServiceBus.Transport.SQS.TransportTests.DoNotWrapOutgoingMessages/Sending_messages_with_invalid_sqs_chars.cs @@ -1,38 +1,47 @@ namespace NServiceBus.TransportTests { - using System; using System.Collections.Generic; using System.Threading.Tasks; using NUnit.Framework; using Transport; - public class Sending_metrics_messages : NServiceBusTransportTest + public class Sending_messages_with_invalid_sqs_chars : NServiceBusTransportTest { [TestCase(TransportTransactionMode.None)] [TestCase(TransportTransactionMode.ReceiveOnly)] - public async Task Should_not_fail_when_using_do_not_wrap( + public async Task Should_receive_message( TransportTransactionMode transactionMode) { var messageProcessed = CreateTaskCompletionSource(); + byte[] copyOfTheBody = null; await StartPump( - (context, _) => messageProcessed.SetCompleted(context), + (context, _) => + { + // This is crucial due to internal buffer pooling in SQS transport + copyOfTheBody = context.Body.ToArray(); + return messageProcessed.SetCompleted(context); + }, (_, __) => Task.FromResult(ErrorHandleResult.Handled), TransportTransactionMode.None); var headers = new Dictionary { - { Transport.SQS.Constants.MetricsMessageMetricTypeHeaderKey, "doesn't matter" }, - { Headers.ContentType, Transport.SQS.Constants.MetricsMessageContentTypeHeaderValue } + { "SomeHeader", "header value with invalid chars: \0" }, }; - var body = Guid.NewGuid().ToByteArray(); + + var body = "body with invalid chars: \0"u8.ToArray(); await SendMessage(InputQueueName, headers, body: body); var messageContext = await messageProcessed.Task; Assert.That(messageContext.Headers, Is.Not.Empty); - Assert.That(messageContext.Headers, Is.SupersetOf(headers)); + Assert.Multiple(() => + { + Assert.That(messageContext.Headers, Is.SupersetOf(headers)); + Assert.That(copyOfTheBody, Is.EquivalentTo(body)); + }); } } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs b/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs index ec96e6e18..36912a525 100644 --- a/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs +++ b/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs @@ -142,6 +142,8 @@ public Func TopicNameGenerator /// NServiceBus headers will be sent as an Amazon message attribute. /// Only turn this on if all your endpoints are version 6.1.0 or above. /// + /// In cases when the outgoing message contains characters that are not compliant with the W3C specification + /// for characters SQS requires the payload is base64 encoded automatically. public bool DoNotWrapOutgoingMessages { get; set; } /// diff --git a/src/NServiceBus.Transport.SQS/Constants.cs b/src/NServiceBus.Transport.SQS/Constants.cs deleted file mode 100644 index ea176c7a9..000000000 --- a/src/NServiceBus.Transport.SQS/Constants.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace NServiceBus.Transport.SQS; - -class Constants -{ - public const string MetricsMessageContentTypeHeaderValue = "TaggedLongValueWriterOccurrence"; - public const string MetricsMessageMetricTypeHeaderKey = "NServiceBus.Metric.Type"; -} \ No newline at end of file diff --git a/src/NServiceBus.Transport.SQS/Extensions/TransportMessageExtensions.cs b/src/NServiceBus.Transport.SQS/Extensions/TransportMessageExtensions.cs index 23e4786c3..b2c917e9b 100644 --- a/src/NServiceBus.Transport.SQS/Extensions/TransportMessageExtensions.cs +++ b/src/NServiceBus.Transport.SQS/Extensions/TransportMessageExtensions.cs @@ -22,10 +22,7 @@ static class TransportMessageExtensions } var isNativeMessage = transportMessage.Headers.Keys.Contains(TransportHeaders.Headers); - var isMetricsMessage = transportMessage.Headers.Keys.Contains(Constants.MetricsMessageMetricTypeHeaderKey) - && transportMessage.Headers.TryGetValue(Headers.ContentType, out var contentType) - && contentType == Constants.MetricsMessageContentTypeHeaderValue; - return ConvertBody(transportMessage.Body, arrayPool, isNativeMessage, isMetricsMessage); + return ConvertBody(transportMessage.Body, arrayPool, isNativeMessage); } if (s3Settings == null) @@ -51,11 +48,11 @@ static class TransportMessageExtensions return (buffer.AsMemory(0, contentLength), buffer); } - static (ReadOnlyMemory MessageBody, byte[]? MessageBodyBuffer) ConvertBody(string body, ArrayPool arrayPool, bool isNativeMessage, bool isMetricsMessage) + static (ReadOnlyMemory MessageBody, byte[]? MessageBodyBuffer) ConvertBody(string body, ArrayPool arrayPool, bool isNativeMessage) { var encoding = Encoding.UTF8; - if (isNativeMessage && !isMetricsMessage) + if (isNativeMessage) { return GetNonEncodedBody(body, arrayPool, null, encoding); } diff --git a/src/NServiceBus.Transport.SQS/MessageDispatcher.cs b/src/NServiceBus.Transport.SQS/MessageDispatcher.cs index 04489d42a..88437c894 100644 --- a/src/NServiceBus.Transport.SQS/MessageDispatcher.cs +++ b/src/NServiceBus.Transport.SQS/MessageDispatcher.cs @@ -7,6 +7,7 @@ namespace NServiceBus.Transport.SQS using System.Linq; using System.Text; using System.Text.Json; + using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using Amazon.S3.Model; @@ -17,7 +18,7 @@ namespace NServiceBus.Transport.SQS using Settings; using Transport; - class MessageDispatcher : IMessageDispatcher + partial class MessageDispatcher : IMessageDispatcher { public MessageDispatcher(IReadOnlySettings settings, IAmazonSQS sqsClient, IAmazonSimpleNotificationService snsClient, @@ -463,12 +464,13 @@ long CalculateDelayedDeliverySeconds(UnicastTransportOperation transportOperatio // this could be a control message body = TransportMessage.EmptyMessage; } - else if (outgoingMessage.Headers.ContainsKey(Constants.MetricsMessageMetricTypeHeaderKey) - && outgoingMessage.Headers.TryGetValue(Headers.ContentType, out var contentType) - && contentType == Constants.MetricsMessageContentTypeHeaderValue) + else if (!wrapOutgoingMessages) { - // it's a message from the metrics package - body = Convert.ToBase64String(outgoingMessage.Body.Span); + body = Encoding.UTF8.GetString(outgoingMessage.Body.Span); + if (!ValidSqsCharacters().IsMatch(body)) + { + body = Convert.ToBase64String(outgoingMessage.Body.Span); + } } else { @@ -482,6 +484,9 @@ long CalculateDelayedDeliverySeconds(UnicastTransportOperation transportOperatio return (body, headers); } + [GeneratedRegex(@"^[\u0009\u000A\u000D\u0020-\uD7FF\uE000-\uFFFD\u10000-\u10FFFF]*$", RegexOptions.Singleline)] + private static partial Regex ValidSqsCharacters(); + async Task UploadToS3(string messageId, IOutgoingTransportOperation transportOperation, CancellationToken cancellationToken) { if (s3 == null)