Skip to content

Commit

Permalink
Ensure service remains healthy with rare messages (#974)
Browse files Browse the repository at this point in the history
Fixes #973

If the queue is empty and the `MessageProcessingHealthCheck` is executed,
the service resets the health check's timeout. For this to work, the
health check frequency should be less than the
MaxTimeSinceLastProcessedMessage.
  • Loading branch information
rngcntr authored Nov 21, 2023
1 parent 79f80ce commit c0b4bf8
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 21 deletions.
6 changes: 6 additions & 0 deletions src/Motor.Extensions.Hosting/Internal/BackgroundTaskQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public BackgroundTaskQueue(IMetricsFactory<BackgroundTaskQueue<T>>? metricsFacto
_elementsInQueue = metricsFactory?.CreateGauge("task_queue_enqueued_elements", "", false, "type")
?.WithLabels(typeof(T).Name);
_totalMessages = metricsFactory?.CreateCounter("total_messages", "", false, "type")?.WithLabels(typeof(T).Name);
LastDequeuedAt = DateTimeOffset.UtcNow;
}

public Task<ProcessedMessageStatus> QueueBackgroundWorkItem(T item)
Expand All @@ -31,6 +32,11 @@ public Task<ProcessedMessageStatus> QueueBackgroundWorkItem(T item)

var taskCompletionStatus = new TaskCompletionSource<ProcessedMessageStatus>();

if (_workItems.IsEmpty)
{
// Simulate recent dequeue to avoid HealthCheck triggering immediately
LastDequeuedAt = DateTimeOffset.UtcNow;
}
_workItems.Enqueue(new QueueItem<T>(item, taskCompletionStatus));
_elementsInQueue?.Inc();
_totalMessages?.Inc();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,71 +1,125 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Options;
using Moq;
using Motor.Extensions.Hosting.Abstractions;
using Motor.Extensions.Hosting.CloudEvents;
using Motor.Extensions.Hosting.HealthChecks;
using Motor.Extensions.Hosting.Internal;
using Motor.Extensions.TestUtilities;
using Xunit;

namespace Motor.Extensions.Hosting_UnitTest.HealthChecks;

public class MessageProcessingHealthCheckTest
{
private readonly TimeSpan _timeout = TimeSpan.FromMilliseconds(50);

[Fact]
public async void CheckHealthAsync_LastDequeuedAtExceededTimeoutRangeAndQueueNotEmpty_ServiceIsUnhealthy()
public async void CheckHealthAsync_QueueHasMessagesWithoutRecentAcknowledgement_ServiceIsUnhealthy()
{
var healthCheck = CreateHealthCheck(true, 10);
var queue = CreateEmptyQueue();
var healthCheck = CreateHealthCheck(queue);
queue.QueueBackgroundWorkItem(MotorCloudEvent.CreateTestCloudEvent<string>("message"));
await Task.Delay(_timeout * 2);

var result = (await healthCheck.CheckHealthAsync(new HealthCheckContext())).Status;

Assert.Equal(HealthStatus.Unhealthy, result);
}

[Fact]
public async void CheckHealthAsync_LastDequeuedAtWithinTimeoutRangeAndQueueNotEmpty_ServiceIsHealthy()
public async void CheckHealthAsync_QueueHasMessagesButMessageWasRecentlyAcknowledged_ServiceIsHealthy()
{
var queue = CreateEmptyQueue();
var healthCheck = CreateHealthCheck(queue);
queue.QueueBackgroundWorkItem(MotorCloudEvent.CreateTestCloudEvent<string>("message0"));
await queue.DequeueAsync(CancellationToken.None);
queue.QueueBackgroundWorkItem(MotorCloudEvent.CreateTestCloudEvent<string>("message1"));

var result = (await healthCheck.CheckHealthAsync(new HealthCheckContext())).Status;

Assert.Equal(HealthStatus.Healthy, result);
}

[Fact]
public async void CheckHealthAsync_QueueRemainsEmptyLongerThanTimeout_ServiceIsHealthy()
{
var queue = CreateEmptyQueue();
var healthCheck = CreateHealthCheck(queue);
await Task.Delay(_timeout * 2);

var result = (await healthCheck.CheckHealthAsync(new HealthCheckContext())).Status;

Assert.Equal(HealthStatus.Healthy, result);
}

[Fact]
public async void CheckHealthAsync_QueueBecomesEmptyLongerThanTimeout_ServiceIsHealthy()
{
var healthCheck = CreateHealthCheck(false, 10);
var queue = CreateEmptyQueue();
var healthCheck = CreateHealthCheck(queue);
queue.QueueBackgroundWorkItem(MotorCloudEvent.CreateTestCloudEvent<string>("message"));
await queue.DequeueAsync(CancellationToken.None);
await Task.Delay(_timeout * 2);

var result = (await healthCheck.CheckHealthAsync(new HealthCheckContext())).Status;

Assert.Equal(HealthStatus.Healthy, result);
}

[Fact]
public async void CheckHealthAsync_LastDequeuedAtExceededTimeoutRangeAndQueueEmpty_ServiceIsHealthy()
public async void CheckHealthAsync_QueueIsEmptyAndMessageWasRecentlyAcknowledged_ServiceIsHealthy()
{
var healthCheck = CreateHealthCheck(true, 0);
var queue = CreateEmptyQueue();
var healthCheck = CreateHealthCheck(queue);
queue.QueueBackgroundWorkItem(MotorCloudEvent.CreateTestCloudEvent<string>("message"));
await queue.DequeueAsync(CancellationToken.None);

var result = (await healthCheck.CheckHealthAsync(new HealthCheckContext())).Status;

Assert.Equal(HealthStatus.Healthy, result);
}

[Fact]
public async void CheckHealthAsync_LastDequeuedAtWithinTimeoutRangeAndQueueEmpty_ServiceIsHealthy()
public async void CheckHealthAsync_MessageAppearsInQueueAfterQueueHasBeenEmpty_ServiceIsHealthy()
{
var healthCheck = CreateHealthCheck(false, 0);
var queue = CreateEmptyQueue();
var healthCheck = CreateHealthCheck(queue);
await Task.Delay(_timeout * 2);
queue.QueueBackgroundWorkItem(MotorCloudEvent.CreateTestCloudEvent<string>("message"));

var result = (await healthCheck.CheckHealthAsync(new HealthCheckContext())).Status;

Assert.Equal(HealthStatus.Healthy, result);
}

private MessageProcessingHealthCheck<string> CreateHealthCheck(bool exceededMaxTimeSinceLastProcessedMessage,
int itemCount)
[Fact]
public async void CheckHealthAsync_MessageRemainsInQueueLongerThanTimeout_ServiceIsUnhealthy()
{
var queue = CreateEmptyQueue();
var healthCheck = CreateHealthCheck(queue);
queue.QueueBackgroundWorkItem(MotorCloudEvent.CreateTestCloudEvent<string>("message"));
await Task.Delay(_timeout * 2);

var result = (await healthCheck.CheckHealthAsync(new HealthCheckContext())).Status;

Assert.Equal(HealthStatus.Unhealthy, result);
}

private MessageProcessingHealthCheck<string> CreateHealthCheck(IBackgroundTaskQueue<MotorCloudEvent<string>> queue)
{
var maxTimeSinceLastProcessedMessage = TimeSpan.FromMilliseconds(100);
var config = new MessageProcessingOptions
{
MaxTimeSinceLastProcessedMessage = maxTimeSinceLastProcessedMessage
MaxTimeSinceLastProcessedMessage = _timeout
};
var queue = new Mock<IBackgroundTaskQueue<MotorCloudEvent<string>>>();
queue.Setup(q => q.LastDequeuedAt).Returns(exceededMaxTimeSinceLastProcessedMessage
? DateTimeOffset.UtcNow.Subtract(maxTimeSinceLastProcessedMessage + TimeSpan.FromMilliseconds(50))
: DateTimeOffset.UtcNow.Subtract(maxTimeSinceLastProcessedMessage - TimeSpan.FromMilliseconds(50)));
queue.Setup(q => q.ItemCount).Returns(itemCount);
return new MessageProcessingHealthCheck<string>(
Options.Create(config),
queue.Object);
return new MessageProcessingHealthCheck<string>(Options.Create(config), queue);
}

private IBackgroundTaskQueue<MotorCloudEvent<string>> CreateEmptyQueue()
{
var queue = new BackgroundTaskQueue<MotorCloudEvent<string>>(null);
return queue;
}
}

0 comments on commit c0b4bf8

Please sign in to comment.