From 2e781084efc45e5fc5d3809377b1b648e0feec3b Mon Sep 17 00:00:00 2001 From: James Gunn Date: Fri, 22 Nov 2024 09:22:46 +0000 Subject: [PATCH] Add a background service for delivering webhook messages (#1699) --- .../appsettings.Development.json | 5 +- .../Webhooks/ApplicationBuilderExtensions.cs | 12 + .../Services/Webhooks/IWebhookSender.cs | 8 + .../Webhooks/WebhookDeliveryService.cs | 139 +++++++++ .../Services/Webhooks/WebhookSender.cs | 2 +- .../TeachingRecordSystem.Worker/Program.cs | 4 +- .../Webhooks/WebhookDeliveryServiceTests.cs | 280 ++++++++++++++++++ 7 files changed, 444 insertions(+), 6 deletions(-) create mode 100644 TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Webhooks/IWebhookSender.cs create mode 100644 TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Webhooks/WebhookDeliveryService.cs create mode 100644 TeachingRecordSystem/tests/TeachingRecordSystem.Core.Tests/Services/Webhooks/WebhookDeliveryServiceTests.cs diff --git a/TeachingRecordSystem/src/TeachingRecordSystem.Api/appsettings.Development.json b/TeachingRecordSystem/src/TeachingRecordSystem.Api/appsettings.Development.json index 782e8b314..600ea340b 100644 --- a/TeachingRecordSystem/src/TeachingRecordSystem.Api/appsettings.Development.json +++ b/TeachingRecordSystem/src/TeachingRecordSystem.Api/appsettings.Development.json @@ -11,8 +11,5 @@ "Username": "admin", "Password": "test" }, - "StorageConnectionString": "UseDevelopmentStorage=true", - "Webhooks": { - "CanonicalDomain": "https://localhost:5001" - } + "StorageConnectionString": "UseDevelopmentStorage=true" } diff --git a/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Webhooks/ApplicationBuilderExtensions.cs b/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Webhooks/ApplicationBuilderExtensions.cs index f9a9b9e83..7905b2557 100644 --- a/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Webhooks/ApplicationBuilderExtensions.cs +++ b/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Webhooks/ApplicationBuilderExtensions.cs @@ -14,4 +14,16 @@ public static IHostApplicationBuilder AddWebhookOptions(this IHostApplicationBui return builder; } + + public static IHostApplicationBuilder AddWebhookDeliveryService(this IHostApplicationBuilder builder) + { + AddWebhookOptions(builder); + + builder.Services.AddSingleton(); + WebhookSender.AddHttpClient(builder.Services); + + builder.Services.AddSingleton(); + + return builder; + } } diff --git a/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Webhooks/IWebhookSender.cs b/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Webhooks/IWebhookSender.cs new file mode 100644 index 000000000..14e092f05 --- /dev/null +++ b/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Webhooks/IWebhookSender.cs @@ -0,0 +1,8 @@ +using TeachingRecordSystem.Core.DataStore.Postgres.Models; + +namespace TeachingRecordSystem.Core.Services.Webhooks; + +public interface IWebhookSender +{ + Task SendMessageAsync(WebhookMessage message, CancellationToken cancellationToken = default); +} diff --git a/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Webhooks/WebhookDeliveryService.cs b/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Webhooks/WebhookDeliveryService.cs new file mode 100644 index 000000000..12e66514e --- /dev/null +++ b/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Webhooks/WebhookDeliveryService.cs @@ -0,0 +1,139 @@ +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Polly; +using TeachingRecordSystem.Core.DataStore.Postgres; + +namespace TeachingRecordSystem.Core.Services.Webhooks; + +public class WebhookDeliveryService( + IWebhookSender webhookSender, + IDbContextFactory dbContextFactory, + IClock clock, + ILogger logger) : BackgroundService +{ + public const int BatchSize = 20; + + private static readonly TimeSpan _pollInterval = TimeSpan.FromMinutes(1); + + private static readonly ResiliencePipeline _resiliencePipeline = new ResiliencePipelineBuilder() + .AddRetry(new Polly.Retry.RetryStrategyOptions() + { + BackoffType = DelayBackoffType.Linear, + Delay = TimeSpan.FromSeconds(30), + MaxRetryAttempts = 10 + }) + .Build(); + + public static TimeSpan[] RetryInvervals { get; } = + [ + TimeSpan.FromSeconds(5), + TimeSpan.FromMinutes(5), + TimeSpan.FromMinutes(30), + TimeSpan.FromHours(2), + TimeSpan.FromHours(5), + TimeSpan.FromHours(10), + TimeSpan.FromHours(14), + TimeSpan.FromHours(20), + TimeSpan.FromHours(24), + ]; + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + using var timer = new PeriodicTimer(_pollInterval); + + do + { + try + { + await _resiliencePipeline.ExecuteAsync( + async (_, ct) => + { + SendMessagesResult result; + do + { + result = await SendMessagesAsync(ct); + } + while (result.MoreRecords); + }, + stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + } + } + while (await timer.WaitForNextTickAsync(stoppingToken)); + } + + public async Task SendMessagesAsync(CancellationToken cancellationToken = default) + { + var startedAt = clock.UtcNow; + + await using var dbContext = await dbContextFactory.CreateDbContextAsync(); + var txn = await dbContext.Database.BeginTransactionAsync(System.Data.IsolationLevel.ReadCommitted); + + // Get the first batch of messages that are due to be sent. + // Constrain the batch to `batchSize`, but return one more record so we know if there are more that need to be processed. + var messages = await dbContext.WebhookMessages + .FromSql($""" + select * from webhook_messages + where next_delivery_attempt <= {clock.UtcNow} + order by next_delivery_attempt + limit {BatchSize + 1} + for update skip locked + """) + .Include(m => m.WebhookEndpoint) + .ToArrayAsync(); + + var moreRecords = messages.Length > BatchSize; + + await Parallel.ForEachAsync( + messages.Take(BatchSize), + cancellationToken, + async (message, ct) => + { + ct.ThrowIfCancellationRequested(); + + var now = clock.UtcNow; + message.DeliveryAttempts.Add(now); + + try + { + await webhookSender.SendMessageAsync(message); + + message.Delivered = now; + message.NextDeliveryAttempt = null; + } + catch (Exception ex) + { + logger.LogWarning(ex, "Failed delivering webhook message."); + + message.DeliveryErrors.Add(ex.Message); + + if (message.DeliveryAttempts.Count <= RetryInvervals.Length) + { + var nextRetryInterval = RetryInvervals[message.DeliveryAttempts.Count - 1]; + message.NextDeliveryAttempt = now.Add(nextRetryInterval); + + // If next retry is due before we'll next be polling then ensure we return 'true' for MoreRecords. + // (That ensures we won't have to wait for the timer to fire again before this message is retried.) + var nextRun = startedAt.Add(_pollInterval); + if (message.NextDeliveryAttempt < nextRun) + { + moreRecords = true; + } + } + else + { + message.NextDeliveryAttempt = null; + } + } + }); + + await dbContext.SaveChangesAsync(); + await txn.CommitAsync(); + + return new(moreRecords); + } + + public record SendMessagesResult(bool MoreRecords); +} diff --git a/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Webhooks/WebhookSender.cs b/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Webhooks/WebhookSender.cs index 04bc4d223..f60aafc92 100644 --- a/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Webhooks/WebhookSender.cs +++ b/TeachingRecordSystem/src/TeachingRecordSystem.Core/Services/Webhooks/WebhookSender.cs @@ -14,7 +14,7 @@ namespace TeachingRecordSystem.Core.Services.Webhooks; -public class WebhookSender(HttpClient httpClient, IOptions optionsAccessor) +public class WebhookSender(HttpClient httpClient, IOptions optionsAccessor) : IWebhookSender { public const string TagName = "trs-webhooks"; private const string DataContentType = "application/json; charset=utf-8"; diff --git a/TeachingRecordSystem/src/TeachingRecordSystem.Worker/Program.cs b/TeachingRecordSystem/src/TeachingRecordSystem.Worker/Program.cs index d7060e30c..f919d89ff 100644 --- a/TeachingRecordSystem/src/TeachingRecordSystem.Worker/Program.cs +++ b/TeachingRecordSystem/src/TeachingRecordSystem.Worker/Program.cs @@ -15,6 +15,7 @@ using TeachingRecordSystem.Core.Services.Notify; using TeachingRecordSystem.Core.Services.TrnGenerationApi; using TeachingRecordSystem.Core.Services.TrsDataSync; +using TeachingRecordSystem.Core.Services.Webhooks; using TeachingRecordSystem.Core.Services.WorkforceData; using TeachingRecordSystem.Worker.Infrastructure.Logging; @@ -42,7 +43,8 @@ .AddEmail() .AddIdentityApi() .AddNameSynonyms() - .AddDqtOutboxMessageProcessorService(); + .AddDqtOutboxMessageProcessorService() + .AddWebhookDeliveryService(); var crmServiceClient = new ServiceClient(builder.Configuration.GetRequiredValue("ConnectionStrings:Crm")) { diff --git a/TeachingRecordSystem/tests/TeachingRecordSystem.Core.Tests/Services/Webhooks/WebhookDeliveryServiceTests.cs b/TeachingRecordSystem/tests/TeachingRecordSystem.Core.Tests/Services/Webhooks/WebhookDeliveryServiceTests.cs new file mode 100644 index 000000000..7dd744879 --- /dev/null +++ b/TeachingRecordSystem/tests/TeachingRecordSystem.Core.Tests/Services/Webhooks/WebhookDeliveryServiceTests.cs @@ -0,0 +1,280 @@ +using System.Text.Json; +using Microsoft.Extensions.Logging.Abstractions; +using TeachingRecordSystem.Core.DataStore.Postgres.Models; +using TeachingRecordSystem.Core.Services.Webhooks; + +namespace TeachingRecordSystem.Core.Tests.Services.Webhooks; + +public class WebhookDeliveryServiceTests(DbFixture dbFixture) +{ + public TestableClock Clock { get; } = new TestableClock(); + + [Fact] + public async Task SendMessagesAsync_SendsDueMessageAndUpdatesDb() + { + // Arrange + var endpoint = await CreateApplicationUserAndEndpoint(); + var message = await CreateMessage(endpoint); + + var senderMock = new Mock(); + + var service = new WebhookDeliveryService( + senderMock.Object, + dbFixture.GetDbContextFactory(), + Clock, + new NullLogger()); + + // Act + var result = await service.SendMessagesAsync(); + + // Assert + senderMock.Verify(mock => mock.SendMessageAsync( + It.Is(m => m.WebhookMessageId == message.WebhookMessageId), + It.IsAny())); + + await dbFixture.WithDbContextAsync(async dbContext => + { + await dbContext.Entry(message).ReloadAsync(); + Assert.Equal(Clock.UtcNow, message.Delivered); + Assert.Collection(message.DeliveryAttempts, t => Assert.Equal(Clock.UtcNow, t)); + }); + } + + [Fact] + public async Task SendMessagesAsync_DoesNotSendMessageDueInFuture() + { + // Arrange + var endpoint = await CreateApplicationUserAndEndpoint(); + var message = await CreateMessage(endpoint, configureMessage: message => message.NextDeliveryAttempt = Clock.UtcNow.AddDays(1)); + + var senderMock = new Mock(); + + var service = new WebhookDeliveryService( + senderMock.Object, + dbFixture.GetDbContextFactory(), + Clock, + new NullLogger()); + + // Act + var result = await service.SendMessagesAsync(); + + // Assert + senderMock.Verify(mock => mock.SendMessageAsync( + It.Is(m => m.WebhookMessageId == message.WebhookMessageId), + It.IsAny()), + Times.Never()); + } + + [Fact] + public async Task SendMessagesAsync_NoOutstandingMessages_ReturnsMoreRecordsFalse() + { + // Arrange + var endpoint = await CreateApplicationUserAndEndpoint(); + await CreateDueMessages(endpoint, WebhookDeliveryService.BatchSize); + + var senderMock = new Mock(); + + var service = new WebhookDeliveryService( + senderMock.Object, + dbFixture.GetDbContextFactory(), + Clock, + new NullLogger()); + + // Act + var result = await service.SendMessagesAsync(); + + // Assert + Assert.False(result.MoreRecords); + } + + [Fact] + public async Task SendMessagesAsync_OutstandingMessages_ReturnsMoreRecordsTrue() + { + // Arrange + var endpoint = await CreateApplicationUserAndEndpoint(); + await CreateDueMessages(endpoint, WebhookDeliveryService.BatchSize + 1); + + var senderMock = new Mock(); + + var service = new WebhookDeliveryService( + senderMock.Object, + dbFixture.GetDbContextFactory(), + Clock, + new NullLogger()); + + // Act + var result = await service.SendMessagesAsync(); + + // Assert + Assert.True(result.MoreRecords); + } + + [Fact] + public async Task SendMessagesAsync_SenderFails_UpdatesDbWithDueTimeForRetry() + { + // Arrange + var endpoint = await CreateApplicationUserAndEndpoint(); + var message = await CreateMessage(endpoint); + + var senderMock = new Mock(); + + var sendMessageExceptionMessage = "Bang!"; + senderMock + .Setup(mock => mock.SendMessageAsync( + It.Is(m => m.WebhookMessageId == message.WebhookMessageId), + It.IsAny())) + .Throws(new Exception(sendMessageExceptionMessage)) + .Verifiable(Times.Once()); + + var service = new WebhookDeliveryService( + senderMock.Object, + dbFixture.GetDbContextFactory(), + Clock, + new NullLogger()); + + // Act + var result = await service.SendMessagesAsync(); + + // Assert + senderMock.Verify(); + + await dbFixture.WithDbContextAsync(async dbContext => + { + await dbContext.Entry(message).ReloadAsync(); + Assert.Null(message.Delivered); + Assert.Collection(message.DeliveryAttempts, t => Assert.Equal(Clock.UtcNow, t)); + Assert.True(message.NextDeliveryAttempt > Clock.UtcNow); + Assert.Collection(message.DeliveryErrors, e => Assert.Equal(sendMessageExceptionMessage, e)); + }); + } + + [Fact] + public async Task SendMessagesAsync_SenderFailsAndNoMoreRetriesAllowed_UpdatesDbWithNullDueTime() + { + // Arrange + var endpoint = await CreateApplicationUserAndEndpoint(); + + var message = await CreateMessage(endpoint, timestamp: Clock.UtcNow.Subtract(TimeSpan.FromDays(365)), message => + { + // Set up a message that's been attempted multiple times before and has failed every time but has a single retry left + + var attemptsAndErrors = WebhookDeliveryService.RetryInvervals.SkipLast(0).Prepend(TimeSpan.Zero).Aggregate( + (Attempts: Array.Empty(), Errors: Array.Empty()), + (t, interval) => t with + { + Attempts = [.. t.Attempts, t.Attempts.LastOrDefault(DateTime.SpecifyKind(message.Timestamp.DateTime, DateTimeKind.Utc)).Add(interval)], + Errors = [.. t.Errors, $"Error {t.Errors.Length + 1}"] + }); + + message.DeliveryAttempts = attemptsAndErrors.Attempts.ToList(); + message.DeliveryErrors = attemptsAndErrors.Errors.ToList(); + }); + + var senderMock = new Mock(); + + var sendMessageExceptionMessage = "Bang!"; + senderMock + .Setup(mock => mock.SendMessageAsync( + It.Is(m => m.WebhookMessageId == message.WebhookMessageId), + It.IsAny())) + .Throws(new Exception(sendMessageExceptionMessage)) + .Verifiable(Times.Once()); + + var service = new WebhookDeliveryService( + senderMock.Object, + dbFixture.GetDbContextFactory(), + Clock, + new NullLogger()); + + // Act + var result = await service.SendMessagesAsync(); + + // Assert + senderMock.Verify(); + + await dbFixture.WithDbContextAsync(async dbContext => + { + await dbContext.Entry(message).ReloadAsync(); + Assert.Null(message.Delivered); + Assert.Equal(Clock.UtcNow, message.DeliveryAttempts.Last()); + Assert.Null(message.NextDeliveryAttempt); + Assert.Equal(sendMessageExceptionMessage, message.DeliveryErrors.Last()); + }); + } + + private Task CreateApplicationUserAndEndpoint() => + dbFixture.WithDbContextAsync(async dbContext => + { + var applicationUser = new ApplicationUser() + { + UserId = Guid.NewGuid(), + Name = $"Test user {Guid.NewGuid()}" + }; + dbContext.ApplicationUsers.Add(applicationUser); + + var endpoint = new WebhookEndpoint() + { + Address = "http://localhost", + ApiVersion = "20240101", + ApplicationUserId = applicationUser.UserId, + CloudEventTypes = [], + Enabled = true, + WebhookEndpointId = Guid.NewGuid(), + }; + dbContext.WebhookEndpoints.Add(endpoint); + + await dbContext.SaveChangesAsync(); + + return endpoint; + }); + + private Task CreateDueMessages(WebhookEndpoint endpoint, int count) => + dbFixture.WithDbContextAsync(async dbContext => + { + for (var i = 0; i < count; i++) + { + var message = new WebhookMessage() + { + ApiVersion = endpoint.ApiVersion, + CloudEventId = Guid.NewGuid().ToString(), + CloudEventType = "test.event", + Data = JsonSerializer.SerializeToElement(new + { + Foo = i + }), + Timestamp = Clock.UtcNow, + NextDeliveryAttempt = Clock.UtcNow, + WebhookEndpointId = endpoint.WebhookEndpointId, + WebhookMessageId = Guid.NewGuid() + }; + dbContext.WebhookMessages.Add(message); + } + + await dbContext.SaveChangesAsync(); + }); + + private Task CreateMessage(WebhookEndpoint endpoint, DateTime? timestamp = null, Action? configureMessage = null) => + dbFixture.WithDbContextAsync(async dbContext => + { + var message = new WebhookMessage() + { + ApiVersion = endpoint.ApiVersion, + CloudEventId = Guid.NewGuid().ToString(), + CloudEventType = "test.event", + Data = JsonSerializer.SerializeToElement(new + { + Foo = 42 + }), + Timestamp = timestamp ?? Clock.UtcNow, + NextDeliveryAttempt = Clock.UtcNow, + WebhookEndpointId = endpoint.WebhookEndpointId, + WebhookMessageId = Guid.NewGuid(), + }; + configureMessage?.Invoke(message); + dbContext.WebhookMessages.Add(message); + + await dbContext.SaveChangesAsync(); + + return message; + }); +}