Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe Notifications, Checks, & Extensions #31

Merged
merged 4 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ MaxWarmupIterationCount=3 MinIterationCount=3 MinWarmupIterationCount=1
## Method Invocation Table
Some methods are handled differently based upon the arguments passed and there are limitations placed upon the types of arguments which can be used together. Most of these incompatibilities handled with Diagnostic Errors provided by the `NexNet.Generator`. Below is a table which shows valid combinations of arguments and return values.

| | CancellationToken | INexusDuplexPipe | INexusChannel<T> | Args |
|--------------|-------------------|------------------|------------------|------|
| void | | | | X |
| ValueTask | X | | | X |
| ValueTask | | X | X | X |
| ValueTask<T> | X | | | X |
| | CancellationToken | INexusDuplexPipe | INexusChannel<T> | Args |
|--------------------|-------------------|------------------|------------------|------|
| void | | | | X |
| ValueTask | X | | | X |
| ValueTask | | X | X | X |
| ValueTask&lt;T&gt; | X | | | X |

Notes:
- `CancellationToken`s can't be combined with `NexusDuplexPipe` nor `INexusChannel<T>` due to pipes/channels having built-in cancellation/completion notifications.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public async Task ReaderCompletesOnPartialRead()
await reader.Reader.CompleteAsync();
});

var completeRead = await reader.ReadUntilComplete();
var completeRead = await reader.ReadUntilComplete().Timeout(1);

Assert.AreEqual(0, completeRead.Count);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public async Task Client_PipeReaderCompletesUponPipeCompleteAsync(Type type)
await pipe.ReadyTask.Timeout(1);
await pipe.CompleteAsync().Timeout(1);

await tcs.Task.Timeout(1).Timeout(1);
await tcs.Task.Timeout(1);
}

[TestCase(Type.Uds)]
Expand Down Expand Up @@ -349,7 +349,7 @@ public async Task Client_PipeReaderRemainsOpenUponOtherReaderCompletion(Type typ

await pipe.Output.WriteAsync(Data).Timeout(1);

await tcs.Task.Timeout(1).Timeout(1);
await tcs.Task.Timeout(1);
}

[TestCase(Type.Uds)]
Expand All @@ -369,7 +369,7 @@ public async Task Client_PipeNotifiesWhenReady(Type type)

await sNexus.Context.Clients.Caller.ClientTaskValueWithDuplexPipe(pipe).Timeout(1);

await pipe.ReadyTask.Timeout(1).Timeout(1);
await pipe.ReadyTask.Timeout(1);
}


Expand All @@ -395,4 +395,92 @@ public async Task Client_PipeReadyCancelsOnDisconnection(Type type)
await AssertThrows<TaskCanceledException>(async () => await pipe.ReadyTask).Timeout(2);
}

[TestCase(Type.Uds)]
[TestCase(Type.Tcp)]
[TestCase(Type.TcpTls)]
[TestCase(Type.Quic)]
public async Task Client_PipeCompleteCancelsOnDisconnection(Type type)
{
var (server, _, client, _, _) = await Setup(type);

var pipe = client.CreatePipe();

// Pause the receiving to test the cancellation
server.Config.InternalOnReceive = async (session, sequence) =>
{
await client.DisconnectAsync().Timeout(1);
await Task.Delay(100000);
};

await client.Proxy.ServerTaskValueWithDuplexPipe(pipe).Timeout(1);

await AssertThrows<TaskCanceledException>(async () => await pipe.CompleteTask).Timeout(2);
}

[TestCase(Type.Uds)]
[TestCase(Type.Tcp)]
[TestCase(Type.TcpTls)]
[TestCase(Type.Quic)]
public async Task Client_PipeNotifiesWhenComplete(Type type)
{
var (_, sNexus, client, _, _) = await Setup(type);

bool completedInvocation = false;
sNexus.ServerTaskValueWithDuplexPipeEvent = async (nexus, pipe) =>
{
await Task.Delay(100);
completedInvocation = true;
};

var pipe = client.CreatePipe();

await client.Proxy.ServerTaskValueWithDuplexPipe(pipe).Timeout(1);
await pipe.ReadyTask.Timeout(1);

await pipe.CompleteTask.Timeout(1);
Assert.IsTrue(completedInvocation);
}

