diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml index e01afc49..eb8865f4 100644 --- a/.github/workflows/dotnet.yml +++ b/.github/workflows/dotnet.yml @@ -36,7 +36,7 @@ jobs: run: dotnet build src -c Release - name: Execute integration tests - run: dotnet test src/NexNet.IntegrationTests -c Release --no-build --filter "FullyQualifiedName=NexNet.IntegrationTests.NexusServerTests_NexusDuplexPipe.PipeReaderReceivesDataMultipleTimes" + run: dotnet test src/NexNet.IntegrationTests -c Release --no-build -v=normal - name: Execute generator tests run: dotnet test src/NexNet.Generator.Tests -c Release --no-build -v=normal diff --git a/src/NexNet.IntegrationTests/NexusServerTests_NexusDuplexPipe.cs b/src/NexNet.IntegrationTests/NexusServerTests_NexusDuplexPipe.cs index d12c4c07..b54c13e3 100644 --- a/src/NexNet.IntegrationTests/NexusServerTests_NexusDuplexPipe.cs +++ b/src/NexNet.IntegrationTests/NexusServerTests_NexusDuplexPipe.cs @@ -17,7 +17,7 @@ public async Task PipeReaderReceivesDataMultipleTimes(Type type) int count = 0; // Ensure that the ids will properly wrap around. - const int iterations = 20000; + const int iterations = 1000; sNexus.ServerTaskValueWithDuplexPipeEvent = async (nexus, pipe) => { var result = await pipe.Input.ReadAsync(); diff --git a/src/NexNet/Internals/NexusSession.Sending.cs b/src/NexNet/Internals/NexusSession.Sending.cs index 70c66f5b..162a99d8 100644 --- a/src/NexNet/Internals/NexusSession.Sending.cs +++ b/src/NexNet/Internals/NexusSession.Sending.cs @@ -63,7 +63,18 @@ public async ValueTask SendMessage(TMessage body, CancellationToken ca Logger?.LogTrace($"Sending {TMessage.Type} header and body with {length} total bytes."); - var result = await _pipeOutput.FlushAsync().ConfigureAwait(false); + FlushResult result = default; + try + { + // ReSharper disable once MethodSupportsCancellation + result = _configDoNotPassFlushCancellationToken + ? await _pipeOutput.FlushAsync().ConfigureAwait(false) + : await _pipeOutput.FlushAsync(cancellationToken).ConfigureAwait(false); + } + catch (ObjectDisposedException) + { + + } OnSent?.Invoke(); @@ -135,7 +146,10 @@ public async ValueTask SendHeaderWithBody(MessageType type, ReadOnlyMemory FlushResult result = default; try { - result = await _pipeOutput.FlushAsync().ConfigureAwait(false); + // ReSharper disable once MethodSupportsCancellation + result = _configDoNotPassFlushCancellationToken + ? await _pipeOutput.FlushAsync().ConfigureAwait(false) + : await _pipeOutput.FlushAsync(cancellationToken).ConfigureAwait(false); } catch (ObjectDisposedException) { @@ -202,12 +216,14 @@ private async ValueTask SendHeaderCore(MessageType type, bool force, Cancellatio _pipeOutput.Advance(1); Logger?.LogTrace($"Sending {type} header."); + FlushResult result = default; try { - // If the cancellation token is canceled after the flush has completed, QUIC throws sometimes. - // https://github.com/dotnet/runtime/issues/82704 - result = await _pipeOutput.FlushAsync().ConfigureAwait(false); + // ReSharper disable once MethodSupportsCancellation + result = _configDoNotPassFlushCancellationToken + ? await _pipeOutput.FlushAsync().ConfigureAwait(false) + : await _pipeOutput.FlushAsync(cancellationToken).ConfigureAwait(false); } catch (ObjectDisposedException) { diff --git a/src/NexNet/Internals/NexusSession.cs b/src/NexNet/Internals/NexusSession.cs index 6326f82b..8902b3d8 100644 --- a/src/NexNet/Internals/NexusSession.cs +++ b/src/NexNet/Internals/NexusSession.cs @@ -48,7 +48,15 @@ private record struct ProcessResult( private readonly TaskCompletionSource? _readyTaskCompletionSource; private readonly TaskCompletionSource? _disconnectedTaskCompletionSource; - private volatile int _state; + private volatile int _state; + + /// + /// If true, the transport will not pass the flush cancellation token to the underlying transport. + /// Currently exists due to an issue in the QUIC implementation. Should be removed once the issue is fixed. + /// https://github.com/dotnet/runtime/issues/82704 + /// https://github.com/dotnet/runtime/pull/90253 + /// + private readonly bool _configDoNotPassFlushCancellationToken; public NexusPipeManager PipeManager { get; } @@ -87,6 +95,7 @@ public NexusSession(in NexusSessionConfigurations configurations _pipeInput = configurations.Transport.Input; _pipeOutput = configurations.Transport.Output; _transportConnection = configurations.Transport; + _configDoNotPassFlushCancellationToken = _transportConnection.Configurations.DoNotPassFlushCancellationToken; Config = _config = configurations.Configs; _readyTaskCompletionSource = configurations.ReadyTaskCompletionSource; _disconnectedTaskCompletionSource = configurations.DisconnectedTaskCompletionSource; diff --git a/src/NexNet/Transports/ITransport.cs b/src/NexNet/Transports/ITransport.cs index 95370472..6a821a9c 100644 --- a/src/NexNet/Transports/ITransport.cs +++ b/src/NexNet/Transports/ITransport.cs @@ -8,6 +8,11 @@ namespace NexNet.Transports; /// public interface ITransport : IDuplexPipe { + /// + /// The configuration used to create this transport. + /// + public TransportConfiguration Configurations { get; } + /// /// Closes the connection. /// diff --git a/src/NexNet/Transports/QuicTransport.cs b/src/NexNet/Transports/QuicTransport.cs index 41f9331f..81bc6cc6 100644 --- a/src/NexNet/Transports/QuicTransport.cs +++ b/src/NexNet/Transports/QuicTransport.cs @@ -14,9 +14,10 @@ namespace NexNet.Transports; internal class QuicTransport : ITransport { + private readonly QuicConnection _quicConnection; - private readonly QuicStream _quicStream; + private readonly QuicStream _quicStream; public PipeReader Input { get; } public PipeWriter Output { get; } @@ -28,6 +29,12 @@ private QuicTransport(QuicConnection quicConnection, QuicStream stream) Input = PipeReader.Create(stream); Output = PipeWriter.Create(stream); } + + public TransportConfiguration Configurations => new TransportConfiguration() + { + DoNotPassFlushCancellationToken = true + }; + public ValueTask CloseAsync(bool linger) { return _quicConnection.CloseAsync(0); diff --git a/src/NexNet/Transports/SocketTransport.cs b/src/NexNet/Transports/SocketTransport.cs index 8ede24b5..5631f2cd 100644 --- a/src/NexNet/Transports/SocketTransport.cs +++ b/src/NexNet/Transports/SocketTransport.cs @@ -26,6 +26,8 @@ private SocketTransport(SocketConnection socketConnection) Output = socketConnection.Output; } + public TransportConfiguration Configurations => new TransportConfiguration(); + public ValueTask CloseAsync(bool linger) { if (!linger) diff --git a/src/NexNet/Transports/TcpTlsTransport.cs b/src/NexNet/Transports/TcpTlsTransport.cs index 320bf12d..2009f613 100644 --- a/src/NexNet/Transports/TcpTlsTransport.cs +++ b/src/NexNet/Transports/TcpTlsTransport.cs @@ -26,6 +26,8 @@ private TcpTlsTransport(Socket socket, NetworkStream networkStream, SslStream ss Output = PipeWriter.Create(sslStream); } + public TransportConfiguration Configurations => new TransportConfiguration(); + public ValueTask CloseAsync(bool linger) { if (!linger) diff --git a/src/NexNet/Transports/TransportConfiguration.cs b/src/NexNet/Transports/TransportConfiguration.cs new file mode 100644 index 00000000..c4c1854f --- /dev/null +++ b/src/NexNet/Transports/TransportConfiguration.cs @@ -0,0 +1,17 @@ +namespace NexNet.Transports; + +/// +/// Contains configuration options for a transport. +/// +public class TransportConfiguration +{ + /// + /// If true, the transport will not pass the flush cancellation token to the underlying transport. + /// Currently exists due to an issue in the QUIC implementation. Should be removed once the issue is fixed. + /// + /// + /// https://github.com/dotnet/runtime/issues/82704 + /// https://github.com/dotnet/runtime/pull/90253 + /// + public bool DoNotPassFlushCancellationToken { get; init; } = false; +}