Skip to content

Commit

Permalink
Unwrap the OutboxMessage before publishing it
Browse files Browse the repository at this point in the history
  • Loading branch information
jakzal committed Sep 24, 2024
1 parent a67dd47 commit da7c1fd
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 8 deletions.
18 changes: 18 additions & 0 deletions Libraries/CO.CDP.AwsServices.Tests/Sqs/SqsPublisherTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Amazon.SQS.Model;
using CO.CDP.AwsServices.Sqs;
using CO.CDP.MQ;
using CO.CDP.MQ.Outbox;
using CO.CDP.MQ.Tests;
using FluentAssertions;
using Microsoft.Extensions.Logging;
Expand All @@ -22,6 +23,22 @@ public SqsPublisherTest(LocalStackFixture localStack)
_sqsClient = SqsClient();
}

[Fact]
public async Task ItPublishesOutboxMessageToTheQueue()
{
var publisher = await CreatePublisher();

await publisher.Publish(new OutboxMessage
{
Type = "TestMessage",
Message = "{\"Id\":13,\"Name\":\"Hello!\"}"
});

var message = await waitForOneMessage<TestMessage>();

message.Should().Be(new TestMessage(13, "Hello!"));
}

protected override async Task<T> waitForOneMessage<T>() where T : class
{
var queue = await _sqsClient.GetQueueUrlAsync(TestQueue);
Expand All @@ -34,6 +51,7 @@ protected override async Task<T> waitForOneMessage<T>() where T : class
var message = messages.Messages.First<Message>();
var type = message.MessageAttributes.GetValueOrDefault("Type")?.StringValue;
type.Should().Be(typeof(T).Name);
await _sqsClient.DeleteMessageAsync(queue.QueueUrl, message.ReceiptHandle);
return JsonSerializer.Deserialize<T>(message.Body) ??
throw new Exception($"Unable to deserialize {message.Body} into {typeof(T).FullName}");
}
Expand Down
13 changes: 5 additions & 8 deletions Libraries/CO.CDP.AwsServices/Sqs/SqsPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,17 @@
using Amazon.SQS;
using Amazon.SQS.Model;
using CO.CDP.MQ;
using CO.CDP.MQ.Outbox;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace CO.CDP.AwsServices.Sqs;

public delegate string Serializer(object message);

public delegate string TypeMapper(object message);

public class SqsPublisher(
IAmazonSQS sqsClient,
SqsPublisherConfiguration configuration,
Serializer serializer,
TypeMapper typeMapper,
Func<object, string> serializer,
Func<object, string> typeMapper,
ILogger<SqsPublisher> logger
) : IPublisher
{
Expand All @@ -29,8 +26,8 @@ public SqsPublisher(IAmazonSQS sqsClient, IOptions<AwsConfiguration> configurati
public SqsPublisher(IAmazonSQS sqsClient, SqsPublisherConfiguration configuration, ILogger<SqsPublisher> logger) : this(
sqsClient,
configuration,
o => JsonSerializer.Serialize(o),
o => o.GetType().Name,
OutboxMessageSerializerFactory.Create(o => JsonSerializer.Serialize(o)),
OutboxMessageTypeMapperFactory.Create(o => o.GetType().Name),
logger)
{
}
Expand Down

0 comments on commit da7c1fd

Please sign in to comment.