From f9573de5238443108e56ef0c430820150435d8b8 Mon Sep 17 00:00:00 2001 From: sven-n Date: Tue, 27 Aug 2024 21:41:10 +0200 Subject: [PATCH] Consider result of PipeWriter.FlushAsync before flushing the next time This should prevent possible deadlocks --- src/Network/Connection.cs | 5 ++- src/Network/ExtendedPipeWriter.cs | 2 +- src/Network/PacketPipeReaderBase.cs | 38 +++++++++++++++++-- .../PipelinedSimpleModulusDecryptor.cs | 8 ++-- .../PipelinedSimpleModulusEncryptor.cs | 7 ++-- src/Network/Xor/PipelinedXor32Decryptor.cs | 6 +-- src/Network/Xor/PipelinedXor32Encryptor.cs | 6 +-- 7 files changed, 51 insertions(+), 21 deletions(-) diff --git a/src/Network/Connection.cs b/src/Network/Connection.cs index bef0b9df1..e258b1371 100644 --- a/src/Network/Connection.cs +++ b/src/Network/Connection.cs @@ -193,8 +193,8 @@ protected override async ValueTask OnCompleteAsync(Exception? exception) /// Reads the mu online packet by raising . /// /// The mu online packet. - /// The async task. - protected override async ValueTask ReadPacketAsync(ReadOnlySequence packet) + /// , if the flush was successful or not required., if the pipe reader is completed and no longer reading data. + protected override async ValueTask ReadPacketAsync(ReadOnlySequence packet) { IncomingBytesCounter.Add(packet.Length); @@ -205,6 +205,7 @@ protected override async ValueTask ReadPacketAsync(ReadOnlySequence packet try { await this.PacketReceived.SafeInvokeAsync(packet).ConfigureAwait(false); + return true; } finally { diff --git a/src/Network/ExtendedPipeWriter.cs b/src/Network/ExtendedPipeWriter.cs index 797ecde42..850daa6ae 100644 --- a/src/Network/ExtendedPipeWriter.cs +++ b/src/Network/ExtendedPipeWriter.cs @@ -9,7 +9,7 @@ namespace MUnique.OpenMU.Network; using System.Threading; /// -/// A wrapper for an existing , which flushes the advanced bytes automatically. +/// A wrapper for an existing , which has metrics about the written bytes. /// public class ExtendedPipeWriter : PipeWriter { diff --git a/src/Network/PacketPipeReaderBase.cs b/src/Network/PacketPipeReaderBase.cs index 652cf2185..7ec7953dc 100644 --- a/src/Network/PacketPipeReaderBase.cs +++ b/src/Network/PacketPipeReaderBase.cs @@ -11,6 +11,11 @@ namespace MUnique.OpenMU.Network; /// /// Base class for all classes which read mu online data packets from a . /// +/// +/// Things to consider here: +/// * Do not call FlushAsync if the reader can't start until FlushAsync finishes, as that may cause a deadlock. +/// * Ensure that only one context "owns" a PipeReader or PipeWriter or accesses them. These types are not thread-safe. +/// public abstract class PacketPipeReaderBase { private readonly byte[] _headerBuffer = new byte[3]; @@ -24,8 +29,8 @@ public abstract class PacketPipeReaderBase /// Reads the mu online packet. /// /// The mu online packet. - /// The async task. - protected abstract ValueTask ReadPacketAsync(ReadOnlySequence packet); + /// , if the flush was successful or not required., if the pipe reader is completed and no longer reading data. + protected abstract ValueTask ReadPacketAsync(ReadOnlySequence packet); /// /// Called when the completed. @@ -70,11 +75,32 @@ protected async Task ReadSourceAsync() await this.OnCompleteAsync(null).ConfigureAwait(false); } + /// + /// Tries to flush the writer. + /// + /// The pipe writer. + /// + /// , if the flush was successful or not required., if the pipe reader is completed and no longer reading data. + /// + protected async ValueTask TryFlushWriterAsync(PipeWriter pipeWriter) + { + if (pipeWriter is { CanGetUnflushedBytes: true, UnflushedBytes: 0 }) + { + // It was flushed already in the background. + return true; + } + + // todo: what happens if it was flushed in the background in the meantime? race-condition? + var flushResult = await pipeWriter.FlushAsync().ConfigureAwait(false); + return !flushResult.IsCompleted; + } + private async Task ReadBufferAsync() { ReadResult result = await this.Source.ReadAsync().ConfigureAwait(false); ReadOnlySequence buffer = result.Buffer; int? length = null; + var readingCancelledOrCompleted = false; do { if (buffer.Length > 2) @@ -97,7 +123,11 @@ private async Task ReadBufferAsync() if (length is > 0 && buffer.Length >= length) { var packet = buffer.Slice(0, length.Value); - await this.ReadPacketAsync(packet).ConfigureAwait(false); + if (!await this.ReadPacketAsync(packet).ConfigureAwait(false)) + { + readingCancelledOrCompleted = true; + break; + } buffer = buffer.Slice(buffer.GetPosition(length.Value), buffer.End); length = null; @@ -121,6 +151,6 @@ private async Task ReadBufferAsync() this.Source.AdvanceTo(buffer.Start); } - return result.IsCompleted; + return result.IsCompleted || readingCancelledOrCompleted; } } \ No newline at end of file diff --git a/src/Network/SimpleModulus/PipelinedSimpleModulusDecryptor.cs b/src/Network/SimpleModulus/PipelinedSimpleModulusDecryptor.cs index fc85521c5..5f038a9fd 100644 --- a/src/Network/SimpleModulus/PipelinedSimpleModulusDecryptor.cs +++ b/src/Network/SimpleModulus/PipelinedSimpleModulusDecryptor.cs @@ -80,8 +80,8 @@ protected override ValueTask OnCompleteAsync(Exception? exception) /// Decrypts the packet and writes it into our pipe. /// /// The mu online packet. - /// The async task. - protected override async ValueTask ReadPacketAsync(ReadOnlySequence packet) + /// , if the flush was successful or not required., if the pipe reader is completed and no longer reading data. + protected override async ValueTask ReadPacketAsync(ReadOnlySequence packet) { // The next line is getting a span from the writer which is at least as big as the packet. // As I found out, it's initially about 2 kb in size and gets smaller within further @@ -92,7 +92,7 @@ protected override async ValueTask ReadPacketAsync(ReadOnlySequence packet { // we just have to write-through this.CopyDataIntoWriter(this.Pipe.Writer, packet); - await this.Pipe.Writer.FlushAsync().ConfigureAwait(false); + return await this.TryFlushWriterAsync(this.Pipe.Writer).ConfigureAwait(false); } else { @@ -105,7 +105,7 @@ protected override async ValueTask ReadPacketAsync(ReadOnlySequence packet } this.DecryptAndWrite(packet); - await this.Pipe.Writer.FlushAsync().ConfigureAwait(false); + return await this.TryFlushWriterAsync(this.Pipe.Writer).ConfigureAwait(false); } } diff --git a/src/Network/SimpleModulus/PipelinedSimpleModulusEncryptor.cs b/src/Network/SimpleModulus/PipelinedSimpleModulusEncryptor.cs index 147a1370f..66b741453 100644 --- a/src/Network/SimpleModulus/PipelinedSimpleModulusEncryptor.cs +++ b/src/Network/SimpleModulus/PipelinedSimpleModulusEncryptor.cs @@ -73,7 +73,7 @@ protected override ValueTask OnCompleteAsync(Exception? exception) } /// - protected override async ValueTask ReadPacketAsync(ReadOnlySequence packet) + protected override async ValueTask ReadPacketAsync(ReadOnlySequence packet) { packet.Slice(0, this.HeaderBuffer.Length).CopyTo(this.HeaderBuffer); @@ -81,12 +81,11 @@ protected override async ValueTask ReadPacketAsync(ReadOnlySequence packet { // we just have to write-through this.CopyDataIntoWriter(this._target, packet); - await this._target.FlushAsync().ConfigureAwait(false); - return; + return await this.TryFlushWriterAsync(this._target).ConfigureAwait(false); } this.EncryptAndWrite(packet); - await this._target.FlushAsync().ConfigureAwait(false); + return await this.TryFlushWriterAsync(this._target).ConfigureAwait(false); } /// diff --git a/src/Network/Xor/PipelinedXor32Decryptor.cs b/src/Network/Xor/PipelinedXor32Decryptor.cs index 586565355..4e5604742 100644 --- a/src/Network/Xor/PipelinedXor32Decryptor.cs +++ b/src/Network/Xor/PipelinedXor32Decryptor.cs @@ -56,11 +56,11 @@ protected override ValueTask OnCompleteAsync(Exception? exception) /// Decrypts the packet and writes it into our pipe. /// /// The mu online packet. - /// The async task. - protected override async ValueTask ReadPacketAsync(ReadOnlySequence packet) + /// , if the flush was successful or not required., if the pipe reader is completed and no longer reading data. + protected override async ValueTask ReadPacketAsync(ReadOnlySequence packet) { this.DecryptAndWrite(packet); - await this._pipe.Writer.FlushAsync().ConfigureAwait(false); + return await this.TryFlushWriterAsync(this._pipe.Writer).ConfigureAwait(false); } private void DecryptAndWrite(ReadOnlySequence packet) diff --git a/src/Network/Xor/PipelinedXor32Encryptor.cs b/src/Network/Xor/PipelinedXor32Encryptor.cs index d14b6084e..6cef83a2e 100644 --- a/src/Network/Xor/PipelinedXor32Encryptor.cs +++ b/src/Network/Xor/PipelinedXor32Encryptor.cs @@ -58,11 +58,11 @@ protected override ValueTask OnCompleteAsync(Exception? exception) /// Encrypts the packet and writes it into the target. /// /// The mu online packet. - /// The async task. - protected override async ValueTask ReadPacketAsync(ReadOnlySequence packet) + /// , if the flush was successful or not required., if the pipe reader is completed and no longer reading data. + protected override async ValueTask ReadPacketAsync(ReadOnlySequence packet) { this.EncryptAndWrite(packet); - await this._target.FlushAsync().ConfigureAwait(false); + return await this.TryFlushWriterAsync(this._target).ConfigureAwait(false); } private void EncryptAndWrite(ReadOnlySequence packet)