From f05dd00195abbb4cc267a8e28436316adff5624d Mon Sep 17 00:00:00 2001 From: lemaitre-aneo Date: Wed, 21 Aug 2024 11:04:23 +0200 Subject: [PATCH] refactor tests --- Common/tests/RendezvousChannelTests.cs | 406 +++++++++++-------------- 1 file changed, 177 insertions(+), 229 deletions(-) diff --git a/Common/tests/RendezvousChannelTests.cs b/Common/tests/RendezvousChannelTests.cs index 1c788c47c..a76c1ab7b 100644 --- a/Common/tests/RendezvousChannelTests.cs +++ b/Common/tests/RendezvousChannelTests.cs @@ -17,7 +17,6 @@ using System; using System.Collections.Generic; -using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Channels; @@ -33,7 +32,7 @@ namespace ArmoniK.Core.Common.Tests; public class RendezvousChannelTest { [Test] - [Timeout(1000)] + [Timeout(10000)] [Repeat(1000)] public async Task WriteShouldWork([Values(0, 1, @@ -76,183 +75,155 @@ public async Task WriteShouldWork([Values(0, } [Test] - [Timeout(1000)] - [Repeat(10)] + [Timeout(10000)] + [Repeat(20)] public void TimeoutNoCounterParty([Values(0, 15, - 30, 45)] int timeout, - [Values] bool isReader) + [Values] bool isReader, + [Values] bool useCancellation) { - // Warmup Nunit runtime - Assert.That(() => Task.Delay(0), - Throws.Nothing); - var queue = new RendezvousChannel(); - var sw = Stopwatch.StartNew(); + var t0 = DateTime.UtcNow; + + Assert.That(() => Interact(queue, + isReader, + useCancellation, + timeout), + Throws.InstanceOf(ExceptionType(useCancellation))); - Assert.That(() => isReader - ? queue.ReadAsync(TimeSpan.FromMilliseconds(timeout)) - : queue.WriteAsync(0, - TimeSpan.FromMilliseconds(timeout)), - Throws.InstanceOf()); + var t1 = DateTime.UtcNow; + var elapsed = (t1 - t0).TotalMilliseconds; - sw.Stop(); + Assert.That(() => Interact(queue, + !isReader), + Throws.InstanceOf(ExceptionType())); - Assert.That(sw.Elapsed.TotalMilliseconds, - Is.GreaterThanOrEqualTo(timeout - 5)); - Assert.That(sw.Elapsed.TotalMilliseconds, - Is.LessThanOrEqualTo(timeout + 20)); + Console.WriteLine($"Timeout {timeout,7} ms got {elapsed,7} ms"); + + Assert.That(elapsed, + Is.GreaterThanOrEqualTo(timeout / 2)); + Assert.That(elapsed, + Is.LessThanOrEqualTo(timeout * 3 + 50)); } + [Test] [Timeout(1000)] - [Repeat(20)] - public async Task Timeout([Values(0, - 15, - 45)] - int timeout, - [Values(0, - 15, - 45)] - int wait, - [Values] bool isReader, - [Values] bool useCancellation) + [Repeat(200)] + public void TimeoutCounterParty([Values] bool isReader, + [Values] bool useCancellation) { var queue = new RendezvousChannel(); - var actor = new QueueActor(useCancellation, - timeout, - wait, - new TaskCompletionSource()); - - var readerTask = RunWithStopWatch(() => actor.Interact(isReader, - (timeSpan, - token) => queue.ReadAsync(timeSpan, - token), - queue.CloseReader)); - var writerTask = RunWithStopWatch(() => actor.Interact(!isReader, - (timeSpan, - token) => TaskWithValue(queue.WriteAsync(3, - timeSpan, - token)), - queue.CloseWriter)); - - var reader = await readerTask.ConfigureAwait(false); - var writer = await writerTask.ConfigureAwait(false); - - var readExceptionType = actor.ExceptionType(isReader); - var writeExceptionType = actor.ExceptionType(!isReader); - - Console.WriteLine($"Read ({queue.IsReaderClosed,-5}): {reader.duration.TotalMilliseconds,-7} ms Write ({queue.IsWriterClosed,-5}): {writer.duration.TotalMilliseconds,-7} ms"); - - switch (timeout - wait) - { - case < 0: - Assert.That(() => reader.task, - Throws.InstanceOf(readExceptionType)); - Assert.That(() => writer.task, - Throws.InstanceOf(writeExceptionType)); - break; - case > 0: - Assert.That(() => reader.task, - Throws.Nothing); - Assert.That(() => writer.task, - Throws.Nothing); - Assert.That(await reader.task.ConfigureAwait(false), - Is.EqualTo(3)); - break; - default: - var readException = await WaitException(reader.task) - .ConfigureAwait(false); - var writeException = await WaitException(writer.task) - .ConfigureAwait(false); - - // Ensure that either both succeed or both fail - Assert.That(writeException, - readException is null - ? Is.Null - : Is.InstanceOf(writeExceptionType)); - Assert.That(readException, - writeException is null - ? Is.Null - : Is.InstanceOf(readExceptionType)); - break; - } - } + var task = Interact(queue, + isReader, + useCancellation, + 100); + + Assert.That(() => Interact(queue, + !isReader), + Throws.Nothing); + Assert.That(() => task, + Throws.Nothing); + Assert.That(task.Result, + Is.EqualTo(3)); + } [Test] [Timeout(1000)] - [Repeat(20)] - public async Task TimeoutClose([Values(0, - 15, - 45)] - int timeout, - [Values(0, - 15, - 45)] - int wait, - [Values] bool isReader, - [Values] bool useCancellation) + [Repeat(200)] + public void TimeoutClose([Values] bool isReader, + [Values] bool useCancellation) + { + var queue = new RendezvousChannel(); + + var task = Interact(queue, + isReader, + useCancellation, + 100); + + Assert.That(() => Close(queue, + !isReader), + Throws.Nothing); + Assert.That(() => task, + Throws.InstanceOf()); + Assert.That(() => Interact(queue, + isReader, + timeout: 100), + Throws.InstanceOf()); + } + + [Test] + [Timeout(10000)] + [Repeat(200)] + public async Task TimeoutRaceCounterParty([Values] bool isReader, + [Values] bool useCancellation) { var queue = new RendezvousChannel(); - var actor = new QueueActor(useCancellation, - timeout, - wait); - - var readerTask = RunWithStopWatch(() => actor.Interact(isReader, - (timeSpan, - token) => queue.ReadAsync(timeSpan, - token), - queue.CloseReader, - () => - { - queue.CloseReader(); - return Task.FromResult(0); - })); - var writerTask = RunWithStopWatch(() => actor.Interact(!isReader, - (timeSpan, - token) => TaskWithValue(queue.WriteAsync(3, - timeSpan, - token)), - queue.CloseWriter, - () => - { - queue.CloseWriter(); - return Task.FromResult(new ValueTuple()); - })); - - var reader = await readerTask.ConfigureAwait(false); - var writer = await writerTask.ConfigureAwait(false); - - Console.WriteLine($"Read ({queue.IsReaderClosed,-5}): {reader.duration.TotalMilliseconds,-7} ms Write ({queue.IsWriterClosed,-5}): {writer.duration.TotalMilliseconds,-7} ms"); - - Task task = isReader - ? reader.task - : writer.task; - - var exceptionType = actor.ExceptionType(); - - switch (timeout - wait) + + var waiter = Interact(queue, + isReader, + useCancellation, + 30, + true); + + var triggerer = Interact(queue, + !isReader, + close: true); + + var waitException = await WaitException(waiter) + .ConfigureAwait(false); + var triggerException = await WaitException(triggerer) + .ConfigureAwait(false); + + Assert.That(waitException, + triggerException is null + ? Is.Null + : Is.InstanceOf(ExceptionType(useCancellation))); + Assert.That(triggerException, + waitException is null + ? Is.Null + : Is.InstanceOf(ExceptionType())); + + if (waitException is null) { - case < 0: - Assert.That(() => task, - Throws.InstanceOf(exceptionType)); - break; - case > 0: - Assert.That(() => task, - Throws.InstanceOf()); - break; - default: - Assert.That(() => task, - Throws.InstanceOf() - .Or.InstanceOf(exceptionType)); - break; + Assert.That(waiter.Result, + Is.EqualTo(3)); + Assert.That(triggerer.Result, + Is.EqualTo(3)); } } + [Test] + [Timeout(10000)] + [Repeat(50)] + public async Task TimeoutRaceClose([Values] bool isReader, + [Values] bool useCancellation) + { + var queue = new RendezvousChannel(); + + var waiter = Interact(queue, + isReader, + useCancellation, + 30); + + await Task.Delay(30) + .ConfigureAwait(false); + Assert.That(() => Close(queue, + !isReader), + Throws.Nothing); + Assert.That(() => waiter, + Throws.InstanceOf() + .Or.InstanceOf(ExceptionType(useCancellation))); + Assert.That(() => Interact(queue, + isReader, + timeout: 100), + Throws.InstanceOf()); + } + private static async IAsyncEnumerable ReadAsync(RendezvousChannel queue, int closeAfter) { @@ -262,7 +233,7 @@ private static async IAsyncEnumerable ReadAsync(RendezvousChannel queu int x; try { - x = await queue.ReadAsync(System.Threading.Timeout.InfiniteTimeSpan, + x = await queue.ReadAsync(Timeout.InfiniteTimeSpan, CancellationToken.None) .ConfigureAwait(false); } @@ -288,7 +259,7 @@ private static async Task WriteAsync(RendezvousChannel queue, try { await queue.WriteAsync(nbWritten, - System.Threading.Timeout.InfiniteTimeSpan, + Timeout.InfiniteTimeSpan, CancellationToken.None) .ConfigureAwait(false); nbWritten++; @@ -304,31 +275,6 @@ await queue.WriteAsync(nbWritten, return nbWritten; } - private static Task<(Task task, TimeSpan duration)> RunWithStopWatch(Func> f) - => Task.Run(async () => - { - var sw = Stopwatch.StartNew(); - var task = f(); - try - { - await task.ConfigureAwait(false); - } - catch - { - // ignored - } - - sw.Stop(); - - return (task, sw.Elapsed); - }); - - private async Task TaskWithValue(Task task) - { - await task.ConfigureAwait(false); - return new ValueTuple(); - } - private static async Task WaitException(Task task) { try @@ -342,62 +288,64 @@ private async Task TaskWithValue(Task task) } } - private class QueueActor(bool useCancellation, - int timeout, - int wait, - TaskCompletionSource? tcs = null) + private static async Task Interact(RendezvousChannel queue, + bool isReader, + bool useCancellation = false, + int timeout = 0, + bool close = false) { - public async Task Interact(bool isActive, - Func> interact, - Action close, - Func>? interactPassive = null) - { - using var cts = new CancellationTokenSource(); - Task task; - if (isActive) - { - if (useCancellation) - { - cts.CancelAfter(timeout); - task = interact(System.Threading.Timeout.InfiniteTimeSpan, - cts.Token); - } - else - { - task = interact(TimeSpan.FromMilliseconds(timeout), - CancellationToken.None); - } - - // Ensure counterparty starts after the interaction has begun - tcs?.SetResult(); - } - else - { - if (tcs is not null) - { - await tcs.Task.ConfigureAwait(false); - } + using var cts = useCancellation + ? new CancellationTokenSource(timeout) + : null; + var token = CancellationToken.None; + var span = TimeSpan.FromMilliseconds(timeout); - await Task.Delay(wait, - CancellationToken.None) - .ConfigureAwait(false); + if (cts is not null) + { + span = Timeout.InfiniteTimeSpan; + token = cts.Token; + } - interactPassive ??= () => interact(TimeSpan.Zero, - CancellationToken.None); + var x = 3; - task = interactPassive(); - } + if (isReader) + { + x = await queue.ReadAsync(span, + token) + .ConfigureAwait(false); + } + else + { + await queue.WriteAsync(x, + span, + token) + .ConfigureAwait(false); + } - var x = await task.ConfigureAwait(false); + if (close) + { + Close(queue, + isReader); + } - close(); + return x; + } - return x; + private static void Close(RendezvousChannel queue, + bool isReader) + { + if (isReader) + { + queue.CloseReader(); + } + else + { + queue.CloseWriter(); } - - public Type ExceptionType(bool isActive = true) - => isActive && useCancellation - ? typeof(OperationCanceledException) - : typeof(TimeoutException); } + + private static Type ExceptionType(bool useCancellation = false) + => useCancellation + ? typeof(OperationCanceledException) + : typeof(TimeoutException); }