Skip to content

Commit

Permalink
Merge pull request #639 from cabinetoffice/feature/DP-396-outbox-wiring
Browse files Browse the repository at this point in the history
DP-396: Plug in the Message Outbox
  • Loading branch information
jakzal authored Oct 1, 2024
2 parents 3e29fa6 + da7c1fd commit af78e41
Show file tree
Hide file tree
Showing 23 changed files with 2,458 additions and 51 deletions.
18 changes: 18 additions & 0 deletions Libraries/CO.CDP.AwsServices.Tests/Sqs/SqsPublisherTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Amazon.SQS.Model;
using CO.CDP.AwsServices.Sqs;
using CO.CDP.MQ;
using CO.CDP.MQ.Outbox;
using CO.CDP.MQ.Tests;
using FluentAssertions;
using Microsoft.Extensions.Logging;
Expand All @@ -22,6 +23,22 @@ public SqsPublisherTest(LocalStackFixture localStack)
_sqsClient = SqsClient();
}

[Fact]
public async Task ItPublishesOutboxMessageToTheQueue()
{
var publisher = await CreatePublisher();

await publisher.Publish(new OutboxMessage
{
Type = "TestMessage",
Message = "{\"Id\":13,\"Name\":\"Hello!\"}"
});

var message = await waitForOneMessage<TestMessage>();

message.Should().Be(new TestMessage(13, "Hello!"));
}

protected override async Task<T> waitForOneMessage<T>() where T : class
{
var queue = await _sqsClient.GetQueueUrlAsync(TestQueue);
Expand All @@ -34,6 +51,7 @@ protected override async Task<T> waitForOneMessage<T>() where T : class
var message = messages.Messages.First<Message>();
var type = message.MessageAttributes.GetValueOrDefault("Type")?.StringValue;
type.Should().Be(typeof(T).Name);
await _sqsClient.DeleteMessageAsync(queue.QueueUrl, message.ReceiptHandle);
return JsonSerializer.Deserialize<T>(message.Body) ??
throw new Exception($"Unable to deserialize {message.Body} into {typeof(T).FullName}");
}
Expand Down
3 changes: 3 additions & 0 deletions Libraries/CO.CDP.AwsServices/AwsConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using static CO.CDP.MQ.Hosting.OutboxProcessorBackgroundService;

namespace CO.CDP.AwsServices;

public record AwsConfiguration
Expand Down Expand Up @@ -40,4 +42,5 @@ public record SqsPublisherConfiguration
{
public required string QueueUrl { get; init; }
public required string? MessageGroupId { get; init; }
public OutboxProcessorConfiguration? Outbox { get; init; }
}
25 changes: 23 additions & 2 deletions Libraries/CO.CDP.AwsServices/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
using CO.CDP.AwsServices.S3;
using CO.CDP.AwsServices.Sqs;
using CO.CDP.MQ;
using CO.CDP.MQ.Hosting;
using CO.CDP.MQ.Outbox;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -70,9 +73,27 @@ public static IServiceCollection AddAwsSqsService(this IServiceCollection servic
.AddAWSService<IAmazonSQS>();
}

public static IServiceCollection AddSqsPublisher(this IServiceCollection services)
public static IServiceCollection AddOutboxSqsPublisher<TDbContext>(this IServiceCollection services)
where TDbContext : DbContext, IOutboxMessageDbContext
{
return services.AddScoped<IPublisher, SqsPublisher>();
services.AddScoped<IOutboxMessageRepository, DatabaseOutboxMessageRepository<TDbContext>>();

services.AddKeyedScoped<IPublisher, SqsPublisher>("SqsPublisher");
services.AddScoped<IPublisher, OutboxMessagePublisher>();

services.AddSingleton<OutboxProcessorBackgroundService.OutboxProcessorConfiguration>(s =>
s.GetRequiredService<IOptions<AwsConfiguration>>().Value.SqsPublisher?.Outbox ??
new OutboxProcessorBackgroundService.OutboxProcessorConfiguration()
);

services.AddScoped<IOutboxProcessor>(s =>
new OutboxProcessor(
s.GetRequiredKeyedService<IPublisher>("SqsPublisher"),
s.GetRequiredService<IOutboxMessageRepository>(),
s.GetRequiredService<ILogger<OutboxProcessor>>()
)
);
return services;
}

