Skip to content

Commit

Permalink
Added TransportConfiguration to allow transport specific configuratio…
Browse files Browse the repository at this point in the history
…ns on the session to occur.

Reverted the github actions back to default.
  • Loading branch information
DJGosnell committed Aug 24, 2023
1 parent 028c910 commit 8d3bf43
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dotnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
run: dotnet build src -c Release

- name: Execute integration tests
run: dotnet test src/NexNet.IntegrationTests -c Release --no-build --filter "FullyQualifiedName=NexNet.IntegrationTests.NexusServerTests_NexusDuplexPipe.PipeReaderReceivesDataMultipleTimes"
run: dotnet test src/NexNet.IntegrationTests -c Release --no-build -v=normal

- name: Execute generator tests
run: dotnet test src/NexNet.Generator.Tests -c Release --no-build -v=normal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public async Task PipeReaderReceivesDataMultipleTimes(Type type)
int count = 0;

// Ensure that the ids will properly wrap around.
const int iterations = 20000;
const int iterations = 1000;
sNexus.ServerTaskValueWithDuplexPipeEvent = async (nexus, pipe) =>
{
var result = await pipe.Input.ReadAsync();
Expand Down
26 changes: 21 additions & 5 deletions src/NexNet/Internals/NexusSession.Sending.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,18 @@ public async ValueTask SendMessage<TMessage>(TMessage body, CancellationToken ca

Logger?.LogTrace($"Sending {TMessage.Type} header and body with {length} total bytes.");

var result = await _pipeOutput.FlushAsync().ConfigureAwait(false);
FlushResult result = default;
try
{
// ReSharper disable once MethodSupportsCancellation
result = _configDoNotPassFlushCancellationToken
? await _pipeOutput.FlushAsync().ConfigureAwait(false)
: await _pipeOutput.FlushAsync(cancellationToken).ConfigureAwait(false);
}
catch (ObjectDisposedException)
{

}

OnSent?.Invoke();

Expand Down Expand Up @@ -135,7 +146,10 @@ public async ValueTask SendHeaderWithBody(MessageType type, ReadOnlyMemory<byte>
FlushResult result = default;
try
{
result = await _pipeOutput.FlushAsync().ConfigureAwait(false);
// ReSharper disable once MethodSupportsCancellation
result = _configDoNotPassFlushCancellationToken
? await _pipeOutput.FlushAsync().ConfigureAwait(false)
: await _pipeOutput.FlushAsync(cancellationToken).ConfigureAwait(false);
}
catch (ObjectDisposedException)
{
Expand Down Expand Up @@ -202,12 +216,14 @@ private async ValueTask SendHeaderCore(MessageType type, bool force, Cancellatio
_pipeOutput.Advance(1);

Logger?.LogTrace($"Sending {type} header.");

FlushResult result = default;
try
{
// If the cancellation token is canceled after the flush has completed, QUIC throws sometimes.
// https://github.com/dotnet/runtime/issues/82704
result = await _pipeOutput.FlushAsync().ConfigureAwait(false);
// ReSharper disable once MethodSupportsCancellation
result = _configDoNotPassFlushCancellationToken
? await _pipeOutput.FlushAsync().ConfigureAwait(false)
: await _pipeOutput.FlushAsync(cancellationToken).ConfigureAwait(false);
}
catch (ObjectDisposedException)
{
Expand Down
11 changes: 10 additions & 1 deletion src/NexNet/Internals/NexusSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,15 @@ private record struct ProcessResult(

private readonly TaskCompletionSource? _readyTaskCompletionSource;
private readonly TaskCompletionSource? _disconnectedTaskCompletionSource;
private volatile int _state;
private volatile int _state;

/// <summary>
/// If true, the transport will not pass the flush cancellation token to the underlying transport.
/// Currently exists due to an issue in the QUIC implementation. Should be removed once the issue is fixed.
/// https://github.com/dotnet/runtime/issues/82704
/// https://github.com/dotnet/runtime/pull/90253
/// </summary>
private readonly bool _configDoNotPassFlushCancellationToken;

public NexusPipeManager PipeManager { get; }

Expand Down Expand Up @@ -87,6 +95,7 @@ public NexusSession(in NexusSessionConfigurations<TNexus, TProxy> configurations
_pipeInput = configurations.Transport.Input;
_pipeOutput = configurations.Transport.Output;
_transportConnection = configurations.Transport;
_configDoNotPassFlushCancellationToken = _transportConnection.Configurations.DoNotPassFlushCancellationToken;
Config = _config = configurations.Configs;
_readyTaskCompletionSource = configurations.ReadyTaskCompletionSource;
_disconnectedTaskCompletionSource = configurations.DisconnectedTaskCompletionSource;
Expand Down
5 changes: 5 additions & 0 deletions src/NexNet/Transports/ITransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ namespace NexNet.Transports;
/// </summary>
public interface ITransport : IDuplexPipe
{
/// <summary>
/// The configuration used to create this transport.
/// </summary>
public TransportConfiguration Configurations { get; }

/// <summary>
/// Closes the connection.
/// </summary>
Expand Down
9 changes: 8 additions & 1 deletion src/NexNet/Transports/QuicTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ namespace NexNet.Transports;

internal class QuicTransport : ITransport
{

private readonly QuicConnection _quicConnection;

private readonly QuicStream _quicStream;
private readonly QuicStream _quicStream;

public PipeReader Input { get; }
public PipeWriter Output { get; }
Expand All @@ -28,6 +29,12 @@ private QuicTransport(QuicConnection quicConnection, QuicStream stream)
Input = PipeReader.Create(stream);
Output = PipeWriter.Create(stream);
}

public TransportConfiguration Configurations => new TransportConfiguration()
{
DoNotPassFlushCancellationToken = true
};

public ValueTask CloseAsync(bool linger)
{
return _quicConnection.CloseAsync(0);
Expand Down
2 changes: 2 additions & 0 deletions src/NexNet/Transports/SocketTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ private SocketTransport(SocketConnection socketConnection)
Output = socketConnection.Output;
}

public TransportConfiguration Configurations => new TransportConfiguration();

public ValueTask CloseAsync(bool linger)
{
if (!linger)
Expand Down
2 changes: 2 additions & 0 deletions src/NexNet/Transports/TcpTlsTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ private TcpTlsTransport(Socket socket, NetworkStream networkStream, SslStream ss
Output = PipeWriter.Create(sslStream);
}

public TransportConfiguration Configurations => new TransportConfiguration();

public ValueTask CloseAsync(bool linger)
{
if (!linger)
Expand Down
17 changes: 17 additions & 0 deletions src/NexNet/Transports/TransportConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace NexNet.Transports;

/// <summary>
/// Contains configuration options for a transport.
/// </summary>
public class TransportConfiguration
{
/// <summary>
/// If true, the transport will not pass the flush cancellation token to the underlying transport.
/// Currently exists due to an issue in the QUIC implementation. Should be removed once the issue is fixed.
/// </summary>
/// <remarks>
/// https://github.com/dotnet/runtime/issues/82704
/// https://github.com/dotnet/runtime/pull/90253
/// </remarks>
public bool DoNotPassFlushCancellationToken { get; init; } = false;
}

0 comments on commit 8d3bf43

Please sign in to comment.