Skip to content

Commit

Permalink
Improved disposing and fixed tests
Browse files Browse the repository at this point in the history
  • Loading branch information
GerardSmit authored and lukebakken committed Dec 22, 2023
1 parent 75b8424 commit 70e9376
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 49 deletions.
30 changes: 16 additions & 14 deletions projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@

using System;
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Threading.Tasks;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client
{
internal class RentedOutgoingMemory : IDisposable
internal sealed class RentedOutgoingMemory : IDisposable
{
private readonly TaskCompletionSource<bool>? _sendCompletionSource;
private bool _disposedValue;
private byte[]? _rentedArray;
private TaskCompletionSource<bool>? _sendCompletionSource;
private ReadOnlySequence<byte> _data;

public RentedOutgoingMemory(ReadOnlyMemory<byte> data, byte[]? rentedArray = null, bool waitSend = false)
Expand Down Expand Up @@ -50,30 +52,30 @@ internal ReadOnlySequence<byte> Data
/// <summary>
/// Mark the data as sent.
/// </summary>
public void DidSend()
/// <returns><c>true</c> if the object can be disposed, <c>false</c> if the <see cref="SocketFrameHandler"/> is waiting for the data to be sent.</returns>
public bool DidSend()
{
if (_sendCompletionSource is null)
{
Dispose();
}
else
{
_sendCompletionSource.SetResult(true);
return true;
}

_sendCompletionSource.SetResult(true);
return false;
}

/// <summary>
/// Wait for the data to be sent.
/// </summary>
/// <returns>A <see cref="ValueTask"/> that completes when the data is sent.</returns>
public ValueTask WaitForDataSendAsync()
/// <returns><c>true</c> if the data was sent and the object can be disposed.</returns>
public ValueTask<bool> WaitForDataSendAsync()
{
return _sendCompletionSource is null ? default : WaitForFinishCore();
return _sendCompletionSource is null ? new ValueTask<bool>(false) : WaitForFinishCore();

async ValueTask WaitForFinishCore()
async ValueTask<bool> WaitForFinishCore()
{
await _sendCompletionSource.Task.ConfigureAwait(false);
Dispose();
return true;
}
}

Expand All @@ -92,6 +94,7 @@ private void Dispose(bool disposing)
return;
}

Debug.Assert(_sendCompletionSource is null or { Task.IsCompleted: true }, "The send task should be completed before disposing.");
_disposedValue = true;

if (disposing)
Expand All @@ -109,7 +112,6 @@ private void Dispose(bool disposing)
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
}
}
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/impl/SessionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public ValueTask TransmitAsync<TMethod, THeader>(in TMethod cmd, in THeader head
ThrowAlreadyClosedException();
}

copyBody ??= body.Length > Connection.CopyBodyToMemoryThreshold;
copyBody ??= body.Length <= Connection.CopyBodyToMemoryThreshold;

return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize, copyBody.Value));
}
Expand Down
12 changes: 10 additions & 2 deletions projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,13 @@ public async ValueTask WriteAsync(RentedOutgoingMemory frames)
await _channelWriter.WriteAsync(frames)
.ConfigureAwait(false);

await frames.WaitForDataSendAsync()
bool didSend = await frames.WaitForDataSendAsync()
.ConfigureAwait(false);

if (didSend)
{
frames.Dispose();
}
}
}

Expand Down Expand Up @@ -346,7 +351,10 @@ await _pipeWriter.FlushAsync()
}
finally
{
frames.DidSend();
if (frames.DidSend())
{
frames.Dispose();
}
}
}

Expand Down
12 changes: 6 additions & 6 deletions projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public async Task TestNonCopyingBody(ushort size)

Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q));

// It is expected that the rented bytes is smaller than the size of the body
// since we're not copying the body. Only the frame headers are rented.
Assert.True(rentedBytes < size);
// It is expected that the rented bytes is larger than the size of the body
// since the body is copied with the frame headers.
Assert.True(rentedBytes >= size);
}