public static IServiceCollection AddSqsDispatcher(this IServiceCollection services,
Expand Down
13 changes: 5 additions & 8 deletions Libraries/CO.CDP.AwsServices/Sqs/SqsPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,17 @@
using Amazon.SQS;
using Amazon.SQS.Model;
using CO.CDP.MQ;
using CO.CDP.MQ.Outbox;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace CO.CDP.AwsServices.Sqs;

public delegate string Serializer(object message);

public delegate string TypeMapper(object message);

public class SqsPublisher(
IAmazonSQS sqsClient,
SqsPublisherConfiguration configuration,
Serializer serializer,
TypeMapper typeMapper,
Func<object, string> serializer,
Func<object, string> typeMapper,
ILogger<SqsPublisher> logger
) : IPublisher
{
Expand All @@ -29,8 +26,8 @@ public SqsPublisher(IAmazonSQS sqsClient, IOptions<AwsConfiguration> configurati
public SqsPublisher(IAmazonSQS sqsClient, SqsPublisherConfiguration configuration, ILogger<SqsPublisher> logger) : this(
sqsClient,
configuration,
o => JsonSerializer.Serialize(o),
o => o.GetType().Name,
OutboxMessageSerializerFactory.Create(o => JsonSerializer.Serialize(o)),
OutboxMessageTypeMapperFactory.Create(o => o.GetType().Name),
logger)
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using CO.CDP.MQ.Outbox;
using CO.CDP.MQ.Tests.Hosting.TestKit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Moq;

namespace CO.CDP.MQ.Tests.Hosting;
Expand All @@ -11,6 +12,9 @@ public class OutboxProcessorBackgroundServiceTest
private readonly Mock<IOutboxProcessor> _outboxProcessor = new();
private readonly TestServiceProvider _serviceProvider = new();

private readonly ILogger<OutboxProcessorBackgroundService> _logger =
LoggerFactory.Create(_ => { }).CreateLogger<OutboxProcessorBackgroundService>();

[Fact]
public async Task ItExecutesTheOutboxProcessor()
{
Expand All @@ -22,7 +26,8 @@ public async Task ItExecutesTheOutboxProcessor()
{
BatchSize = 2,
ExecutionInterval = TimeSpan.FromSeconds(30)
});
},
_logger);
await backgroundService.StartAsync(CancellationToken.None);
await Task.Delay(TimeSpan.FromMilliseconds(100));
await backgroundService.StopAsync(CancellationToken.None);
Expand All @@ -41,7 +46,8 @@ public async Task ItContinuesExecutingTheOutboxProcessorInRegularIntervals()
{
BatchSize = 3,
ExecutionInterval = TimeSpan.FromMilliseconds(4)
});
},
_logger);
await backgroundService.StartAsync(CancellationToken.None);
await Task.Delay(TimeSpan.FromMilliseconds(8));
await backgroundService.StopAsync(CancellationToken.None);
Expand Down
43 changes: 25 additions & 18 deletions Libraries/CO.CDP.MQ/Hosting/OutboxProcessorBackgroundService.cs
Original file line number Diff line number Diff line change
@@ -1,40 +1,47 @@
using CO.CDP.MQ.Outbox;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace CO.CDP.MQ.Hosting;

public class OutboxProcessorBackgroundService(
IServiceProvider services,
OutboxProcessorBackgroundService.OutboxProcessorConfiguration configuration
) : IHostedService, IDisposable
OutboxProcessorBackgroundService.OutboxProcessorConfiguration configuration,
ILogger<OutboxProcessorBackgroundService> logger
) : BackgroundService
{
public record OutboxProcessorConfiguration
{
public int BatchSize { get; init; } = 10;
public TimeSpan ExecutionInterval { get; init; } = TimeSpan.FromSeconds(60);
}

private Timer? _timer = null;

public Task StartAsync(CancellationToken cancellationToken)
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_timer = new Timer(ExecuteOutboxProcessorAsync, null, TimeSpan.Zero, configuration.ExecutionInterval);
return Task.CompletedTask;
}
logger.LogDebug(
"Staring the outbox processor background service Interval={INTERVAL} Batch={BATCH}",
configuration.ExecutionInterval, configuration.BatchSize);

public Task StopAsync(CancellationToken cancellationToken)
{
_timer?.Change(Timeout.Infinite, 0);
return Task.CompletedTask;
}
await ExecuteOutboxProcessorAsync();

using PeriodicTimer timer = new(configuration.ExecutionInterval);

try
{
while (await timer.WaitForNextTickAsync(stoppingToken))
{
await ExecuteOutboxProcessorAsync();
}
}
catch (OperationCanceledException)
{
logger.LogDebug("Stopping the outbox processor background service");
}

public void Dispose()
{
_timer?.Dispose();
}

private async void ExecuteOutboxProcessorAsync(object? state)
private async Task ExecuteOutboxProcessorAsync()
{
using var scope = services.CreateScope();
var outboxProcessor = scope.ServiceProvider.GetRequiredService<IOutboxProcessor>();
Expand Down
6 changes: 5 additions & 1 deletion Libraries/CO.CDP.MQ/Outbox/ModelBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ public static class ModelBuilderExtensions
public static ModelBuilder OnOutboxMessageCreating(this ModelBuilder modelBuilder)
{
modelBuilder.Entity<OutboxMessage>(om =>
om.Property(m => m.CreatedOn).HasTimestampDefault()
{
om.Property(m => m.CreatedOn).HasTimestampDefault();
om.HasIndex(m => m.CreatedOn);
om.HasIndex(m => m.Published);
}
);
return modelBuilder;
}
Expand Down
9 changes: 6 additions & 3 deletions Libraries/CO.CDP.MQ/Outbox/OutboxProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,20 @@ public async Task ExecuteAsync(int count)
{
logger.LogDebug("Executing the outbox processor");
var messages = await FetchMessages(count);
messages.ForEach(PublishMessage);
foreach (var outboxMessage in messages)
{
await PublishMessage(outboxMessage);
}
}

private async Task<List<OutboxMessage>> FetchMessages(int count)
{
var messages = await outbox.FindOldest(count);
logger.LogDebug("Fetched `{COUNT}` messages", messages.Count);
logger.LogDebug("Fetched {COUNT} message(s)", messages.Count);
return messages;
}

private async void PublishMessage(OutboxMessage m)
private async Task PublishMessage(OutboxMessage m)
{
logger.LogDebug("Publishing the `{TYPE}` message: `{MESSAGE}`", m.Type, m.Message);
await publisher.Publish(m);
Expand Down
Loading

0 comments on commit af78e41

Please sign in to comment.