diff --git a/Common/src/Pollster/Pollster.cs b/Common/src/Pollster/Pollster.cs index 4f869cc49..bf098cf3a 100644 --- a/Common/src/Pollster/Pollster.cs +++ b/Common/src/Pollster/Pollster.cs @@ -372,14 +372,22 @@ await messages.Current.DisposeIgnoreErrorAsync(logger_) await taskHandler.PreProcessing() .ConfigureAwait(false); - await runningTaskQueue_.WriteAsync(taskHandler, - pollsterOptions_.TimeoutBeforeNextAcquisition, - exceptionManager_.EarlyCancellationToken) - .ConfigureAwait(false); - - // TaskHandler has been successfully sent to the next stage of the pipeline - // So remove the automatic dispose of the TaskHandler - taskHandlerDispose.Reset(); + try + { + await runningTaskQueue_.WriteAsync(taskHandler, + pollsterOptions_.TimeoutBeforeNextAcquisition, + exceptionManager_.EarlyCancellationToken) + .ConfigureAwait(false); + + // TaskHandler has been successfully sent to the next stage of the pipeline + // So remove the automatic dispose of the TaskHandler + taskHandlerDispose.Reset(); + } + catch (TimeoutException) + { + await taskHandler.ReleaseAndPostponeTask() + .ConfigureAwait(false); + } } catch (Exception e) { diff --git a/Common/tests/Helpers/TestPollsterProvider.cs b/Common/tests/Helpers/TestPollsterProvider.cs index 41ad30a17..f6c1e9533 100644 --- a/Common/tests/Helpers/TestPollsterProvider.cs +++ b/Common/tests/Helpers/TestPollsterProvider.cs @@ -48,6 +48,8 @@ using MongoDB.Bson; using MongoDB.Driver; +using NUnit.Framework; + namespace ArmoniK.Core.Common.Tests.Helpers; public class TestPollsterProvider : IDisposable @@ -76,7 +78,9 @@ public class TestPollsterProvider : IDisposable public TestPollsterProvider(IWorkerStreamHandler workerStreamHandler, IAgentHandler agentHandler, IPullQueueStorage pullQueueStorage, - TimeSpan? graceDelay = null) + TimeSpan? graceDelay = null, + TimeSpan? acquireTimeout = null, + int maxError = 5) { graceDelay_ = graceDelay; var logger = NullLogger.Instance; @@ -128,6 +132,17 @@ public TestPollsterProvider(IWorkerStreamHandler workerStreamHandler, : graceDelay .ToString() }, + { + $"{Injection.Options.Pollster.SettingSection}:{nameof(Injection.Options.Pollster.TimeoutBeforeNextAcquisition)}", + acquireTimeout is null + ? TimeSpan.FromSeconds(10) + .ToString() + : acquireTimeout.ToString() + }, + { + $"{Injection.Options.Pollster.SettingSection}:{nameof(Injection.Options.Pollster.MaxErrorAllowed)}", + maxError.ToString() + }, { $"{Injection.Options.Pollster.SettingSection}:{nameof(Injection.Options.Pollster.SharedCacheFolder)}", Path.Combine(Path.GetTempPath(), @@ -226,4 +241,24 @@ await Task.Delay(delay, Lifetime.StopApplication(); }); + + public void AssertFailAfterError(int nbError = 1) + { + for (var i = 0; i < nbError; i++) + { + if (ExceptionManager.Failed) + { + Assert.Fail($"ExceptionManager failed after {i} errors while it was expected to failed after {nbError}"); + } + + ExceptionManager.RecordError(null, + null, + "Dummy Error"); + } + + if (!ExceptionManager.Failed) + { + Assert.Fail($"ExceptionManager did not failed while it was expected to failed after {nbError}"); + } + } } diff --git a/Common/tests/Pollster/PollsterTest.cs b/Common/tests/Pollster/PollsterTest.cs index 40bf1463b..f9ea55cea 100644 --- a/Common/tests/Pollster/PollsterTest.cs +++ b/Common/tests/Pollster/PollsterTest.cs @@ -313,6 +313,8 @@ await testServiceProvider.Pollster.Init(CancellationToken.None) Assert.AreEqual(HealthStatus.Healthy, (await testServiceProvider.Pollster.Check(HealthCheckTag.Startup) .ConfigureAwait(false)).Status); + + testServiceProvider.AssertFailAfterError(6); } [Test] @@ -411,6 +413,8 @@ await testServiceProvider.Pollster.Init(CancellationToken.None) Assert.DoesNotThrowAsync(() => stop); Assert.AreEqual(Array.Empty(), testServiceProvider.Pollster.TaskProcessing); + + testServiceProvider.AssertFailAfterError(6); } public class WaitWorkerStreamHandler : IWorkerStreamHandler @@ -479,12 +483,119 @@ await testServiceProvider.Pollster.Init(CancellationToken.None) Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop()); Assert.DoesNotThrowAsync(() => stop); - Assert.False(testServiceProvider.ExceptionManager.Failed); Assert.AreEqual(TaskStatus.Completed, await testServiceProvider.TaskTable.GetTaskStatus(taskSubmitted, CancellationToken.None) .ConfigureAwait(false)); + + testServiceProvider.AssertFailAfterError(6); + } + + [Test] + [Timeout(10000)] + public async Task ExecuteTaskTimeoutAcquire() + { + var mockPullQueueStorage = new SimplePullQueueStorageChannel(); + var waitWorkerStreamHandler = new WaitWorkerStreamHandler(1000); + var simpleAgentHandler = new SimpleAgentHandler(); + + using var testServiceProvider = new TestPollsterProvider(waitWorkerStreamHandler, + simpleAgentHandler, + mockPullQueueStorage, + TimeSpan.FromMilliseconds(100), + TimeSpan.FromMilliseconds(100), + 0); + + var (sessionId, _, taskSubmitted) = await InitSubmitter(testServiceProvider.Submitter, + testServiceProvider.PartitionTable, + testServiceProvider.ResultTable, + testServiceProvider.SessionTable, + CancellationToken.None) + .ConfigureAwait(false); + + await mockPullQueueStorage.Channel.Writer.WriteAsync(new SimpleQueueMessageHandler + { + CancellationToken = CancellationToken.None, + Status = QueueMessageStatus.Waiting, + MessageId = Guid.NewGuid() + .ToString(), + TaskId = taskSubmitted, + }) + .ConfigureAwait(false); + + var expectedOutput3 = "ExpectedOutput3"; + await testServiceProvider.ResultTable.Create(new[] + { + new Result(sessionId, + expectedOutput3, + "", + "", + "", + ResultStatus.Created, + new List(), + DateTime.UtcNow, + 0, + Array.Empty()), + }, + CancellationToken.None) + .ConfigureAwait(false); + + var requests = await testServiceProvider.Submitter.CreateTasks(sessionId, + sessionId, + new TaskOptions(), + new List + { + new(new[] + { + expectedOutput3, + }, + new List(), + new List> + { + new(Encoding.ASCII.GetBytes("AAAA")), + }.ToAsyncEnumerable()), + }.ToAsyncEnumerable(), + CancellationToken.None) + .ConfigureAwait(false); + + var sessionData = await testServiceProvider.SessionTable.GetSessionAsync(sessionId, + CancellationToken.None) + .ConfigureAwait(false); + + await testServiceProvider.Submitter.FinalizeTaskCreation(requests, + sessionData, + sessionId, + CancellationToken.None) + .ConfigureAwait(false); + + var taskSubmitted2 = requests.First() + .TaskId; + + await mockPullQueueStorage.Channel.Writer.WriteAsync(new SimpleQueueMessageHandler + { + CancellationToken = CancellationToken.None, + Status = QueueMessageStatus.Waiting, + MessageId = Guid.NewGuid() + .ToString(), + TaskId = taskSubmitted2, + }) + .ConfigureAwait(false); + + await testServiceProvider.Pollster.Init(CancellationToken.None) + .ConfigureAwait(false); + + var stop = testServiceProvider.StopApplicationAfter(TimeSpan.FromSeconds(2)); + + Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop()); + Assert.DoesNotThrowAsync(() => stop); + + Assert.AreEqual(TaskStatus.Submitted, + await testServiceProvider.TaskTable.GetTaskStatus(taskSubmitted2, + CancellationToken.None) + .ConfigureAwait(false)); + + testServiceProvider.AssertFailAfterError(); } [Test] @@ -535,6 +646,8 @@ await Task.Delay(TimeSpan.FromMilliseconds(200), await testServiceProvider.TaskTable.GetTaskStatus(taskSubmitted, CancellationToken.None) .ConfigureAwait(false)); + + testServiceProvider.AssertFailAfterError(5); } [Test] @@ -593,7 +706,6 @@ await testServiceProvider.Pollster.StopCancelledTask() Assert.DoesNotThrowAsync(() => mainLoopTask); Assert.DoesNotThrowAsync(() => stop); - Assert.False(testServiceProvider.ExceptionManager.Failed); Assert.That(await testServiceProvider.TaskTable.GetTaskStatus(taskSubmitted, CancellationToken.None) @@ -603,6 +715,8 @@ await testServiceProvider.Pollster.StopCancelledTask() Assert.AreEqual(Array.Empty(), testServiceProvider.Pollster.TaskProcessing); + + testServiceProvider.AssertFailAfterError(5); } public static IEnumerable ExecuteTooManyErrorShouldFailTestCase @@ -733,5 +847,7 @@ await testServiceProvider.TaskTable.GetTaskStatus(taskSubmitted, .ConfigureAwait(false)); Assert.AreEqual(Array.Empty(), testServiceProvider.Pollster.TaskProcessing); + + testServiceProvider.AssertFailAfterError(5); } }