Skip to content

Commit

Permalink
fix: health check waits for tasks to finish before becoming unhealthy
Browse files Browse the repository at this point in the history
  • Loading branch information
aneojgurhem committed Oct 30, 2024
1 parent 8f6a2f9 commit 564c0b5
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 19 deletions.
24 changes: 13 additions & 11 deletions Common/src/Pollster/Pollster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;

using TaskStatus = ArmoniK.Core.Common.Storage.TaskStatus;

namespace ArmoniK.Core.Common.Pollster;

public class Pollster : IInitializable
Expand Down Expand Up @@ -170,12 +172,7 @@ public async Task Init(CancellationToken cancellationToken)

public async Task<HealthCheckResult> Check(HealthCheckTag tag)
{
if (healthCheckFailedResult_ is not null)
{
return healthCheckFailedResult_ ?? HealthCheckResult.Unhealthy("Health Check failed previously so this polling agent should be destroyed.");
}

if (endLoopReached_)
if (endLoopReached_ && taskProcessingDict_.IsEmpty)
{
return HealthCheckResult.Unhealthy("End of main loop reached, no more tasks will be executed.");
}
Expand Down Expand Up @@ -234,6 +231,12 @@ public async Task<HealthCheckResult> Check(HealthCheckTag tag)
healthCheckFailedResult_ = result;
}

if (tag == HealthCheckTag.Liveness && taskProcessingDict_.Any(pair => pair.Value.GetAcquiredTaskInfoOrNull()
?.TaskStatus is not null and not TaskStatus.Dispatched))
{
return HealthCheckResult.Healthy();
}

if (tag == HealthCheckTag.Readiness && taskProcessingDict_.IsEmpty)
{
return HealthCheckResult.Unhealthy("No tasks to process");
Expand Down Expand Up @@ -264,11 +267,10 @@ await Init(exceptionManager_.EarlyCancellationToken)
if (healthCheckFailedResult_ is not null)
{
var hcr = healthCheckFailedResult_.Value;
exceptionManager_.FatalError(logger_,
hcr.Exception,
"Health Check failed with status {Status} thus no more tasks will be executed:\n{Description}",
hcr.Status,
hcr.Description);
logger_.LogError(hcr.Exception,
"Health Check failed with status {Status} thus no more tasks will be acquired (tasks already acquired will be executed to completion if possible):\n{Description}",
hcr.Status,
hcr.Description);
return;
}

Expand Down
11 changes: 10 additions & 1 deletion Common/src/Pollster/TaskHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -707,12 +707,21 @@ await ReleaseAndPostponeTask()
/// The metadata of the task
/// </returns>
public TaskInfo GetAcquiredTaskInfo()
=> GetAcquiredTaskInfoOrNull() ?? throw new ArmoniKException("TaskData should not be null after successful acquisition");

/// <summary>
///     Get the metadata of the acquired task, or null when the task data is not available
/// </summary>
/// <returns>
/// The metadata of the task or null
/// </returns>
public TaskInfo? GetAcquiredTaskInfoOrNull()
=> taskData_ is not null
? new TaskInfo(taskData_.SessionId,
taskData_.TaskId,
messageHandler_.MessageId,
taskData_.Status)
: throw new ArmoniKException("TaskData should not be null after successful acquisition");
: null;

/// <summary>
/// Release task from the current agent and set message to <see cref="QueueMessageStatus.Postponed" />
Expand Down
5 changes: 3 additions & 2 deletions Common/tests/Helpers/SimplePullQueueStorageChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ namespace ArmoniK.Core.Common.Tests.Helpers;

public class SimplePullQueueStorageChannel : IPullQueueStorage, IPushQueueStorage
{
public readonly Channel<IQueueMessageHandler> Channel = System.Threading.Channels.Channel.CreateUnbounded<IQueueMessageHandler>();
public readonly Channel<IQueueMessageHandler> Channel = System.Threading.Channels.Channel.CreateUnbounded<IQueueMessageHandler>();
public HealthCheckResult CheckResult = HealthCheckResult.Healthy();

public Task<HealthCheckResult> Check(HealthCheckTag tag)
=> Task.FromResult(HealthCheckResult.Healthy());
=> Task.FromResult(CheckResult);

public Task Init(CancellationToken cancellationToken)
=> Task.CompletedTask;
Expand Down
18 changes: 13 additions & 5 deletions Common/tests/Pollster/PollsterTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,6 @@ await testServiceProvider.Pollster.Init(CancellationToken.None)
// This tests that we return from the main loop after the health check becomes unhealthy
await testServiceProvider.Pollster.MainLoop()
.ConfigureAwait(false);
Assert.True(testServiceProvider.ExceptionManager.Failed);
}

[Test]
Expand Down Expand Up @@ -449,10 +448,14 @@ await Task.Delay(TimeSpan.FromMilliseconds(delay_),

[Test]
[Timeout(10000)]
public async Task ExecuteTaskShouldSucceed()
public async Task ExecuteTaskShouldSucceed([Values] HealthStatus healthStatus)
{
var mockPullQueueStorage = new SimplePullQueueStorageChannel();
var waitWorkerStreamHandler = new SimpleWorkerStreamHandler();
var mockPullQueueStorage = new SimplePullQueueStorageChannel
{
CheckResult = new HealthCheckResult(healthStatus,
$"value for test called {nameof(ExecuteTaskShouldSucceed)}"),
};
var waitWorkerStreamHandler = new WaitWorkerStreamHandler(500);
var simpleAgentHandler = new SimpleAgentHandler();

using var testServiceProvider = new TestPollsterProvider(waitWorkerStreamHandler,
Expand Down Expand Up @@ -480,8 +483,13 @@ await testServiceProvider.Pollster.Init(CancellationToken.None)
.ConfigureAwait(false);

var stop = testServiceProvider.StopApplicationAfter(TimeSpan.FromSeconds(1));
var loop = testServiceProvider.Pollster.MainLoop();

Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop());
// checking health should not interrupt task execution
var res = await testServiceProvider.Pollster.Check(HealthCheckTag.Liveness)
.ConfigureAwait(false);

Assert.DoesNotThrowAsync(() => loop);
Assert.DoesNotThrowAsync(() => stop);

Assert.AreEqual(TaskStatus.Completed,
Expand Down

0 comments on commit 564c0b5

Please sign in to comment.