diff --git a/Common/src/Pollster/Pollster.cs b/Common/src/Pollster/Pollster.cs index 62956dab8..b3bf5c7cf 100644 --- a/Common/src/Pollster/Pollster.cs +++ b/Common/src/Pollster/Pollster.cs @@ -119,7 +119,6 @@ public Pollster(IPullQueueStorage pullQueueStorage, meterHolder_ = meterHolder; ownerPodId_ = identifier.OwnerPodId; ownerPodName_ = identifier.OwnerPodName; - Failed = false; var started = DateTime.UtcNow; meterHolder_.Meter.CreateObservableCounter("uptime", @@ -159,12 +158,6 @@ public Pollster(IPullQueueStorage pullQueueStorage, public ICollection TaskProcessing => taskProcessingDict_.Keys; - /// - /// Is true when the MainLoop exited with an error - /// Used in Unit tests - /// - public bool Failed { get; private set; } - public async Task Init(CancellationToken cancellationToken) => await Task.WhenAll(pullQueueStorage_.Init(cancellationToken), dataPrefetcher_.Init(cancellationToken), @@ -258,7 +251,7 @@ await taskHandler.StopCancelledTask() } } - public async Task MainLoop(CancellationToken cancellationToken) + public async Task MainLoop() { try { @@ -420,7 +413,6 @@ await runningTaskQueue_.WaitForNextWriteAsync(pollsterOptions_.TimeoutBeforeNext } catch (Exception e) { - Failed = true; exceptionManager_.FatalError(logger_, e, "Error in pollster"); diff --git a/Common/src/Pollster/RunningTaskProcessor.cs b/Common/src/Pollster/RunningTaskProcessor.cs index 16935e850..06e6d6099 100644 --- a/Common/src/Pollster/RunningTaskProcessor.cs +++ b/Common/src/Pollster/RunningTaskProcessor.cs @@ -56,8 +56,17 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { try { - var taskHandler = await runningTaskQueue_.ReadAsync(exceptionManager_.EarlyCancellationToken) - .ConfigureAwait(false); + TaskHandler taskHandler; + try + { + taskHandler = await runningTaskQueue_.ReadAsync(exceptionManager_.EarlyCancellationToken) + .ConfigureAwait(false); + } + catch (OperationCanceledException) + { + break; + } + await using var taskHandlerDispose = new 
Deferrer(taskHandler); var taskInfo = taskHandler.GetAcquiredTaskInfo(); diff --git a/Common/src/Utils/ExceptionManager.cs b/Common/src/Utils/ExceptionManager.cs index f319375f7..759d7aca8 100644 --- a/Common/src/Utils/ExceptionManager.cs +++ b/Common/src/Utils/ExceptionManager.cs @@ -86,6 +86,8 @@ public CancellationToken EarlyCancellationToken public CancellationToken LateCancellationToken => lateCts_.Token; + public bool Failed { get; private set; } + public void Dispose() { foreach (var disposable in disposables_) @@ -114,9 +116,9 @@ public void RecordError(ILogger? logger, using var scope = logger.BeginScope("Exception #{NbError}/{MaxError}", nbError, maxError_); - Action log = nbError <= maxError_ - ? LoggerExtensions.LogError - : LoggerExtensions.LogDebug; + Action log = nbError <= maxError_ + ? LoggerExtensions.LogError + : LoggerExtensions.LogDebug; log.Invoke(logger, e, @@ -127,6 +129,7 @@ public void RecordError(ILogger? logger, if (nbError == maxError_) { logger_?.LogCritical("Stop Application after too many errors"); + Failed = true; earlyCts_.Cancel(); } } @@ -144,9 +147,9 @@ public void FatalError(ILogger? logger, if (logger is not null) { using var scope = logger.BeginScope("Fatal Exception"); - Action log = nbError <= maxError_ - ? LoggerExtensions.LogCritical - : LoggerExtensions.LogDebug; + Action log = nbError <= maxError_ + ? LoggerExtensions.LogCritical + : LoggerExtensions.LogDebug; log.Invoke(logger, e, @@ -154,6 +157,7 @@ public void FatalError(ILogger? 
logger, args); } + Failed = true; earlyCts_.Cancel(); } diff --git a/Common/tests/Helpers/TestPollingAgentProvider.cs b/Common/tests/Helpers/TestPollingAgentProvider.cs index 85a8f9a72..f5b110d7c 100644 --- a/Common/tests/Helpers/TestPollingAgentProvider.cs +++ b/Common/tests/Helpers/TestPollingAgentProvider.cs @@ -52,10 +52,9 @@ public class TestPollingAgentProvider : IDisposable private readonly WebApplication app_; private readonly LoggerFactory loggerFactory_; - private readonly CancellationTokenSource pollsterCancellationTokenSource_ = new(); - private readonly Task pollsterRunningTask_; - private readonly IMongoRunner runner_; - public readonly ISubmitter Submitter; + private readonly Task pollsterRunningTask_; + private readonly IMongoRunner runner_; + public readonly ISubmitter Submitter; public TestPollingAgentProvider(IWorkerStreamHandler workerStreamHandler) @@ -131,16 +130,14 @@ public TestPollingAgentProvider(IWorkerStreamHandler workerStreamHandler) sessionTable.Init(CancellationToken.None) .Wait(); - pollsterRunningTask_ = Task.Factory.StartNew(() => pollster.MainLoop(pollsterCancellationTokenSource_.Token), + pollsterRunningTask_ = Task.Factory.StartNew(() => pollster.MainLoop(), TaskCreationOptions.LongRunning); } public void Dispose() { - pollsterCancellationTokenSource_?.Cancel(false); pollsterRunningTask_?.Wait(); pollsterRunningTask_?.Dispose(); - pollsterCancellationTokenSource_?.Dispose(); (app_ as IDisposable)?.Dispose(); loggerFactory_?.Dispose(); runner_?.Dispose(); diff --git a/Common/tests/Helpers/TestPollsterProvider.cs b/Common/tests/Helpers/TestPollsterProvider.cs index 45f7c4db2..f9205158b 100644 --- a/Common/tests/Helpers/TestPollsterProvider.cs +++ b/Common/tests/Helpers/TestPollsterProvider.cs @@ -18,8 +18,10 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; using System.IO; using System.Threading; +using System.Threading.Tasks; using ArmoniK.Api.Common.Options; 
using ArmoniK.Core.Adapters.Memory; @@ -50,19 +52,25 @@ namespace ArmoniK.Core.Common.Tests.Helpers; public class TestPollsterProvider : IDisposable { - private const string DatabaseName = "ArmoniK_TestDB"; - private static readonly ActivitySource ActivitySource = new("ArmoniK.Core.Common.Tests.TestPollsterProvider"); - private readonly WebApplication app_; - private readonly IMongoClient client_; - private readonly TimeSpan? graceDelay_; - private readonly IObjectStorage objectStorage_; - public readonly IPartitionTable PartitionTable; - public readonly Common.Pollster.Pollster Pollster; - public readonly IResultTable ResultTable; - private readonly IMongoRunner runner_; - private readonly ISessionTable sessionTable_; - public readonly ISubmitter Submitter; - public readonly ITaskTable TaskTable; + private const string DatabaseName = "ArmoniK_TestDB"; + private static readonly ActivitySource ActivitySource = new("ArmoniK.Core.Common.Tests.TestPollsterProvider"); + private readonly WebApplication app_; + private readonly IMongoClient client_; + + [SuppressMessage("Usage", + "CA2213: Disposable fields must be disposed")] + public readonly ExceptionManager ExceptionManager; + + private readonly TimeSpan? 
graceDelay_; + public readonly IHostApplicationLifetime Lifetime; + private readonly IObjectStorage objectStorage_; + public readonly IPartitionTable PartitionTable; + public readonly Common.Pollster.Pollster Pollster; + public readonly IResultTable ResultTable; + private readonly IMongoRunner runner_; + private readonly ISessionTable sessionTable_; + public readonly ISubmitter Submitter; + public readonly ITaskTable TaskTable; public TestPollsterProvider(IWorkerStreamHandler workerStreamHandler, @@ -178,13 +186,15 @@ public TestPollsterProvider(IWorkerStreamHandler workerStreamHandler, app_ = builder.Build(); app_.Start(); - ResultTable = app_.Services.GetRequiredService(); - TaskTable = app_.Services.GetRequiredService(); - PartitionTable = app_.Services.GetRequiredService(); - sessionTable_ = app_.Services.GetRequiredService(); - Submitter = app_.Services.GetRequiredService(); - Pollster = app_.Services.GetRequiredService(); - objectStorage_ = app_.Services.GetRequiredService(); + ResultTable = app_.Services.GetRequiredService(); + TaskTable = app_.Services.GetRequiredService(); + PartitionTable = app_.Services.GetRequiredService(); + sessionTable_ = app_.Services.GetRequiredService(); + Submitter = app_.Services.GetRequiredService(); + Pollster = app_.Services.GetRequiredService(); + objectStorage_ = app_.Services.GetRequiredService(); + ExceptionManager = app_.Services.GetRequiredService(); + Lifetime = app_.Lifetime; ResultTable.Init(CancellationToken.None) .Wait(); @@ -206,4 +216,14 @@ public void Dispose() runner_?.Dispose(); GC.SuppressFinalize(this); } + + public Task StopApplicationAfter(TimeSpan delay = default) + => Task.Run(async () => + { + await Task.Delay(delay, + ExceptionManager.EarlyCancellationToken) + .ConfigureAwait(false); + + Lifetime.StopApplication(); + }); } diff --git a/Common/tests/Pollster/PollsterTest.cs b/Common/tests/Pollster/PollsterTest.cs index fee8145ce..9b5fe57cb 100644 --- a/Common/tests/Pollster/PollsterTest.cs +++ 
b/Common/tests/Pollster/PollsterTest.cs @@ -365,11 +365,9 @@ await testServiceProvider.Pollster.Init(CancellationToken.None) .ConfigureAwait(false)).Status); // This test that we return from the mainloop after the health check is unhealthy - var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await testServiceProvider.Pollster.MainLoop(cancellationTokenSource.Token) + await testServiceProvider.Pollster.MainLoop() .ConfigureAwait(false); - Assert.False(testServiceProvider.Pollster.Failed); - Assert.IsFalse(cancellationTokenSource.IsCancellationRequested); + Assert.True(testServiceProvider.ExceptionManager.Failed); } [Test] @@ -400,10 +398,10 @@ public async Task RunThenCancelPollster() await testServiceProvider.Pollster.Init(CancellationToken.None) .ConfigureAwait(false); - var source = new CancellationTokenSource(TimeSpan.FromMilliseconds(105)); + var stop = testServiceProvider.StopApplicationAfter(TimeSpan.FromMilliseconds(105)); - Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop(source.Token)); - Assert.True(source.Token.IsCancellationRequested); + Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop()); + Assert.DoesNotThrowAsync(() => stop); Assert.AreEqual(Array.Empty(), testServiceProvider.Pollster.TaskProcessing); } @@ -469,11 +467,11 @@ await mockPullQueueStorage.Channel.Writer.WriteAsync(new SimpleQueueMessageHandl await testServiceProvider.Pollster.Init(CancellationToken.None) .ConfigureAwait(false); - var source = new CancellationTokenSource(TimeSpan.FromMilliseconds(1000)); + var stop = testServiceProvider.StopApplicationAfter(TimeSpan.FromSeconds(1)); - Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop(source.Token)); - Assert.False(testServiceProvider.Pollster.Failed); - Assert.True(source.Token.IsCancellationRequested); + Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop()); + Assert.DoesNotThrowAsync(() => stop); + 
Assert.False(testServiceProvider.ExceptionManager.Failed); Assert.AreEqual(TaskStatus.Completed, await testServiceProvider.TaskTable.GetTaskStatus(taskSubmitted, @@ -513,11 +511,11 @@ await mockPullQueueStorage.Channel.Writer.WriteAsync(new SimpleQueueMessageHandl await testServiceProvider.Pollster.Init(CancellationToken.None) .ConfigureAwait(false); - var source = new CancellationTokenSource(TimeSpan.FromMilliseconds(300)); + var stop = testServiceProvider.StopApplicationAfter(TimeSpan.FromMilliseconds(300)); - Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop(source.Token)); - Assert.False(testServiceProvider.Pollster.Failed); - Assert.True(source.Token.IsCancellationRequested); + Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop()); + Assert.DoesNotThrowAsync(() => stop); + Assert.False(testServiceProvider.ExceptionManager.Failed); // wait to exceed grace delay and see that task is properly resubmitted await Task.Delay(TimeSpan.FromMilliseconds(200), @@ -561,9 +559,9 @@ await mockPullQueueStorage.Channel.Writer.WriteAsync(new SimpleQueueMessageHandl await testServiceProvider.Pollster.Init(CancellationToken.None) .ConfigureAwait(false); - var source = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var stop = testServiceProvider.StopApplicationAfter(TimeSpan.FromSeconds(5)); - var mainLoopTask = testServiceProvider.Pollster.MainLoop(source.Token); + var mainLoopTask = testServiceProvider.Pollster.MainLoop(); await Task.Delay(TimeSpan.FromMilliseconds(200), CancellationToken.None) @@ -584,8 +582,8 @@ await testServiceProvider.Pollster.StopCancelledTask() .ConfigureAwait(false); Assert.DoesNotThrowAsync(() => mainLoopTask); - Assert.False(testServiceProvider.Pollster.Failed); - Assert.True(source.Token.IsCancellationRequested); + Assert.DoesNotThrowAsync(() => stop); + Assert.False(testServiceProvider.ExceptionManager.Failed); Assert.That(await testServiceProvider.TaskTable.GetTaskStatus(taskSubmitted, 
CancellationToken.None) @@ -664,12 +662,12 @@ public async Task ExecuteTooManyErrorShouldFail(Mock mockS await pollster.Init(CancellationToken.None) .ConfigureAwait(false); - var source = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var stop = testServiceProvider.StopApplicationAfter(TimeSpan.FromSeconds(10)); - - Assert.DoesNotThrowAsync(() => pollster.MainLoop(source.Token)); - Assert.True(pollster.Failed); - Assert.False(source.Token.IsCancellationRequested); + Assert.DoesNotThrowAsync(() => pollster.MainLoop()); + Assert.That(() => stop, + Throws.InstanceOf()); + Assert.True(testServiceProvider.ExceptionManager.Failed); Assert.AreEqual(Array.Empty(), testServiceProvider.Pollster.TaskProcessing); } @@ -713,9 +711,10 @@ await mockPullQueueStorage.Channel.Writer.WriteAsync(new SimpleQueueMessageHandl await testServiceProvider.Pollster.Init(CancellationToken.None) .ConfigureAwait(false); - var source = new CancellationTokenSource(TimeSpan.FromMilliseconds(300)); + var stop = testServiceProvider.StopApplicationAfter(TimeSpan.FromMilliseconds(300)); - Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop(source.Token)); + Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop()); + Assert.DoesNotThrowAsync(() => stop); Assert.AreEqual(TaskStatus.Submitted, await testServiceProvider.TaskTable.GetTaskStatus(taskSubmitted, diff --git a/Compute/PollingAgent/src/Worker.cs b/Compute/PollingAgent/src/Worker.cs index 4b370b797..cd68f5848 100644 --- a/Compute/PollingAgent/src/Worker.cs +++ b/Compute/PollingAgent/src/Worker.cs @@ -32,5 +32,5 @@ public Worker(Pollster pollster) => pollster_ = pollster; protected override Task ExecuteAsync(CancellationToken stoppingToken) - => pollster_.MainLoop(stoppingToken); + => pollster_.MainLoop(); }