From 4c5b5f7cf3e16fdf812e9a266da47a4d09236a56 Mon Sep 17 00:00:00 2001 From: Florian Lemaitre Date: Wed, 21 Aug 2024 01:01:08 +0200 Subject: [PATCH] Refactor tests --- Common/tests/RendezvousChannelTests.cs | 266 +++++++++++-------------- 1 file changed, 113 insertions(+), 153 deletions(-) diff --git a/Common/tests/RendezvousChannelTests.cs b/Common/tests/RendezvousChannelTests.cs index f62567ce3..1c788c47c 100644 --- a/Common/tests/RendezvousChannelTests.cs +++ b/Common/tests/RendezvousChannelTests.cs @@ -122,96 +122,28 @@ public async Task Timeout([Values(0, [Values] bool useCancellation) { var queue = new RendezvousChannel(); - - var tcs = new TaskCompletionSource(); - using var cts = new CancellationTokenSource(); - - var readerTask = RunWithStopWatch(async () => - { - Task read; - - if (!isReader) - { - await tcs.Task.ConfigureAwait(false); - await Task.Delay(TimeSpan.FromMilliseconds(wait), - CancellationToken.None) - .ConfigureAwait(false); - read = queue.ReadAsync(TimeSpan.Zero, - CancellationToken.None); - } - else - { - if (useCancellation) - { - cts.CancelAfter(timeout); - read = queue.ReadAsync(System.Threading.Timeout.InfiniteTimeSpan, - cts.Token); - } - else - { - read = queue.ReadAsync(TimeSpan.FromMilliseconds(timeout), - CancellationToken.None); - } - - // Ensure write occurs after read has started - tcs.SetResult(); - } - - var x = await read.ConfigureAwait(false); - - queue.CloseReader(); - - return x; - }); - var writerTask = RunWithStopWatch(async () => - { - Task write; - - if (isReader) - { - await tcs.Task.ConfigureAwait(false); - await Task.Delay(TimeSpan.FromMilliseconds(wait), - CancellationToken.None) - .ConfigureAwait(false); - write = queue.WriteAsync(3, - TimeSpan.Zero, - CancellationToken.None); - } - else - { - if (useCancellation) - { - cts.CancelAfter(timeout); - write = queue.WriteAsync(3, - System.Threading.Timeout.InfiniteTimeSpan, - cts.Token); - } - else - { - write = queue.WriteAsync(3, - TimeSpan.FromMilliseconds(timeout), - CancellationToken.None); - } - - // Ensure read occurs after write has started - tcs.SetResult(); - } - - await write.ConfigureAwait(true); - queue.CloseWriter(); - - return new ValueTuple(); - }); + var actor = new QueueActor(useCancellation, + timeout, + wait, + new TaskCompletionSource()); + + var readerTask = RunWithStopWatch(() => actor.Interact(isReader, + (timeSpan, + token) => queue.ReadAsync(timeSpan, + token), + queue.CloseReader)); + var writerTask = RunWithStopWatch(() => actor.Interact(!isReader, + (timeSpan, + token) => TaskWithValue(queue.WriteAsync(3, + timeSpan, + token)), + queue.CloseWriter)); var reader = await readerTask.ConfigureAwait(false); var writer = await writerTask.ConfigureAwait(false); - var readExceptionType = isReader && useCancellation - ? typeof(OperationCanceledException) - : typeof(TimeoutException); - var writeExceptionType = !isReader && useCancellation - ? typeof(OperationCanceledException) - : typeof(TimeoutException); + var readExceptionType = actor.ExceptionType(isReader); + var writeExceptionType = actor.ExceptionType(!isReader); Console.WriteLine($"Read ({queue.IsReaderClosed,-5}): {reader.duration.TotalMilliseconds,-7} ms Write ({queue.IsWriterClosed,-5}): {writer.duration.TotalMilliseconds,-7} ms"); @@ -265,80 +197,43 @@ public async Task TimeoutClose([Values(0, [Values] bool isReader, [Values] bool useCancellation) { - var queue = new RendezvousChannel(); - using var cts = new CancellationTokenSource(); - - var readerTask = RunWithStopWatch(async () => - { - if (!isReader) - { - await Task.Delay(TimeSpan.FromMilliseconds(wait), - CancellationToken.None) - .ConfigureAwait(false); - queue.CloseReader(); - } - else - { - if (useCancellation) - { - cts.CancelAfter(timeout); - await queue.ReadAsync(System.Threading.Timeout.InfiniteTimeSpan, - cts.Token) - .ConfigureAwait(false); - } - else - { - await queue.ReadAsync(TimeSpan.FromMilliseconds(timeout), - CancellationToken.None) - .ConfigureAwait(false); - } - } - - return new ValueTuple(); - }); - var writerTask = RunWithStopWatch(async () => - { - if (isReader) - { - await Task.Delay(TimeSpan.FromMilliseconds(wait), - CancellationToken.None) - .ConfigureAwait(false); - queue.CloseWriter(); - } - else - { - if (useCancellation) - { - cts.CancelAfter(timeout); - await queue.WriteAsync(3, - System.Threading.Timeout.InfiniteTimeSpan, - cts.Token) - .ConfigureAwait(false); - } - else - { - await queue.WriteAsync(3, - TimeSpan.FromMilliseconds(timeout), - CancellationToken.None) - .ConfigureAwait(false); - } - } - - return new ValueTuple(); - }); + var queue = new RendezvousChannel(); + var actor = new QueueActor(useCancellation, + timeout, + wait); + + var readerTask = RunWithStopWatch(() => actor.Interact(isReader, + (timeSpan, + token) => queue.ReadAsync(timeSpan, + token), + queue.CloseReader, + () => + { + queue.CloseReader(); + return Task.FromResult(0); + })); + var writerTask = RunWithStopWatch(() => actor.Interact(!isReader, + (timeSpan, + token) => TaskWithValue(queue.WriteAsync(3, + timeSpan, + token)), + queue.CloseWriter, + () => + { + queue.CloseWriter(); + return Task.FromResult(new ValueTuple()); + })); var reader = await readerTask.ConfigureAwait(false); var writer = await writerTask.ConfigureAwait(false); Console.WriteLine($"Read ({queue.IsReaderClosed,-5}): {reader.duration.TotalMilliseconds,-7} ms Write ({queue.IsWriterClosed,-5}): {writer.duration.TotalMilliseconds,-7} ms"); - var task = isReader - ? reader.task - : writer.task; + Task task = isReader + ? reader.task + : writer.task; - var exceptionType = useCancellation - ? typeof(OperationCanceledException) - : typeof(TimeoutException); + var exceptionType = actor.ExceptionType(); switch (timeout - wait) { @@ -428,6 +323,12 @@ await queue.WriteAsync(nbWritten, return (task, sw.Elapsed); }); + private async Task TaskWithValue(Task task) + { + await task.ConfigureAwait(false); + return new ValueTuple(); + } + private static async Task WaitException(Task task) { try @@ -440,4 +341,63 @@ await queue.WriteAsync(nbWritten, return e; } } + + private class QueueActor(bool useCancellation, + int timeout, + int wait, + TaskCompletionSource? tcs = null) + { + public async Task Interact(bool isActive, + Func> interact, + Action close, + Func>? interactPassive = null) + { + using var cts = new CancellationTokenSource(); + Task task; + if (isActive) + { + if (useCancellation) + { + cts.CancelAfter(timeout); + task = interact(System.Threading.Timeout.InfiniteTimeSpan, + cts.Token); + } + else + { + task = interact(TimeSpan.FromMilliseconds(timeout), + CancellationToken.None); + } + + // Ensure counterparty starts after the interaction has begun + tcs?.SetResult(); + } + else + { + if (tcs is not null) + { + await tcs.Task.ConfigureAwait(false); + } + + await Task.Delay(wait, + CancellationToken.None) + .ConfigureAwait(false); + + interactPassive ??= () => interact(TimeSpan.Zero, + CancellationToken.None); + + task = interactPassive(); + } + + var x = await task.ConfigureAwait(false); + + close(); + + return x; + } + + public Type ExceptionType(bool isActive = true) + => isActive && useCancellation + ? typeof(OperationCanceledException) + : typeof(TimeoutException); + } }