[Theory]
Expand All @@ -59,8 +59,8 @@ public async Task TestCopyingBody(ushort size)

Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q));

// It is expected that the rented bytes is larger than the size of the body
// since the body is copied with the frame headers.
Assert.True(rentedBytes >= size);
// It is expected that the rented bytes is smaller than the size of the body
// since we're not copying the body. Only the frame headers are rented.
Assert.True(rentedBytes < size);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,37 +55,28 @@ public override async Task InitializeAsync()
_conn.ConnectionShutdown += HandleConnectionShutdown;
}

[Fact]
public Task TestConcurrentChannelOpenAndPublishingWithBlankMessagesAsync()
[Theory]
[InlineData(false)]
[InlineData(true)]
public Task TestConcurrentChannelOpenAndPublishingWithBlankMessagesAsync(bool copyBody)
{
return TestConcurrentChannelOpenAndPublishingWithBodyAsync(Array.Empty<byte>(), 30);
return TestConcurrentChannelOpenAndPublishingWithBodyAsync(Array.Empty<byte>(), copyBody, 30);
}

[Fact]
public Task TestConcurrentChannelOpenAndPublishingSize64Async()
{
return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(64);
}

[Fact]
public Task TestConcurrentChannelOpenAndPublishingSize256Async()
{
return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(256);
}

[Fact]
public Task TestConcurrentChannelOpenAndPublishingSize1024Async()
{
return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(1024);
}

private Task TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(ushort length, int iterations = 30)
[Theory]
[InlineData(64, false)]
[InlineData(64, true)]
[InlineData(256, false)]
[InlineData(256, true)]
[InlineData(1024, false)]
[InlineData(1024, true)]
public Task TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(ushort length, bool copyBody, int iterations = 30)
{
byte[] body = GetRandomBody(length);
return TestConcurrentChannelOpenAndPublishingWithBodyAsync(body, iterations);
return TestConcurrentChannelOpenAndPublishingWithBodyAsync(body, copyBody, iterations);
}

private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, int iterations)
private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, bool copyBody, int iterations)
{
return TestConcurrentChannelOperationsAsync(async (conn) =>
{
Expand Down Expand Up @@ -128,7 +119,7 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in
QueueDeclareOk q = await ch.QueueDeclareAsync(queue: string.Empty, passive: false, durable: false, exclusive: true, autoDelete: true, arguments: null);
for (ushort j = 0; j < _messageCount; j++)
{
await ch.BasicPublishAsync("", q.QueueName, body, mandatory: true);
await ch.BasicPublishAsync("", q.QueueName, body, mandatory: true, copyBody: copyBody);
}

Assert.True(await tcs.Task);
Expand Down
7 changes: 6 additions & 1 deletion projects/Test/Unit/TestRentedOutgoingMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ public async Task TestNonBlocking()
var waitTask = rentedMemory.WaitForDataSendAsync().AsTask();
var timeoutTask = Task.Delay(100);
var completedTask = await Task.WhenAny(timeoutTask, waitTask);
bool didSend = rentedMemory.DidSend();

// Assert
Assert.Equal(waitTask, completedTask);
Assert.False(waitTask.Result);
Assert.True(didSend);
}

[Fact]
Expand Down Expand Up @@ -49,11 +52,13 @@ public async Task TestBlockingCompleted()
var waitTask = rentedMemory.WaitForDataSendAsync().AsTask();
var timeoutTask = Task.Delay(100);

rentedMemory.DidSend();
bool didSend = rentedMemory.DidSend();

var completedTask = await Task.WhenAny(timeoutTask, waitTask);

// Assert
Assert.Equal(waitTask, completedTask);
Assert.True(waitTask.Result);
Assert.False(didSend);
}
}

0 comments on commit 70e9376

Please sign in to comment.