Skip to content

Commit

Permalink
Fix PollsterTest
Browse files Browse the repository at this point in the history
  • Loading branch information
lemaitre-aneo committed Aug 6, 2024
1 parent 2cc4d2f commit 6d3d088
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 71 deletions.
10 changes: 1 addition & 9 deletions Common/src/Pollster/Pollster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ public Pollster(IPullQueueStorage pullQueueStorage,
meterHolder_ = meterHolder;
ownerPodId_ = identifier.OwnerPodId;
ownerPodName_ = identifier.OwnerPodName;
Failed = false;

var started = DateTime.UtcNow;
meterHolder_.Meter.CreateObservableCounter("uptime",
Expand Down Expand Up @@ -159,12 +158,6 @@ public Pollster(IPullQueueStorage pullQueueStorage,
public ICollection<string> TaskProcessing
=> taskProcessingDict_.Keys;

/// <summary>
/// Is true when the MainLoop exited with an error
/// Used in Unit tests
/// </summary>
public bool Failed { get; private set; }

public async Task Init(CancellationToken cancellationToken)
=> await Task.WhenAll(pullQueueStorage_.Init(cancellationToken),
dataPrefetcher_.Init(cancellationToken),
Expand Down Expand Up @@ -258,7 +251,7 @@ await taskHandler.StopCancelledTask()
}
}

public async Task MainLoop(CancellationToken cancellationToken)
public async Task MainLoop()
{
try
{
Expand Down Expand Up @@ -420,7 +413,6 @@ await runningTaskQueue_.WaitForNextWriteAsync(pollsterOptions_.TimeoutBeforeNext
}
catch (Exception e)
{
Failed = true;
exceptionManager_.FatalError(logger_,
e,
"Error in pollster");
Expand Down
13 changes: 11 additions & 2 deletions Common/src/Pollster/RunningTaskProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,17 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
var taskHandler = await runningTaskQueue_.ReadAsync(exceptionManager_.EarlyCancellationToken)
.ConfigureAwait(false);
TaskHandler taskHandler;
try
{
taskHandler = await runningTaskQueue_.ReadAsync(exceptionManager_.EarlyCancellationToken)
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break;
}

await using var taskHandlerDispose = new Deferrer(taskHandler);

var taskInfo = taskHandler.GetAcquiredTaskInfo();
Expand Down
16 changes: 10 additions & 6 deletions Common/src/Utils/ExceptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public CancellationToken EarlyCancellationToken
public CancellationToken LateCancellationToken
=> lateCts_.Token;

public bool Failed { get; private set; }

public void Dispose()
{
foreach (var disposable in disposables_)
Expand Down Expand Up @@ -114,9 +116,9 @@ public void RecordError(ILogger? logger,
using var scope = logger.BeginScope("Exception #{NbError}/{MaxError}",
nbError,
maxError_);
Action<ILogger, Exception, string, object?[]> log = nbError <= maxError_
? LoggerExtensions.LogError
: LoggerExtensions.LogDebug;
Action<ILogger, Exception?, string, object?[]> log = nbError <= maxError_
? LoggerExtensions.LogError
: LoggerExtensions.LogDebug;

log.Invoke(logger,
e,
Expand All @@ -127,6 +129,7 @@ public void RecordError(ILogger? logger,
if (nbError == maxError_)
{
logger_?.LogCritical("Stop Application after too many errors");
Failed = true;
earlyCts_.Cancel();
}
}
Expand All @@ -144,16 +147,17 @@ public void FatalError(ILogger? logger,
if (logger is not null)
{
using var scope = logger.BeginScope("Fatal Exception");
Action<ILogger, Exception, string, object?[]> log = nbError <= maxError_
? LoggerExtensions.LogCritical
: LoggerExtensions.LogDebug;
Action<ILogger, Exception?, string, object?[]> log = nbError <= maxError_
? LoggerExtensions.LogCritical
: LoggerExtensions.LogDebug;

log.Invoke(logger,
e,
message,
args);
}

Failed = true;
earlyCts_.Cancel();
}

Expand Down
11 changes: 4 additions & 7 deletions Common/tests/Helpers/TestPollingAgentProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,9 @@ public class TestPollingAgentProvider : IDisposable
private readonly WebApplication app_;
private readonly LoggerFactory loggerFactory_;

private readonly CancellationTokenSource pollsterCancellationTokenSource_ = new();
private readonly Task pollsterRunningTask_;
private readonly IMongoRunner runner_;
public readonly ISubmitter Submitter;
private readonly Task pollsterRunningTask_;
private readonly IMongoRunner runner_;
public readonly ISubmitter Submitter;


public TestPollingAgentProvider(IWorkerStreamHandler workerStreamHandler)
Expand Down Expand Up @@ -131,16 +130,14 @@ public TestPollingAgentProvider(IWorkerStreamHandler workerStreamHandler)
sessionTable.Init(CancellationToken.None)
.Wait();

pollsterRunningTask_ = Task.Factory.StartNew(() => pollster.MainLoop(pollsterCancellationTokenSource_.Token),
pollsterRunningTask_ = Task.Factory.StartNew(() => pollster.MainLoop(),
TaskCreationOptions.LongRunning);
}

public void Dispose()
{
pollsterCancellationTokenSource_?.Cancel(false);
pollsterRunningTask_?.Wait();
pollsterRunningTask_?.Dispose();
pollsterCancellationTokenSource_?.Dispose();
(app_ as IDisposable)?.Dispose();
loggerFactory_?.Dispose();
runner_?.Dispose();
Expand Down
60 changes: 40 additions & 20 deletions Common/tests/Helpers/TestPollsterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

using ArmoniK.Api.Common.Options;
using ArmoniK.Core.Adapters.Memory;
Expand Down Expand Up @@ -50,19 +52,25 @@ namespace ArmoniK.Core.Common.Tests.Helpers;

public class TestPollsterProvider : IDisposable
{
private const string DatabaseName = "ArmoniK_TestDB";
private static readonly ActivitySource ActivitySource = new("ArmoniK.Core.Common.Tests.TestPollsterProvider");
private readonly WebApplication app_;
private readonly IMongoClient client_;
private readonly TimeSpan? graceDelay_;
private readonly IObjectStorage objectStorage_;
public readonly IPartitionTable PartitionTable;
public readonly Common.Pollster.Pollster Pollster;
public readonly IResultTable ResultTable;
private readonly IMongoRunner runner_;
private readonly ISessionTable sessionTable_;
public readonly ISubmitter Submitter;
public readonly ITaskTable TaskTable;
private const string DatabaseName = "ArmoniK_TestDB";
private static readonly ActivitySource ActivitySource = new("ArmoniK.Core.Common.Tests.TestPollsterProvider");
private readonly WebApplication app_;
private readonly IMongoClient client_;

[SuppressMessage("Usage",
"CA2213: Disposable fields must be disposed")]
public readonly ExceptionManager ExceptionManager;

private readonly TimeSpan? graceDelay_;
public readonly IHostApplicationLifetime Lifetime;
private readonly IObjectStorage objectStorage_;
public readonly IPartitionTable PartitionTable;
public readonly Common.Pollster.Pollster Pollster;
public readonly IResultTable ResultTable;
private readonly IMongoRunner runner_;
private readonly ISessionTable sessionTable_;
public readonly ISubmitter Submitter;
public readonly ITaskTable TaskTable;


public TestPollsterProvider(IWorkerStreamHandler workerStreamHandler,
Expand Down Expand Up @@ -178,13 +186,15 @@ public TestPollsterProvider(IWorkerStreamHandler workerStreamHandler,
app_ = builder.Build();
app_.Start();

ResultTable = app_.Services.GetRequiredService<IResultTable>();
TaskTable = app_.Services.GetRequiredService<ITaskTable>();
PartitionTable = app_.Services.GetRequiredService<IPartitionTable>();
sessionTable_ = app_.Services.GetRequiredService<ISessionTable>();
Submitter = app_.Services.GetRequiredService<ISubmitter>();
Pollster = app_.Services.GetRequiredService<Common.Pollster.Pollster>();
objectStorage_ = app_.Services.GetRequiredService<IObjectStorage>();
ResultTable = app_.Services.GetRequiredService<IResultTable>();
TaskTable = app_.Services.GetRequiredService<ITaskTable>();
PartitionTable = app_.Services.GetRequiredService<IPartitionTable>();
sessionTable_ = app_.Services.GetRequiredService<ISessionTable>();
Submitter = app_.Services.GetRequiredService<ISubmitter>();
Pollster = app_.Services.GetRequiredService<Common.Pollster.Pollster>();
objectStorage_ = app_.Services.GetRequiredService<IObjectStorage>();
ExceptionManager = app_.Services.GetRequiredService<ExceptionManager>();
Lifetime = app_.Lifetime;

ResultTable.Init(CancellationToken.None)
.Wait();
Expand All @@ -206,4 +216,14 @@ public void Dispose()
runner_?.Dispose();
GC.SuppressFinalize(this);
}

public Task StopApplicationAfter(TimeSpan delay = default)
=> Task.Run(async () =>
{
await Task.Delay(delay,
ExceptionManager.EarlyCancellationToken)
.ConfigureAwait(false);

Lifetime.StopApplication();
});
}
51 changes: 25 additions & 26 deletions Common/tests/Pollster/PollsterTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -365,11 +365,9 @@ await testServiceProvider.Pollster.Init(CancellationToken.None)
.ConfigureAwait(false)).Status);

// This test that we return from the mainloop after the health check is unhealthy
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));
await testServiceProvider.Pollster.MainLoop(cancellationTokenSource.Token)
await testServiceProvider.Pollster.MainLoop()
.ConfigureAwait(false);
Assert.False(testServiceProvider.Pollster.Failed);
Assert.IsFalse(cancellationTokenSource.IsCancellationRequested);
Assert.True(testServiceProvider.ExceptionManager.Failed);
}

[Test]
Expand Down Expand Up @@ -400,10 +398,10 @@ public async Task RunThenCancelPollster()
await testServiceProvider.Pollster.Init(CancellationToken.None)
.ConfigureAwait(false);

var source = new CancellationTokenSource(TimeSpan.FromMilliseconds(105));
var stop = testServiceProvider.StopApplicationAfter(TimeSpan.FromMicroseconds(105));

Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop(source.Token));
Assert.True(source.Token.IsCancellationRequested);
Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop());
Assert.DoesNotThrowAsync(() => stop);
Assert.AreEqual(Array.Empty<string>(),
testServiceProvider.Pollster.TaskProcessing);
}
Expand Down Expand Up @@ -469,11 +467,11 @@ await mockPullQueueStorage.Channel.Writer.WriteAsync(new SimpleQueueMessageHandl
await testServiceProvider.Pollster.Init(CancellationToken.None)
.ConfigureAwait(false);

var source = new CancellationTokenSource(TimeSpan.FromMilliseconds(1000));
var stop = testServiceProvider.StopApplicationAfter(TimeSpan.FromSeconds(1));

Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop(source.Token));
Assert.False(testServiceProvider.Pollster.Failed);
Assert.True(source.Token.IsCancellationRequested);
Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop());
Assert.DoesNotThrowAsync(() => stop);
Assert.False(testServiceProvider.ExceptionManager.Failed);

Assert.AreEqual(TaskStatus.Completed,
await testServiceProvider.TaskTable.GetTaskStatus(taskSubmitted,
Expand Down Expand Up @@ -513,11 +511,11 @@ await mockPullQueueStorage.Channel.Writer.WriteAsync(new SimpleQueueMessageHandl
await testServiceProvider.Pollster.Init(CancellationToken.None)
.ConfigureAwait(false);

var source = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
var stop = testServiceProvider.StopApplicationAfter(TimeSpan.FromMilliseconds(300));

Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop(source.Token));
Assert.False(testServiceProvider.Pollster.Failed);
Assert.True(source.Token.IsCancellationRequested);
Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop());
Assert.DoesNotThrowAsync(() => stop);
Assert.False(testServiceProvider.ExceptionManager.Failed);

// wait to exceed grace delay and see that task is properly resubmitted
await Task.Delay(TimeSpan.FromMilliseconds(200),
Expand Down Expand Up @@ -561,9 +559,9 @@ await mockPullQueueStorage.Channel.Writer.WriteAsync(new SimpleQueueMessageHandl
await testServiceProvider.Pollster.Init(CancellationToken.None)
.ConfigureAwait(false);

var source = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var stop = testServiceProvider.StopApplicationAfter(TimeSpan.FromSeconds(5));

var mainLoopTask = testServiceProvider.Pollster.MainLoop(source.Token);
var mainLoopTask = testServiceProvider.Pollster.MainLoop();

await Task.Delay(TimeSpan.FromMilliseconds(200),
CancellationToken.None)
Expand All @@ -584,8 +582,8 @@ await testServiceProvider.Pollster.StopCancelledTask()
.ConfigureAwait(false);

Assert.DoesNotThrowAsync(() => mainLoopTask);
Assert.False(testServiceProvider.Pollster.Failed);
Assert.True(source.Token.IsCancellationRequested);
Assert.DoesNotThrowAsync(() => stop);
Assert.False(testServiceProvider.ExceptionManager.Failed);

Assert.That(await testServiceProvider.TaskTable.GetTaskStatus(taskSubmitted,
CancellationToken.None)
Expand Down Expand Up @@ -664,12 +662,12 @@ public async Task ExecuteTooManyErrorShouldFail(Mock<IWorkerStreamHandler> mockS
await pollster.Init(CancellationToken.None)
.ConfigureAwait(false);

var source = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var stop = testServiceProvider.StopApplicationAfter(TimeSpan.FromSeconds(10));


Assert.DoesNotThrowAsync(() => pollster.MainLoop(source.Token));
Assert.True(pollster.Failed);
Assert.False(source.Token.IsCancellationRequested);
Assert.DoesNotThrowAsync(() => pollster.MainLoop());
Assert.That(() => stop,
Throws.InstanceOf<OperationCanceledException>());
Assert.True(testServiceProvider.ExceptionManager.Failed);
Assert.AreEqual(Array.Empty<string>(),
testServiceProvider.Pollster.TaskProcessing);
}
Expand Down Expand Up @@ -713,9 +711,10 @@ await mockPullQueueStorage.Channel.Writer.WriteAsync(new SimpleQueueMessageHandl
await testServiceProvider.Pollster.Init(CancellationToken.None)
.ConfigureAwait(false);

var source = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
var stop = testServiceProvider.StopApplicationAfter(TimeSpan.FromMilliseconds(300));

Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop(source.Token));
Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop());
Assert.DoesNotThrowAsync(() => stop);

Assert.AreEqual(TaskStatus.Submitted,
await testServiceProvider.TaskTable.GetTaskStatus(taskSubmitted,
Expand Down
2 changes: 1 addition & 1 deletion Compute/PollingAgent/src/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ public Worker(Pollster pollster)
=> pollster_ = pollster;

protected override Task ExecuteAsync(CancellationToken stoppingToken)
=> pollster_.MainLoop(stoppingToken);
=> pollster_.MainLoop();
}

0 comments on commit 6d3d088

Please sign in to comment.