Skip to content

Commit

Permalink
Refactor tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lemaitre-aneo committed Aug 22, 2024
1 parent 75e3132 commit 4c5b5f7
Showing 1 changed file with 113 additions and 153 deletions.
266 changes: 113 additions & 153 deletions Common/tests/RendezvousChannelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,96 +122,28 @@ public async Task Timeout([Values(0,
[Values] bool useCancellation)
{
var queue = new RendezvousChannel<int>();

var tcs = new TaskCompletionSource();
using var cts = new CancellationTokenSource();

var readerTask = RunWithStopWatch(async () =>
{
Task<int> read;

if (!isReader)
{
await tcs.Task.ConfigureAwait(false);
await Task.Delay(TimeSpan.FromMilliseconds(wait),
CancellationToken.None)
.ConfigureAwait(false);
read = queue.ReadAsync(TimeSpan.Zero,
CancellationToken.None);
}
else
{
if (useCancellation)
{
cts.CancelAfter(timeout);
read = queue.ReadAsync(System.Threading.Timeout.InfiniteTimeSpan,
cts.Token);
}
else
{
read = queue.ReadAsync(TimeSpan.FromMilliseconds(timeout),
CancellationToken.None);
}

// Ensure write occurs after read has started
tcs.SetResult();
}

var x = await read.ConfigureAwait(false);

queue.CloseReader();

return x;
});
var writerTask = RunWithStopWatch(async () =>
{
Task write;

if (isReader)
{
await tcs.Task.ConfigureAwait(false);
await Task.Delay(TimeSpan.FromMilliseconds(wait),
CancellationToken.None)
.ConfigureAwait(false);
write = queue.WriteAsync(3,
TimeSpan.Zero,
CancellationToken.None);
}
else
{
if (useCancellation)
{
cts.CancelAfter(timeout);
write = queue.WriteAsync(3,
System.Threading.Timeout.InfiniteTimeSpan,
cts.Token);
}
else
{
write = queue.WriteAsync(3,
TimeSpan.FromMilliseconds(timeout),
CancellationToken.None);
}

// Ensure read occurs after write has started
tcs.SetResult();
}

await write.ConfigureAwait(true);
queue.CloseWriter();

return new ValueTuple();
});
var actor = new QueueActor(useCancellation,
timeout,
wait,
new TaskCompletionSource());

var readerTask = RunWithStopWatch(() => actor.Interact(isReader,
(timeSpan,
token) => queue.ReadAsync(timeSpan,
token),
queue.CloseReader));
var writerTask = RunWithStopWatch(() => actor.Interact(!isReader,
(timeSpan,
token) => TaskWithValue(queue.WriteAsync(3,
timeSpan,
token)),
queue.CloseWriter));

var reader = await readerTask.ConfigureAwait(false);
var writer = await writerTask.ConfigureAwait(false);

var readExceptionType = isReader && useCancellation
? typeof(OperationCanceledException)
: typeof(TimeoutException);
var writeExceptionType = !isReader && useCancellation
? typeof(OperationCanceledException)
: typeof(TimeoutException);
var readExceptionType = actor.ExceptionType(isReader);
var writeExceptionType = actor.ExceptionType(!isReader);

Console.WriteLine($"Read ({queue.IsReaderClosed,-5}): {reader.duration.TotalMilliseconds,-7} ms Write ({queue.IsWriterClosed,-5}): {writer.duration.TotalMilliseconds,-7} ms");

Expand Down Expand Up @@ -265,80 +197,43 @@ public async Task TimeoutClose([Values(0,
[Values] bool isReader,
[Values] bool useCancellation)
{
var queue = new RendezvousChannel<int>();
using var cts = new CancellationTokenSource();

var readerTask = RunWithStopWatch(async () =>
{
if (!isReader)
{
await Task.Delay(TimeSpan.FromMilliseconds(wait),
CancellationToken.None)
.ConfigureAwait(false);
queue.CloseReader();
}
else
{
if (useCancellation)
{
cts.CancelAfter(timeout);
await queue.ReadAsync(System.Threading.Timeout.InfiniteTimeSpan,
cts.Token)
.ConfigureAwait(false);
}
else
{
await queue.ReadAsync(TimeSpan.FromMilliseconds(timeout),
CancellationToken.None)
.ConfigureAwait(false);
}
}

return new ValueTuple();
});
var writerTask = RunWithStopWatch(async () =>
{
if (isReader)
{
await Task.Delay(TimeSpan.FromMilliseconds(wait),
CancellationToken.None)
.ConfigureAwait(false);
queue.CloseWriter();
}
else
{
if (useCancellation)
{
cts.CancelAfter(timeout);
await queue.WriteAsync(3,
System.Threading.Timeout.InfiniteTimeSpan,
cts.Token)
.ConfigureAwait(false);
}
else
{
await queue.WriteAsync(3,
TimeSpan.FromMilliseconds(timeout),
CancellationToken.None)
.ConfigureAwait(false);
}
}

return new ValueTuple();
});
var queue = new RendezvousChannel<int>();
var actor = new QueueActor(useCancellation,
timeout,
wait);

var readerTask = RunWithStopWatch(() => actor.Interact(isReader,
(timeSpan,
token) => queue.ReadAsync(timeSpan,
token),
queue.CloseReader,
() =>
{
queue.CloseReader();
return Task.FromResult(0);
}));
var writerTask = RunWithStopWatch(() => actor.Interact(!isReader,
(timeSpan,
token) => TaskWithValue(queue.WriteAsync(3,
timeSpan,
token)),
queue.CloseWriter,
() =>
{
queue.CloseWriter();
return Task.FromResult(new ValueTuple());
}));

var reader = await readerTask.ConfigureAwait(false);
var writer = await writerTask.ConfigureAwait(false);

Console.WriteLine($"Read ({queue.IsReaderClosed,-5}): {reader.duration.TotalMilliseconds,-7} ms Write ({queue.IsWriterClosed,-5}): {writer.duration.TotalMilliseconds,-7} ms");

var task = isReader
? reader.task
: writer.task;
Task task = isReader
? reader.task
: writer.task;

var exceptionType = useCancellation
? typeof(OperationCanceledException)
: typeof(TimeoutException);
var exceptionType = actor.ExceptionType();

switch (timeout - wait)
{
Expand Down Expand Up @@ -428,6 +323,12 @@ await queue.WriteAsync(nbWritten,
return (task, sw.Elapsed);
});

private async Task<ValueTuple> TaskWithValue(Task task)
{
await task.ConfigureAwait(false);
return new ValueTuple();
}

private static async Task<Exception?> WaitException(Task task)
{
try
Expand All @@ -440,4 +341,63 @@ await queue.WriteAsync(nbWritten,
return e;
}
}

private class QueueActor(bool useCancellation,
int timeout,
int wait,
TaskCompletionSource? tcs = null)
{
public async Task<T> Interact<T>(bool isActive,
Func<TimeSpan, CancellationToken, Task<T>> interact,
Action close,
Func<Task<T>>? interactPassive = null)
{
using var cts = new CancellationTokenSource();
Task<T> task;
if (isActive)
{
if (useCancellation)
{
cts.CancelAfter(timeout);
task = interact(System.Threading.Timeout.InfiniteTimeSpan,
cts.Token);
}
else
{
task = interact(TimeSpan.FromMilliseconds(timeout),
CancellationToken.None);
}

// Ensure counterparty starts after the interaction has begun
tcs?.SetResult();
}
else
{
if (tcs is not null)
{
await tcs.Task.ConfigureAwait(false);
}

await Task.Delay(wait,
CancellationToken.None)
.ConfigureAwait(false);

interactPassive ??= () => interact(TimeSpan.Zero,
CancellationToken.None);

task = interactPassive();
}

var x = await task.ConfigureAwait(false);

close();

return x;
}

public Type ExceptionType(bool isActive = true)
=> isActive && useCancellation
? typeof(OperationCanceledException)
: typeof(TimeoutException);
}
}

0 comments on commit 4c5b5f7

Please sign in to comment.