Skip to content

Commit

Permalink
Merge pull request #475 from MUnique/dev/network-improvements
Browse files Browse the repository at this point in the history
Improvements for the network code
  • Loading branch information
sven-n authored Aug 27, 2024
2 parents b4d7948 + f9573de commit 923e537
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 21 deletions.
5 changes: 3 additions & 2 deletions src/Network/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ protected override async ValueTask OnCompleteAsync(Exception? exception)
/// Reads the mu online packet by raising <see cref="PacketReceived" />.
/// </summary>
/// <param name="packet">The mu online packet.</param>
/// <returns>The async task.</returns>
protected override async ValueTask ReadPacketAsync(ReadOnlySequence<byte> packet)
/// <returns><see langword="true" />, if the flush was successful or not required.<see langword="false" />, if the pipe reader is completed and no longer reading data.</returns>
protected override async ValueTask<bool> ReadPacketAsync(ReadOnlySequence<byte> packet)
{
IncomingBytesCounter.Add(packet.Length);

Expand All @@ -205,6 +205,7 @@ protected override async ValueTask ReadPacketAsync(ReadOnlySequence<byte> packet
try
{
await this.PacketReceived.SafeInvokeAsync(packet).ConfigureAwait(false);
return true;
}
finally
{
Expand Down
2 changes: 1 addition & 1 deletion src/Network/ExtendedPipeWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace MUnique.OpenMU.Network;
using System.Threading;

/// <summary>
/// A wrapper for an existing <see cref="PipeWriter"/>, which flushes the advanced bytes automatically.
/// A wrapper for an existing <see cref="PipeWriter"/>, which has metrics about the written bytes.
/// </summary>
public class ExtendedPipeWriter : PipeWriter
{
Expand Down
38 changes: 34 additions & 4 deletions src/Network/PacketPipeReaderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ namespace MUnique.OpenMU.Network;
/// <summary>
/// Base class for all classes which read mu online data packets from a <see cref="PipeReader"/>.
/// </summary>
/// <remarks>
/// Things to consider here:
/// * Do not call FlushAsync if the reader can't start until FlushAsync finishes, as that may cause a deadlock.
/// * Ensure that only one context "owns" a PipeReader or PipeWriter or accesses them. These types are not thread-safe.
/// </remarks>
public abstract class PacketPipeReaderBase
{
private readonly byte[] _headerBuffer = new byte[3];
Expand All @@ -24,8 +29,8 @@ public abstract class PacketPipeReaderBase
/// Reads the mu online packet.
/// </summary>
/// <param name="packet">The mu online packet.</param>
/// <returns>The async task.</returns>
protected abstract ValueTask ReadPacketAsync(ReadOnlySequence<byte> packet);
/// <returns><see langword="true" />, if the flush was successful or not required.<see langword="false" />, if the pipe reader is completed and no longer reading data.</returns>
protected abstract ValueTask<bool> ReadPacketAsync(ReadOnlySequence<byte> packet);

/// <summary>
/// Called when the <see cref="Source"/> completed.
Expand Down Expand Up @@ -70,11 +75,32 @@ protected async Task ReadSourceAsync()
await this.OnCompleteAsync(null).ConfigureAwait(false);
}

/// <summary>
/// Tries to flush the writer.
/// </summary>
/// <param name="pipeWriter">The pipe writer.</param>
/// <returns>
/// <see langword="true" />, if the flush was successful or not required.<see langword="false" />, if the pipe reader is completed and no longer reading data.
/// </returns>
protected async ValueTask<bool> TryFlushWriterAsync(PipeWriter pipeWriter)
{
if (pipeWriter is { CanGetUnflushedBytes: true, UnflushedBytes: 0 })
{
// It was flushed already in the background.
return true;
}

// todo: what happens if it was flushed in the background in the meantime? race-condition?
var flushResult = await pipeWriter.FlushAsync().ConfigureAwait(false);
return !flushResult.IsCompleted;
}

private async Task<bool> ReadBufferAsync()
{
ReadResult result = await this.Source.ReadAsync().ConfigureAwait(false);
ReadOnlySequence<byte> buffer = result.Buffer;
int? length = null;
var readingCancelledOrCompleted = false;
do
{
if (buffer.Length > 2)
Expand All @@ -97,7 +123,11 @@ private async Task<bool> ReadBufferAsync()
if (length is > 0 && buffer.Length >= length)
{
var packet = buffer.Slice(0, length.Value);
await this.ReadPacketAsync(packet).ConfigureAwait(false);
if (!await this.ReadPacketAsync(packet).ConfigureAwait(false))
{
readingCancelledOrCompleted = true;
break;
}

buffer = buffer.Slice(buffer.GetPosition(length.Value), buffer.End);
length = null;
Expand All @@ -121,6 +151,6 @@ private async Task<bool> ReadBufferAsync()
this.Source.AdvanceTo(buffer.Start);
}

return result.IsCompleted;
return result.IsCompleted || readingCancelledOrCompleted;
}
}
8 changes: 4 additions & 4 deletions src/Network/SimpleModulus/PipelinedSimpleModulusDecryptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ protected override ValueTask OnCompleteAsync(Exception? exception)
/// Decrypts the packet and writes it into our pipe.
/// </summary>
/// <param name="packet">The mu online packet.</param>
/// <returns>The async task.</returns>
protected override async ValueTask ReadPacketAsync(ReadOnlySequence<byte> packet)
/// <returns><see langword="true" />, if the flush was successful or not required.<see langword="false" />, if the pipe reader is completed and no longer reading data.</returns>
protected override async ValueTask<bool> ReadPacketAsync(ReadOnlySequence<byte> packet)
{
// The next line is getting a span from the writer which is at least as big as the packet.
// As I found out, it's initially about 2 kb in size and gets smaller within further
Expand All @@ -92,7 +92,7 @@ protected override async ValueTask ReadPacketAsync(ReadOnlySequence<byte> packet
{
// we just have to write-through
this.CopyDataIntoWriter(this.Pipe.Writer, packet);
await this.Pipe.Writer.FlushAsync().ConfigureAwait(false);
return await this.TryFlushWriterAsync(this.Pipe.Writer).ConfigureAwait(false);
}
else
{
Expand All @@ -105,7 +105,7 @@ protected override async ValueTask ReadPacketAsync(ReadOnlySequence<byte> packet
}

this.DecryptAndWrite(packet);
await this.Pipe.Writer.FlushAsync().ConfigureAwait(false);
return await this.TryFlushWriterAsync(this.Pipe.Writer).ConfigureAwait(false);
}
}

Expand Down
7 changes: 3 additions & 4 deletions src/Network/SimpleModulus/PipelinedSimpleModulusEncryptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,19 @@ protected override ValueTask OnCompleteAsync(Exception? exception)
}

/// <inheritdoc />
protected override async ValueTask ReadPacketAsync(ReadOnlySequence<byte> packet)
protected override async ValueTask<bool> ReadPacketAsync(ReadOnlySequence<byte> packet)
{
packet.Slice(0, this.HeaderBuffer.Length).CopyTo(this.HeaderBuffer);

if (this.HeaderBuffer[0] < 0xC3)
{
// we just have to write-through
this.CopyDataIntoWriter(this._target, packet);
await this._target.FlushAsync().ConfigureAwait(false);
return;
return await this.TryFlushWriterAsync(this._target).ConfigureAwait(false);
}

this.EncryptAndWrite(packet);
await this._target.FlushAsync().ConfigureAwait(false);
return await this.TryFlushWriterAsync(this._target).ConfigureAwait(false);
}

/// <summary>
Expand Down
6 changes: 3 additions & 3 deletions src/Network/Xor/PipelinedXor32Decryptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ protected override ValueTask OnCompleteAsync(Exception? exception)
/// Decrypts the packet and writes it into our pipe.
/// </summary>
/// <param name="packet">The mu online packet.</param>
/// <returns>The async task.</returns>
protected override async ValueTask ReadPacketAsync(ReadOnlySequence<byte> packet)
/// <returns><see langword="true" />, if the flush was successful or not required.<see langword="false" />, if the pipe reader is completed and no longer reading data.</returns>
protected override async ValueTask<bool> ReadPacketAsync(ReadOnlySequence<byte> packet)
{
this.DecryptAndWrite(packet);
await this._pipe.Writer.FlushAsync().ConfigureAwait(false);
return await this.TryFlushWriterAsync(this._pipe.Writer).ConfigureAwait(false);
}

private void DecryptAndWrite(ReadOnlySequence<byte> packet)
Expand Down
6 changes: 3 additions & 3 deletions src/Network/Xor/PipelinedXor32Encryptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ protected override ValueTask OnCompleteAsync(Exception? exception)
/// Encrypts the packet and writes it into the target.
/// </summary>
/// <param name="packet">The mu online packet.</param>
/// <returns>The async task.</returns>
protected override async ValueTask ReadPacketAsync(ReadOnlySequence<byte> packet)
/// <returns><see langword="true" />, if the flush was successful or not required.<see langword="false" />, if the pipe reader is completed and no longer reading data.</returns>
protected override async ValueTask<bool> ReadPacketAsync(ReadOnlySequence<byte> packet)
{
this.EncryptAndWrite(packet);
await this._target.FlushAsync().ConfigureAwait(false);
return await this.TryFlushWriterAsync(this._target).ConfigureAwait(false);
}

private void EncryptAndWrite(ReadOnlySequence<byte> packet)
Expand Down

0 comments on commit 923e537

Please sign in to comment.