[TestCase(Type.Uds)]
[TestCase(Type.Tcp)]
[TestCase(Type.TcpTls)]
[TestCase(Type.Quic)]
public async Task Client_ThrowsWhenPassingPipeFromWrongNexus(Type type)
{
var (_, sNexus, client, _, _) = await Setup(type);

sNexus.ServerTaskValueWithDuplexPipeEvent = async (nexus, pipe) =>
{
};

var pipe = sNexus.Context.CreatePipe();

await AssertThrows<InvalidOperationException>(() =>
client.Proxy.ServerTaskValueWithDuplexPipe(pipe).Timeout(1)).Timeout(1);
}


[TestCase(Type.Uds)]
[TestCase(Type.Tcp)]
[TestCase(Type.TcpTls)]
[TestCase(Type.Quic)]
public async Task Client_ThrowsWhenPassingUsedPipe(Type type)
{
var (_, sNexus, client, _, _) = await Setup(type);


sNexus.ServerTaskValueWithDuplexPipeEvent = async (nexus, pipe) =>
{
};

var pipe = client.CreatePipe();
await client.Proxy.ServerTaskValueWithDuplexPipe(pipe).Timeout(1);
await pipe.CompleteTask.Timeout(1);

await AssertThrows<InvalidOperationException>(() =>
client.Proxy.ServerTaskValueWithDuplexPipe(pipe).Timeout(1)).Timeout(1);
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ public async Task Server_PipeReaderReceivesDataMultipleTimes(Type type)
var (_, sNexus, _, cNexus, tcs) = await Setup(type);
var count = 0;

// TODO: Review adding a test for increased iterations as this has been found to sometimes fail on CI.
const int iterations = 10000;
sNexus.ServerTaskValueWithDuplexPipeEvent = async (nexus, pipe) =>
{
Expand Down Expand Up @@ -432,6 +431,53 @@ public async Task Server_PipeReadyCancelsOnDisconnection(Type type)
await AssertThrows<TaskCanceledException>(async () => await pipe.ReadyTask).Timeout(1);
}

[TestCase(Type.Uds)]
[TestCase(Type.Tcp)]
[TestCase(Type.TcpTls)]
[TestCase(Type.Quic)]
public async Task Server_PipeCompleteCancelsOnDisconnection(Type type)
{
var (_, sNexus, client, _, _) = await Setup(type);

var pipe = sNexus.Context.CreatePipe();

// Pause the receiving to test the cancellation
client.Config.InternalOnReceive = async (session, sequence) =>
{
await sNexus.Context.DisconnectAsync().Timeout(1);
await Task.Delay(100000);
};

await sNexus.Context.Clients.Caller.ClientTaskValueWithDuplexPipe(pipe).Timeout(1);

await AssertThrows<TaskCanceledException>(async () => await pipe.CompleteTask).Timeout(1);
}


[TestCase(Type.Uds)]
[TestCase(Type.Tcp)]
[TestCase(Type.TcpTls)]
[TestCase(Type.Quic)]
public async Task Server_PipeNotifiesWhenComplete(Type type)
{
var (_, sNexus, _, cNexus, _) = await Setup(type);

bool completedInvocation = false;
cNexus.ClientTaskValueWithDuplexPipeEvent = async (nexus, pipe) =>
{
await Task.Delay(100);
completedInvocation = true;
};

var pipe = sNexus.Context.CreatePipe();

await sNexus.Context.Clients.Caller.ClientTaskValueWithDuplexPipe(pipe).Timeout(1);
await pipe.ReadyTask.Timeout(1);

await pipe.CompleteTask.Timeout(1);
Assert.IsTrue(completedInvocation);
}

[Test]
public async Task Server_PipesThrowWhenInvokingOnMultipleConnections()
{
Expand Down Expand Up @@ -470,4 +516,44 @@ public async Task Server_PipesAllowInvocationOnSingleConnections()
await sNexus.Context.Clients.Client(1).ClientTaskValueWithDuplexPipe(sNexus.Context.CreatePipe()).Timeout(1);
await sNexus.Context.Clients.Caller.ClientTaskValueWithDuplexPipe(sNexus.Context.CreatePipe()).Timeout(1);
}

[TestCase(Type.Uds)]
[TestCase(Type.Tcp)]
[TestCase(Type.TcpTls)]
[TestCase(Type.Quic)]
public async Task Server_ThrowsWhenPassingPipeFromWrongNexus(Type type)
{
var (_, sNexus, _, cNexus, _) = await Setup(type);

cNexus.ClientTaskValueWithDuplexPipeEvent = async (nexus, pipe) =>
{
};

var pipe = cNexus.Context.CreatePipe();

await AssertThrows<InvalidOperationException>(() =>
sNexus.Context.Clients.Caller.ClientTaskValueWithDuplexPipe(pipe).Timeout(1)).Timeout(1);
}


[TestCase(Type.Uds)]
[TestCase(Type.Tcp)]
[TestCase(Type.TcpTls)]
[TestCase(Type.Quic)]
public async Task Server_ThrowsWhenPassingUsedPipe(Type type)
{
var (_, sNexus, _, cNexus, _) = await Setup(type);


cNexus.ClientTaskValueWithDuplexPipeEvent = async (nexus, pipe) =>
{
};

var pipe = sNexus.Context.CreatePipe();
await sNexus.Context.Clients.Caller.ClientTaskValueWithDuplexPipe(pipe).Timeout(1);
await pipe.CompleteTask.Timeout(1);

await AssertThrows<InvalidOperationException>(() =>
sNexus.Context.Clients.Caller.ClientTaskValueWithDuplexPipe(pipe).Timeout(1)).Timeout(1);
}
}
16 changes: 14 additions & 2 deletions src/NexNet/Invocation/ProxyInvocationBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,22 @@ protected async ValueTask<TReturn> __ProxyInvokeAndWaitForResultCore<TReturn>(us
/// <param name="pipe">Pipe to retrieve the Id of.</param>
/// <returns>Initial id of the pipe.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected static byte __ProxyGetDuplexPipeInitialId(INexusDuplexPipe? pipe)
protected byte __ProxyGetDuplexPipeInitialId(INexusDuplexPipe? pipe)
{
ArgumentNullException.ThrowIfNull(pipe);
return Unsafe.As<NexusDuplexPipe>(pipe).LocalId;
var nexusPipe = Unsafe.As<NexusDuplexPipe>(pipe);

if(nexusPipe.InitiatingPipe == false)
throw new InvalidOperationException(
"Pipe is not from initiating side of the invocation. Usually this means the proxy was passed a pipe which is already open on another invocation. Pipes can only be used once.");

if(this._session != nexusPipe.Session)
throw new InvalidOperationException("Passed pipe from non-initiating side of duplex pipe. Usually means that a server pipe was passed to a client proxy or vice versa.");

if(nexusPipe.CurrentState != NexusDuplexPipe.State.Unset)
throw new InvalidOperationException("Pipe is already open on another invocation. Pipes can only be used once.");

return nexusPipe.LocalId;
}


Expand Down
20 changes: 4 additions & 16 deletions src/NexNet/Pipes/INexusChannelReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,13 @@ public interface INexusChannelReader<T>
/// </summary>
long BufferedLength { get; }

/// <summary>
/// Asynchronously reads data from the duplex pipe.
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns>
/// A task that represents the asynchronous read operation. The value of the TResult parameter contains an enumerable collection of type T.
/// If the read operation is completed or canceled, the returned task will contain an empty collection.
/// </returns>
ValueTask<IReadOnlyList<T>> ReadAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Asynchronously reads data from the duplex pipe and converts it using the provided converter function.
/// </summary>
/// <typeparam name="TTo">The type of the items that will be returned after conversion.</typeparam>
/// <param name="converter">A function that converts each item of type T to type TTo.</param>
/// <param name="list">The list to which the converted items will be added.</param>
/// <param name="converter">An optional converter function that converts each item of type T to type TTo.</param>
/// <param name="cancellationToken">An optional token to cancel the read operation.</param>
/// <returns>
/// A task that represents the asynchronous read operation. The value of the TResult parameter contains an enumerable collection of type TTo.
/// If the read operation is completed or canceled, the returned task will contain an empty collection.
/// </returns>
ValueTask<IReadOnlyList<TTo>> ReadAsync<TTo>(Converter<T, TTo> converter, CancellationToken cancellationToken = default);
/// <returns>True if there is more data to be read from the channel, otherwise false when closed or completed.</returns>
ValueTask<bool> ReadAsync<TTo>(List<TTo> list, Converter<T, TTo>? converter, CancellationToken cancellationToken = default);
}
5 changes: 5 additions & 0 deletions src/NexNet/Pipes/INexusDuplexPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ public interface INexusDuplexPipe : IDuplexPipe
/// </summary>
Task ReadyTask { get; }

/// <summary>
/// Task which completes when the pipe has been completed on the invoking side.
/// </summary>
Task CompleteTask { get; }

/// <summary>
/// Sets the pipe to the complete state and closes the other end of the connection.
/// Do not use the pipe after calling this method.
Expand Down
Loading