From 70e93763d3fb9f93452caacf3699e40372df5599 Mon Sep 17 00:00:00 2001 From: Gerard Smit Date: Fri, 15 Dec 2023 20:06:18 +0100 Subject: [PATCH] Improved disposing and fixed tests --- .../client/RentedOutgoingMemory.cs | 30 +++++++------- .../client/impl/SessionBase.cs | 2 +- .../client/impl/SocketFrameHandler.cs | 12 +++++- .../TestBasicPublishCopyBodyAsync.cs | 12 +++--- ...ncurrentAccessWithSharedConnectionAsync.cs | 41 ++++++++----------- .../Test/Unit/TestRentedOutgoingMemory.cs | 7 +++- 6 files changed, 55 insertions(+), 49 deletions(-) diff --git a/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs b/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs index 396514302e..6469806ddb 100644 --- a/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs +++ b/projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs @@ -2,16 +2,18 @@ using System; using System.Buffers; +using System.Diagnostics; using System.IO.Pipelines; using System.Threading.Tasks; +using RabbitMQ.Client.Impl; namespace RabbitMQ.Client { - internal class RentedOutgoingMemory : IDisposable + internal sealed class RentedOutgoingMemory : IDisposable { + private readonly TaskCompletionSource? _sendCompletionSource; private bool _disposedValue; private byte[]? _rentedArray; - private TaskCompletionSource? _sendCompletionSource; private ReadOnlySequence _data; public RentedOutgoingMemory(ReadOnlyMemory data, byte[]? rentedArray = null, bool waitSend = false) @@ -50,30 +52,30 @@ internal ReadOnlySequence Data /// /// Mark the data as sent. /// - public void DidSend() + /// true if the object can be disposed, false if the is waiting for the data to be sent. + public bool DidSend() { if (_sendCompletionSource is null) { - Dispose(); - } - else - { - _sendCompletionSource.SetResult(true); + return true; } + + _sendCompletionSource.SetResult(true); + return false; } /// /// Wait for the data to be sent. /// - /// A that completes when the data is sent. - public ValueTask WaitForDataSendAsync() + /// true if the data was sent and the object can be disposed. + public ValueTask WaitForDataSendAsync() { - return _sendCompletionSource is null ? default : WaitForFinishCore(); + return _sendCompletionSource is null ? new ValueTask(false) : WaitForFinishCore(); - async ValueTask WaitForFinishCore() + async ValueTask WaitForFinishCore() { await _sendCompletionSource.Task.ConfigureAwait(false); - Dispose(); + return true; } } @@ -92,6 +94,7 @@ private void Dispose(bool disposing) return; } + Debug.Assert(_sendCompletionSource is null or { Task.IsCompleted: true }, "The send task should be completed before disposing."); _disposedValue = true; if (disposing) @@ -109,7 +112,6 @@ private void Dispose(bool disposing) public void Dispose() { Dispose(disposing: true); - GC.SuppressFinalize(this); } } } diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs index 610334256f..fe078ff338 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs @@ -180,7 +180,7 @@ public ValueTask TransmitAsync(in TMethod cmd, in THeader head ThrowAlreadyClosedException(); } - copyBody ??= body.Length > Connection.CopyBodyToMemoryThreshold; + copyBody ??= body.Length <= Connection.CopyBodyToMemoryThreshold; return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize, copyBody.Value)); } diff --git a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs index a2a9f42e36..b319533390 100644 --- a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs +++ b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs @@ -308,8 +308,13 @@ public async ValueTask WriteAsync(RentedOutgoingMemory frames) await _channelWriter.WriteAsync(frames) .ConfigureAwait(false); - await frames.WaitForDataSendAsync() + bool didSend = await frames.WaitForDataSendAsync() .ConfigureAwait(false); + + if (didSend) + { + frames.Dispose(); + } } } @@ -346,7 +351,10 @@ await _pipeWriter.FlushAsync() } finally { - frames.DidSend(); + if (frames.DidSend()) + { + frames.Dispose(); + } } } diff --git a/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs b/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs index d2ba144007..65e8e39000 100644 --- a/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs +++ b/projects/Test/AsyncIntegration/TestBasicPublishCopyBodyAsync.cs @@ -36,9 +36,9 @@ public async Task TestNonCopyingBody(ushort size) Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q)); - // It is expected that the rented bytes is smaller than the size of the body - // since we're not copying the body. Only the frame headers are rented. - Assert.True(rentedBytes < size); + // It is expected that the rented bytes is larger than the size of the body + // since the body is copied with the frame headers. + Assert.True(rentedBytes >= size); } [Theory] @@ -59,8 +59,8 @@ public async Task TestCopyingBody(ushort size) Assert.Equal((uint)1, await _channel.QueuePurgeAsync(q)); - // It is expected that the rented bytes is larger than the size of the body - // since the body is copied with the frame headers. - Assert.True(rentedBytes >= size); + // It is expected that the rented bytes is smaller than the size of the body + // since we're not copying the body. Only the frame headers are rented. + Assert.True(rentedBytes < size); } } diff --git a/projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs b/projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs index 9626bf4fa8..4e431ecbdc 100644 --- a/projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs +++ b/projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs @@ -55,37 +55,28 @@ public override async Task InitializeAsync() _conn.ConnectionShutdown += HandleConnectionShutdown; } - [Fact] - public Task TestConcurrentChannelOpenAndPublishingWithBlankMessagesAsync() + [Theory] + [InlineData(false)] + [InlineData(true)] + public Task TestConcurrentChannelOpenAndPublishingWithBlankMessagesAsync(bool copyBody) { - return TestConcurrentChannelOpenAndPublishingWithBodyAsync(Array.Empty(), 30); + return TestConcurrentChannelOpenAndPublishingWithBodyAsync(Array.Empty(), copyBody, 30); } - [Fact] - public Task TestConcurrentChannelOpenAndPublishingSize64Async() - { - return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(64); - } - - [Fact] - public Task TestConcurrentChannelOpenAndPublishingSize256Async() - { - return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(256); - } - - [Fact] - public Task TestConcurrentChannelOpenAndPublishingSize1024Async() - { - return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(1024); - } - - private Task TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(ushort length, int iterations = 30) + [Theory] + [InlineData(64, false)] + [InlineData(64, true)] + [InlineData(256, false)] + [InlineData(256, true)] + [InlineData(1024, false)] + [InlineData(1024, true)] + public Task TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(ushort length, bool copyBody, int iterations = 30) { byte[] body = GetRandomBody(length); - return TestConcurrentChannelOpenAndPublishingWithBodyAsync(body, iterations); + return TestConcurrentChannelOpenAndPublishingWithBodyAsync(body, copyBody, iterations); } - private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, int iterations) + private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, bool copyBody, int iterations) { return TestConcurrentChannelOperationsAsync(async (conn) => { @@ -128,7 +119,7 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in QueueDeclareOk q = await ch.QueueDeclareAsync(queue: string.Empty, passive: false, durable: false, exclusive: true, autoDelete: true, arguments: null); for (ushort j = 0; j < _messageCount; j++) { - await ch.BasicPublishAsync("", q.QueueName, body, mandatory: true); + await ch.BasicPublishAsync("", q.QueueName, body, mandatory: true, copyBody: copyBody); } Assert.True(await tcs.Task); diff --git a/projects/Test/Unit/TestRentedOutgoingMemory.cs b/projects/Test/Unit/TestRentedOutgoingMemory.cs index 3ee2b46ac4..b45adf217b 100644 --- a/projects/Test/Unit/TestRentedOutgoingMemory.cs +++ b/projects/Test/Unit/TestRentedOutgoingMemory.cs @@ -17,9 +17,12 @@ public async Task TestNonBlocking() var waitTask = rentedMemory.WaitForDataSendAsync().AsTask(); var timeoutTask = Task.Delay(100); var completedTask = await Task.WhenAny(timeoutTask, waitTask); + bool didSend = rentedMemory.DidSend(); // Assert Assert.Equal(waitTask, completedTask); + Assert.False(waitTask.Result); + Assert.True(didSend); } [Fact] @@ -49,11 +52,13 @@ public async Task TestBlockingCompleted() var waitTask = rentedMemory.WaitForDataSendAsync().AsTask(); var timeoutTask = Task.Delay(100); - rentedMemory.DidSend(); + bool didSend = rentedMemory.DidSend(); var completedTask = await Task.WhenAny(timeoutTask, waitTask); // Assert Assert.Equal(waitTask, completedTask); + Assert.True(waitTask.Result); + Assert.False(didSend); } }