From e93adfab94f51130218b551673c66e192bba1dc4 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Sun, 22 Sep 2024 10:40:23 -0700 Subject: [PATCH] Track publisher confirmations automatically Fixes #1682 * Remove `ConfirmSelectAsync` from `IChannel` * Add parameters to enable confirmations on `IConnection.CreateChannelAsync` * Remove / comment out all use of `WaitForConfirms...` methods. * Dispose -> DisposeAsync * Implement confirmation tracking and await-ing in `BasicPublishAsync` * Ensure exceptions make into inner exception for `HardProtocolException` * Remove commented-out code related to `WaitForConfirms...` methods. * Unblock so that `CloseAsync` can succeed. * Introduce channel creation options * Allow cancellation of final await for publisher confirmation in `BasicPublishAsync` * Fix `dotnet format` verification error * Make `ConfirmSelectAsync` `private` and assume that semaphore is held. * Track sequence number for basic.return * Implement `basic.return` support. * Fix the code that adds OTel and publish sequence number headers. * Add publish sequence number as long as `_publisherConfirmationsEnabled` is true * Fix the `PublisherConfirms` test program * Add license headers * Enable publisher confirms * Spike an exception based approach (misses removing the bool value return type) * Extend the use of `_confirmSemaphore` to the duration of when exceptions could be caught. * Restore how @danielmarbach serialized the publish sequence number. * Fix bug in how headers are added to `BasicProperties` that don't already have them. * Use `ValueTask` as the `BasicPublishAsync` return value. --------- Co-authored-by: Daniel Marbach Co-authored-by: Luke Bakken * Add `PublishException` class. * Test that non-routable messages result in `PublishException` with `IsReturn = true` * Code documentation Simplify code. --- projects/RabbitMQ.Client/Constants.cs | 8 +- .../RabbitMQ.Client/CreateChannelOptions.cs | 35 + .../Events/ShutdownEventArgs.cs | 3 +- .../OperationInterruptedException.cs | 37 +- .../Exceptions/ProtocolViolationException.cs | 8 +- .../Exceptions/PublishException.cs | 66 ++ .../Exceptions/RabbitMQClientException.cs | 7 +- .../Exceptions/UnexpectedMethodException.cs | 1 + projects/RabbitMQ.Client/Framing/Channel.cs | 1 + projects/RabbitMQ.Client/IChannel.cs | 36 +- projects/RabbitMQ.Client/IConnection.cs | 13 +- .../Impl/AutorecoveringChannel.cs | 33 +- .../Impl/AutorecoveringConnection.cs | 28 +- projects/RabbitMQ.Client/Impl/ChannelBase.cs | 635 ++++++++---------- projects/RabbitMQ.Client/Impl/Connection.cs | 12 +- .../Impl/RecoveryAwareChannel.cs | 3 +- .../RabbitMQ.Client/PublicAPI.Shipped.txt | 17 - .../RabbitMQ.Client/PublicAPI.Unshipped.txt | 26 + .../Applications/CreateChannel/Program.cs | 35 +- projects/Test/Applications/GH-1647/Program.cs | 54 +- .../Test/Applications/MassPublish/Program.cs | 20 +- .../PublisherConfirms/PublisherConfirms.cs | 157 ++++- .../PublisherConfirms.csproj | 2 +- projects/Test/Common/IntegrationFixture.cs | 12 +- .../Test/Common/TestConnectionRecoveryBase.cs | 36 +- .../TestBasicAckAndBasicNack.cs | 1 - .../TestConnectionRecovery.cs | 6 +- .../TestExchangeRecovery.cs | 3 - .../Test/Integration/TestAsyncConsumer.cs | 25 +- .../TestAsyncEventingBasicConsumer.cs | 2 +- projects/Test/Integration/TestBasicPublish.cs | 19 +- .../Test/Integration/TestBasicPublishAsync.cs | 2 - .../Test/Integration/TestChannelAllocation.cs | 10 +- .../TestConcurrentAccessWithSharedChannel.cs | 71 +- ...estConcurrentAccessWithSharedConnection.cs | 3 +- .../Test/Integration/TestConfirmSelect.cs | 6 - .../Integration/TestConfirmSelectAsync.cs | 14 +- .../Test/Integration/TestConnectionFactory.cs | 2 +- .../TestConnectionRecoveryWithoutSetup.cs | 5 +- .../Integration/TestConnectionShutdown.cs | 6 +- .../TestConnectionTopologyRecovery.cs | 49 +- projects/Test/Integration/TestExtensions.cs | 21 - .../Test/Integration/TestFloodPublishing.cs | 27 +- projects/Test/Integration/TestMessageCount.cs | 2 - .../Test/Integration/TestPublisherConfirms.cs | 88 +-- projects/Test/Integration/TestToxiproxy.cs | 24 +- projects/Test/OAuth2/TestOAuth2.cs | 15 +- .../SequentialIntegrationFixture.cs | 6 +- .../TestActivitySource.cs | 24 - .../TestConnectionBlocked.cs | 12 +- .../TestConnectionRecovery.cs | 12 +- .../TestOpenTelemetry.cs | 10 - 52 files changed, 892 insertions(+), 858 deletions(-) create mode 100644 projects/RabbitMQ.Client/CreateChannelOptions.cs create mode 100644 projects/RabbitMQ.Client/Exceptions/PublishException.cs diff --git a/projects/RabbitMQ.Client/Constants.cs b/projects/RabbitMQ.Client/Constants.cs index 637d53a785..c6748c8723 100644 --- a/projects/RabbitMQ.Client/Constants.cs +++ b/projects/RabbitMQ.Client/Constants.cs @@ -87,9 +87,15 @@ public static class Constants /// /// The default consumer dispatch concurrency. See /// to set this value for every channel created on a connection, - /// and + /// and /// for setting this value for a particular channel. /// public const ushort DefaultConsumerDispatchConcurrency = 1; + + /// + /// The message header used to track publish sequence numbers, to allow correlation when + /// basic.return is sent via the broker. + /// + public const string PublishSequenceNumberHeader = "x-dotnet-pub-seq-no"; } } diff --git a/projects/RabbitMQ.Client/CreateChannelOptions.cs b/projects/RabbitMQ.Client/CreateChannelOptions.cs new file mode 100644 index 0000000000..17a9d61057 --- /dev/null +++ b/projects/RabbitMQ.Client/CreateChannelOptions.cs @@ -0,0 +1,35 @@ +namespace RabbitMQ.Client +{ + /// + /// Channel creation options. + /// + public sealed class CreateChannelOptions + { + /// + /// Enable or disable publisher confirmations on this channel. Defaults to false + /// + public bool PublisherConfirmationsEnabled { get; set; } = false; + + /// + /// Should this library track publisher confirmations for you? Defaults to false + /// + public bool PublisherConfirmationTrackingEnabled { get; set; } = false; + + /// + /// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one + /// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading. + /// can handle concurrency much more efficiently due to the non-blocking nature of the consumer. + /// + /// Defaults to null, which will use the value from + /// + /// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them. + /// In addition to that consumers need to be thread/concurrency safe. + /// + public ushort? ConsumerDispatchConcurrency { get; set; } = null; + + /// + /// The default channel options. + /// + public static CreateChannelOptions Default { get; } = new CreateChannelOptions(); + } +} diff --git a/projects/RabbitMQ.Client/Events/ShutdownEventArgs.cs b/projects/RabbitMQ.Client/Events/ShutdownEventArgs.cs index cf8ed5b3cd..f5b5bd282f 100644 --- a/projects/RabbitMQ.Client/Events/ShutdownEventArgs.cs +++ b/projects/RabbitMQ.Client/Events/ShutdownEventArgs.cs @@ -72,7 +72,8 @@ public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string r /// /// Construct a with the given parameters. /// - public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, Exception exception, CancellationToken cancellationToken = default) + public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, + Exception exception, CancellationToken cancellationToken = default) : this(initiator, replyCode, replyText, 0, 0, cancellationToken: cancellationToken) { _exception = exception ?? throw new ArgumentNullException(nameof(exception)); diff --git a/projects/RabbitMQ.Client/Exceptions/OperationInterruptedException.cs b/projects/RabbitMQ.Client/Exceptions/OperationInterruptedException.cs index e1cb60e593..78e2238d64 100644 --- a/projects/RabbitMQ.Client/Exceptions/OperationInterruptedException.cs +++ b/projects/RabbitMQ.Client/Exceptions/OperationInterruptedException.cs @@ -45,35 +45,30 @@ namespace RabbitMQ.Client.Exceptions public class OperationInterruptedException : RabbitMQClientException { - ///Construct an OperationInterruptedException with - ///the passed-in explanation, if any. - public OperationInterruptedException(ShutdownEventArgs? reason) - : base(reason is null ? "The AMQP operation was interrupted" : - $"The AMQP operation was interrupted: {reason}") + /// + ///Construct an OperationInterruptedException + /// + public OperationInterruptedException() : base("The AMQP operation was interrupted") { - ShutdownReason = reason; - } - ///Construct an OperationInterruptedException with - ///the passed-in explanation and prefix, if any. - public OperationInterruptedException(ShutdownEventArgs? reason, string prefix) - : base(reason is null ? $"{prefix}: The AMQP operation was interrupted" : - $"{prefix}: The AMQP operation was interrupted: {reason}") - { - ShutdownReason = reason; } - - protected OperationInterruptedException() + /// + ///Construct an OperationInterruptedException with + ///the passed-in explanation, if any. + /// + public OperationInterruptedException(ShutdownEventArgs reason) + : base($"The AMQP operation was interrupted: {reason}", reason.Exception) { + ShutdownReason = reason; } - protected OperationInterruptedException(string message) : base(message) - { - } - protected OperationInterruptedException(string message, Exception inner) - : base(message, inner) + ///Construct an OperationInterruptedException with + ///the passed-in explanation and prefix, if any. + public OperationInterruptedException(ShutdownEventArgs reason, string prefix) + : base($"{prefix}: The AMQP operation was interrupted: {reason}", reason.Exception) { + ShutdownReason = reason; } ///Retrieves the explanation for the shutdown. May diff --git a/projects/RabbitMQ.Client/Exceptions/ProtocolViolationException.cs b/projects/RabbitMQ.Client/Exceptions/ProtocolViolationException.cs index 249ab78b65..2e973f5f44 100644 --- a/projects/RabbitMQ.Client/Exceptions/ProtocolViolationException.cs +++ b/projects/RabbitMQ.Client/Exceptions/ProtocolViolationException.cs @@ -36,13 +36,15 @@ namespace RabbitMQ.Client.Exceptions [Serializable] public class ProtocolViolationException : RabbitMQClientException { - public ProtocolViolationException(string message) : base(message) + public ProtocolViolationException() : base() { } - public ProtocolViolationException(string message, Exception inner) : base(message, inner) + + public ProtocolViolationException(string message) : base(message) { } - public ProtocolViolationException() + + public ProtocolViolationException(string message, Exception inner) : base(message, inner) { } } diff --git a/projects/RabbitMQ.Client/Exceptions/PublishException.cs b/projects/RabbitMQ.Client/Exceptions/PublishException.cs new file mode 100644 index 0000000000..ac404f660c --- /dev/null +++ b/projects/RabbitMQ.Client/Exceptions/PublishException.cs @@ -0,0 +1,66 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +using System; + +namespace RabbitMQ.Client.Exceptions +{ + /// + /// Class for exceptions related to publisher confirmations + /// or the mandatory flag. + /// + public class PublishException : RabbitMQClientException + { + private bool _isReturn = false; + private ulong _publishSequenceNumber = ulong.MinValue; + + public PublishException(ulong publishSequenceNumber, bool isReturn) : base() + { + if (publishSequenceNumber == ulong.MinValue) + { + throw new ArgumentException($"{nameof(publishSequenceNumber)} must not be 0"); + } + + _isReturn = isReturn; + _publishSequenceNumber = publishSequenceNumber; + } + + /// + /// true if this exception is due to a basic.return + /// + public bool IsReturn => _isReturn; + + /// + /// Retrieve the publish sequence number. + /// + public ulong PublishSequenceNumber => _publishSequenceNumber; + } +} diff --git a/projects/RabbitMQ.Client/Exceptions/RabbitMQClientException.cs b/projects/RabbitMQ.Client/Exceptions/RabbitMQClientException.cs index c8a5371154..97e5403990 100644 --- a/projects/RabbitMQ.Client/Exceptions/RabbitMQClientException.cs +++ b/projects/RabbitMQ.Client/Exceptions/RabbitMQClientException.cs @@ -37,24 +37,21 @@ namespace RabbitMQ.Client.Exceptions public abstract class RabbitMQClientException : Exception { /// Initializes a new instance of the class. - protected RabbitMQClientException() + protected RabbitMQClientException() : base() { - } /// Initializes a new instance of the class with a specified error message. /// The message that describes the error. protected RabbitMQClientException(string message) : base(message) { - } /// Initializes a new instance of the class with a specified error message and a reference to the inner exception that is the cause of this exception. /// The error message that explains the reason for the exception. /// The exception that is the cause of the current exception, or a null reference (Nothing in Visual Basic) if no inner exception is specified. - protected RabbitMQClientException(string message, Exception innerException) : base(message, innerException) + protected RabbitMQClientException(string message, Exception? innerException) : base(message, innerException) { - } } } diff --git a/projects/RabbitMQ.Client/Exceptions/UnexpectedMethodException.cs b/projects/RabbitMQ.Client/Exceptions/UnexpectedMethodException.cs index 5e925053db..7da8804e95 100644 --- a/projects/RabbitMQ.Client/Exceptions/UnexpectedMethodException.cs +++ b/projects/RabbitMQ.Client/Exceptions/UnexpectedMethodException.cs @@ -35,6 +35,7 @@ namespace RabbitMQ.Client.Exceptions { /// + /// TODO WHY IS THIS UNREFERENCED? /// Thrown when the channel receives an RPC reply that it wasn't expecting. /// [Serializable] diff --git a/projects/RabbitMQ.Client/Framing/Channel.cs b/projects/RabbitMQ.Client/Framing/Channel.cs index ddb1394bdd..a6c809d367 100644 --- a/projects/RabbitMQ.Client/Framing/Channel.cs +++ b/projects/RabbitMQ.Client/Framing/Channel.cs @@ -35,6 +35,7 @@ namespace RabbitMQ.Client.Framing { + // TODO merge into ChannelBase internal class Channel : ChannelBase { public Channel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null) diff --git a/projects/RabbitMQ.Client/IChannel.cs b/projects/RabbitMQ.Client/IChannel.cs index 63fc72756d..852b7e8ee4 100644 --- a/projects/RabbitMQ.Client/IChannel.cs +++ b/projects/RabbitMQ.Client/IChannel.cs @@ -204,6 +204,7 @@ Task BasicConsumeAsync(string queue, bool autoAck, string consumerTag, b /// CancellationToken for this operation. /// /// Routing key must be shorter than 255 bytes. + /// Throws if a nack or basic.return is returned for the message. /// ValueTask BasicPublishAsync(string exchange, string routingKey, bool mandatory, TProperties basicProperties, ReadOnlyMemory body, @@ -221,6 +222,7 @@ ValueTask BasicPublishAsync(string exchange, string routingKey, /// CancellationToken for this operation. /// /// Routing key must be shorter than 255 bytes. + /// Throws if a nack or basic.return is returned for the message. /// ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, bool mandatory, TProperties basicProperties, ReadOnlyMemory body, @@ -265,14 +267,6 @@ Task CloseAsync(ushort replyCode, string replyText, bool abort, Task CloseAsync(ShutdownEventArgs reason, bool abort, CancellationToken cancellationToken = default); - /// - /// Asynchronously enable publisher confirmations. - /// - /// Set to false if tracking via and yourself. - /// CancellationToken for this operation. - Task ConfirmSelectAsync(bool trackConfirmations = true, - CancellationToken cancellationToken = default); - /// Asynchronously declare an exchange. /// The name of the exchange. /// The type of the exchange. @@ -451,32 +445,6 @@ Task QueueUnbindAsync(string queue, string exchange, string routingKey, /// The cancellation token. Task TxSelectAsync(CancellationToken cancellationToken = default); - /// - /// Asynchronously wait until all published messages on this channel have been confirmed. - /// - /// True if no nacks were received within the timeout, otherwise false. - /// The cancellation token. - /// - /// Waits until all messages published on this channel since the last call have - /// been either ack'd or nack'd by the server. Returns whether - /// all the messages were ack'd (and none were nack'd). - /// Throws an exception when called on a channel - /// that does not have publisher confirms enabled. - /// - Task WaitForConfirmsAsync(CancellationToken cancellationToken = default); - - /// - /// Wait until all published messages on this channel have been confirmed. - /// - /// The cancellation token. - /// - /// Waits until all messages published on this channel since the last call have - /// been ack'd by the server. If a nack is received or the timeout - /// elapses, throws an IOException exception immediately and closes - /// the channel. - /// - Task WaitForConfirmsOrDieAsync(CancellationToken cancellationToken = default); - /// /// Amount of time protocol operations (e.g. queue.declare) are allowed to take before /// timing out. diff --git a/projects/RabbitMQ.Client/IConnection.cs b/projects/RabbitMQ.Client/IConnection.cs index c562307a7c..8217013e3f 100644 --- a/projects/RabbitMQ.Client/IConnection.cs +++ b/projects/RabbitMQ.Client/IConnection.cs @@ -240,18 +240,11 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo /// /// Asynchronously create and return a fresh channel, session, and channel. /// - /// - /// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one - /// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading. - /// can handle concurrency much more efficiently due to the non-blocking nature of the consumer. - /// - /// Defaults to null, which will use the value from - /// - /// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them. - /// In addition to that consumers need to be thread/concurrency safe. + /// + /// The channel creation options. /// /// Cancellation token - Task CreateChannelAsync(ushort? consumerDispatchConcurrency = null, + Task CreateChannelAsync(CreateChannelOptions? options = default, CancellationToken cancellationToken = default); } } diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index 6e88ee9070..2a2efec637 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -49,8 +49,8 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable private ushort _prefetchCountConsumer; private ushort _prefetchCountGlobal; - private bool _usesPublisherConfirms; - private bool _tracksPublisherConfirmations; + private bool _publisherConfirmationsEnabled = false; + private bool _publisherConfirmationTrackingEnabled = false; private bool _usesTransactions; private ushort _consumerDispatchConcurrency; @@ -72,11 +72,13 @@ public TimeSpan ContinuationTimeout } public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel, - ushort consumerDispatchConcurrency) + ushort consumerDispatchConcurrency, bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled) { _connection = conn; _innerChannel = innerChannel; _consumerDispatchConcurrency = consumerDispatchConcurrency; + _publisherConfirmationsEnabled = publisherConfirmationsEnabled; + _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled; } public event AsyncEventHandler BasicAcksAsync @@ -161,8 +163,9 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection con _connection = conn; - RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_consumerDispatchConcurrency, - cancellationToken: cancellationToken) + RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync( + _publisherConfirmationsEnabled, _publisherConfirmationTrackingEnabled, + _consumerDispatchConcurrency, cancellationToken) .ConfigureAwait(false); newChannel.TakeOver(_innerChannel); @@ -178,12 +181,6 @@ await newChannel.BasicQosAsync(0, _prefetchCountGlobal, true, cancellationToken) .ConfigureAwait(false); } - if (_usesPublisherConfirms) - { - await newChannel.ConfirmSelectAsync(_tracksPublisherConfirmations, cancellationToken) - .ConfigureAwait(false); - } - if (_usesTransactions) { await newChannel.TxSelectAsync(cancellationToken) @@ -347,14 +344,6 @@ public Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global, return _innerChannel.BasicQosAsync(prefetchSize, prefetchCount, global, cancellationToken); } - public async Task ConfirmSelectAsync(bool trackConfirmations = true, CancellationToken cancellationToken = default) - { - await InnerChannel.ConfirmSelectAsync(trackConfirmations, cancellationToken) - .ConfigureAwait(false); - _usesPublisherConfirms = true; - _tracksPublisherConfirmations = trackConfirmations; - } - public async Task ExchangeBindAsync(string destination, string source, string routingKey, IDictionary? arguments, bool noWait, CancellationToken cancellationToken) @@ -487,12 +476,6 @@ public Task TxSelectAsync(CancellationToken cancellationToken) return InnerChannel.TxSelectAsync(cancellationToken); } - public Task WaitForConfirmsAsync(CancellationToken token = default) - => InnerChannel.WaitForConfirmsAsync(token); - - public Task WaitForConfirmsOrDieAsync(CancellationToken token = default) - => InnerChannel.WaitForConfirmsOrDieAsync(token); - [MethodImpl(MethodImplOptions.AggressiveInlining)] private void ThrowIfDisposed() { diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs index dc99e33fc4..5cee094c37 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs @@ -184,12 +184,17 @@ public event AsyncEventHandler RecoveringConsumerAs public IProtocol Protocol => Endpoint.Protocol; - public async ValueTask CreateNonRecoveringChannelAsync(ushort consumerDispatchConcurrency, + public async ValueTask CreateNonRecoveringChannelAsync( + bool publisherConfirmationsEnabled = false, + bool publisherConfirmationTrackingEnabled = false, + ushort? consumerDispatchConcurrency = null, CancellationToken cancellationToken = default) { ISession session = InnerConnection.CreateSession(); var result = new RecoveryAwareChannel(_config, session, consumerDispatchConcurrency); - return (RecoveryAwareChannel)await result.OpenAsync(cancellationToken).ConfigureAwait(false); + return (RecoveryAwareChannel)await result.OpenAsync( + publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled, cancellationToken) + .ConfigureAwait(false); } public override string ToString() @@ -251,17 +256,24 @@ await CloseInnerConnectionAsync() } } - public async Task CreateChannelAsync(ushort? consumerDispatchConcurrency = null, + public async Task CreateChannelAsync(CreateChannelOptions? options = default, CancellationToken cancellationToken = default) { EnsureIsOpen(); - ushort cdc = consumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency); - RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cdc, cancellationToken) + + options ??= CreateChannelOptions.Default; + + ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency); + + RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync( + options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cdc, cancellationToken) .ConfigureAwait(false); - AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc); - await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken) + + var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc, + options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled); + await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken) .ConfigureAwait(false); - return channel; + return autorecoveringChannel; } public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); diff --git a/projects/RabbitMQ.Client/Impl/ChannelBase.cs b/projects/RabbitMQ.Client/Impl/ChannelBase.cs index 517cf6b8b4..6ba9ed60ce 100644 --- a/projects/RabbitMQ.Client/Impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/Impl/ChannelBase.cs @@ -30,6 +30,8 @@ //--------------------------------------------------------------------------- using System; +using System.Buffers.Binary; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; @@ -42,6 +44,7 @@ using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing; +using RabbitMQ.Client.Util; namespace RabbitMQ.Client.Impl { @@ -57,21 +60,18 @@ internal abstract class ChannelBase : IChannel, IRecoverable private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue(); private readonly AsyncManualResetEvent _flowControlBlock = new AsyncManualResetEvent(true); - private ulong _nextPublishSeqNo; - private SemaphoreSlim? _confirmSemaphore; - private bool _trackConfirmations; - private LinkedList? _pendingDeliveryTags; - private List>? _confirmsTaskCompletionSources; - - private bool _onlyAcksReceived = true; + private bool _publisherConfirmationsEnabled = false; + private bool _publisherConfirmationTrackingEnabled = false; + private ulong _nextPublishSeqNo = 0; + private readonly SemaphoreSlim _confirmSemaphore = new(1, 1); + private readonly ConcurrentDictionary> _confirmsTaskCompletionSources = new(); private ShutdownEventArgs? _closeReason; public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason); internal readonly IConsumerDispatcher ConsumerDispatcher; - protected ChannelBase(ConnectionConfig config, ISession session, - ushort? perChannelConsumerDispatchConcurrency = null) + protected ChannelBase(ConnectionConfig config, ISession session, ushort? perChannelConsumerDispatchConcurrency = null) { ContinuationTimeout = config.ContinuationTimeout; ConsumerDispatcher = new AsyncConsumerDispatcher(this, @@ -371,8 +371,13 @@ protected bool Enqueue(IRpcContinuation k) } } - internal async Task OpenAsync(CancellationToken cancellationToken) + internal async Task OpenAsync(bool publisherConfirmationsEnabled = false, + bool publisherConfirmationTrackingEnabled = false, + CancellationToken cancellationToken = default) { + _publisherConfirmationsEnabled = publisherConfirmationsEnabled; + _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled; + bool enqueued = false; var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken); @@ -388,7 +393,12 @@ await ModelSendAsync(in method, k.CancellationToken) bool result = await k; Debug.Assert(result); - return this; + + if (_publisherConfirmationsEnabled) + { + await ConfirmSelectAsync(publisherConfirmationTrackingEnabled, cancellationToken) + .ConfigureAwait(false); + } } finally { @@ -398,6 +408,8 @@ await ModelSendAsync(in method, k.CancellationToken) } _rpcSemaphore.Release(); } + + return this; } internal async Task FinishCloseAsync(CancellationToken cancellationToken) @@ -412,9 +424,6 @@ await Session.CloseAsync(reason) m_connectionStartCell?.TrySetResult(null); } - [MemberNotNullWhen(true, nameof(_confirmSemaphore))] - private bool ConfirmsAreEnabled => _confirmSemaphore != null; - private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken) { /* @@ -489,16 +498,16 @@ private async Task OnChannelShutdownAsync(ShutdownEventArgs reason) await _channelShutdownAsyncWrapper.InvokeAsync(this, reason) .ConfigureAwait(false); - if (ConfirmsAreEnabled) + if (_publisherConfirmationsEnabled) { await _confirmSemaphore.WaitAsync(reason.CancellationToken) .ConfigureAwait(false); try { - if (_confirmsTaskCompletionSources?.Count > 0) + if (!_confirmsTaskCompletionSources.IsEmpty) { var exception = new AlreadyClosedException(reason); - foreach (TaskCompletionSource confirmsTaskCompletionSource in _confirmsTaskCompletionSources) + foreach (TaskCompletionSource confirmsTaskCompletionSource in _confirmsTaskCompletionSources.Values) { confirmsTaskCompletionSource.TrySetException(exception); } @@ -594,7 +603,8 @@ public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heart return ModelSendAsync(in method, cancellationToken).AsTask(); } - protected async Task HandleBasicAck(IncomingCommand cmd, CancellationToken cancellationToken) + protected async Task HandleBasicAck(IncomingCommand cmd, + CancellationToken cancellationToken = default) { var ack = new BasicAck(cmd.MethodSpan); if (!_basicAcksAsyncWrapper.IsEmpty) @@ -604,12 +614,14 @@ await _basicAcksAsyncWrapper.InvokeAsync(this, args) .ConfigureAwait(false); } - await HandleAckNack(ack._deliveryTag, ack._multiple, false, cancellationToken) + await HandleAck(ack._deliveryTag, ack._multiple, cancellationToken) .ConfigureAwait(false); + return true; } - protected async Task HandleBasicNack(IncomingCommand cmd, CancellationToken cancellationToken) + protected async Task HandleBasicNack(IncomingCommand cmd, + CancellationToken cancellationToken = default) { var nack = new BasicNack(cmd.MethodSpan); if (!_basicNacksAsyncWrapper.IsEmpty) @@ -620,8 +632,40 @@ await _basicNacksAsyncWrapper.InvokeAsync(this, args) .ConfigureAwait(false); } - await HandleAckNack(nack._deliveryTag, nack._multiple, true, cancellationToken) + await HandleNack(nack._deliveryTag, nack._multiple, false, cancellationToken) .ConfigureAwait(false); + + return true; + } + + protected async Task HandleBasicReturn(IncomingCommand cmd, CancellationToken cancellationToken) + { + var basicReturn = new BasicReturn(cmd.MethodSpan); + + var e = new BasicReturnEventArgs(basicReturn._replyCode, basicReturn._replyText, + basicReturn._exchange, basicReturn._routingKey, + new ReadOnlyBasicProperties(cmd.HeaderSpan), cmd.Body.Memory, cancellationToken); + + if (!_basicReturnAsyncWrapper.IsEmpty) + { + await _basicReturnAsyncWrapper.InvokeAsync(this, e) + .ConfigureAwait(false); + } + + if (_publisherConfirmationsEnabled) + { + ulong publishSequenceNumber = 0; + IReadOnlyBasicProperties props = e.BasicProperties; + object? maybeSeqNum = props.Headers?[Constants.PublishSequenceNumberHeader]; + if (maybeSeqNum != null) + { + publishSequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum); + } + + await HandleNack(publishSequenceNumber, multiple: false, isReturn: true, cancellationToken) + .ConfigureAwait(false); + } + return true; } @@ -658,20 +702,6 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag) return deliveryTag; } - protected async Task HandleBasicReturn(IncomingCommand cmd, CancellationToken cancellationToken) - { - if (!_basicReturnAsyncWrapper.IsEmpty) - { - var basicReturn = new BasicReturn(cmd.MethodSpan); - var e = new BasicReturnEventArgs(basicReturn._replyCode, basicReturn._replyText, - basicReturn._exchange, basicReturn._routingKey, - new ReadOnlyBasicProperties(cmd.HeaderSpan), cmd.Body.Memory, cancellationToken); - await _basicReturnAsyncWrapper.InvokeAsync(this, e) - .ConfigureAwait(false); - } - return true; - } - protected async Task HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) { var channelClose = new ChannelClose(cmd.MethodSpan); @@ -825,7 +855,7 @@ await Session.Connection.HandleConnectionUnblockedAsync(cancellationToken) public async ValueTask GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) { - if (ConfirmsAreEnabled) + if (_publisherConfirmationsEnabled) { await _confirmSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); try @@ -973,83 +1003,79 @@ public async ValueTask BasicPublishAsync(string exchange, string ro CancellationToken cancellationToken = default) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { - if (ConfirmsAreEnabled) + TaskCompletionSource? publisherConfirmationTcs = null; + ulong publishSequenceNumber = 0; + try { - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); - try + if (_publisherConfirmationsEnabled) { - if (_trackConfirmations) + await _confirmSemaphore.WaitAsync(cancellationToken) + .ConfigureAwait(false); + + publishSequenceNumber = _nextPublishSeqNo; + + if (_publisherConfirmationTrackingEnabled) { - if (_pendingDeliveryTags is null) - { - throw new InvalidOperationException(InternalConstants.BugFound); - } - _pendingDeliveryTags.AddLast(_nextPublishSeqNo); + publisherConfirmationTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + _confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs; } _nextPublishSeqNo++; } - finally - { - _confirmSemaphore.Release(); - } - } - try - { + await EnforceFlowControlAsync(cancellationToken) + .ConfigureAwait(false); + var cmd = new BasicPublish(exchange, routingKey, mandatory, default); + using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners ? RabbitMQActivitySource.Send(routingKey, exchange, body.Length) : default; - if (sendActivity != null) + BasicProperties? props = PopulateBasicPropertiesHeaders(basicProperties, sendActivity, publishSequenceNumber); + if (props is null) { - BasicProperties? props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); - if (props is null) - { - await EnforceFlowControlAsync(cancellationToken) - .ConfigureAwait(false); - await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) - .ConfigureAwait(false); - } - else - { - await EnforceFlowControlAsync(cancellationToken) - .ConfigureAwait(false); - await ModelSendAsync(in cmd, in props, body, cancellationToken) - .ConfigureAwait(false); - } + await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) + .ConfigureAwait(false); } else { - await EnforceFlowControlAsync(cancellationToken) - .ConfigureAwait(false); - await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) + await ModelSendAsync(in cmd, in props, body, cancellationToken) .ConfigureAwait(false); } } - catch + catch (Exception ex) { - if (ConfirmsAreEnabled) + if (_publisherConfirmationsEnabled) { - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); - try - { - _nextPublishSeqNo--; - if (_trackConfirmations && _pendingDeliveryTags is not null) - { - _pendingDeliveryTags.RemoveLast(); - } - } - finally + _nextPublishSeqNo--; + if (_publisherConfirmationTrackingEnabled) { - _confirmSemaphore.Release(); + _confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _); } } - throw; + if (publisherConfirmationTcs is not null) + { + publisherConfirmationTcs.SetException(ex); + } + else + { + throw; + } + } + finally + { + if (_publisherConfirmationsEnabled) + { + _confirmSemaphore.Release(); + } + } + + if (publisherConfirmationTcs is not null) + { + await publisherConfirmationTcs.Task.WaitAsync(cancellationToken) + .ConfigureAwait(false); } } @@ -1058,83 +1084,79 @@ public async ValueTask BasicPublishAsync(CachedString exchange, Cac CancellationToken cancellationToken = default) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { - if (ConfirmsAreEnabled) + TaskCompletionSource? publisherConfirmationTcs = null; + ulong publishSequenceNumber = 0; + try { - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); - try + if (_publisherConfirmationsEnabled) { - if (_trackConfirmations) + await _confirmSemaphore.WaitAsync(cancellationToken) + .ConfigureAwait(false); + + publishSequenceNumber = _nextPublishSeqNo; + + if (_publisherConfirmationTrackingEnabled) { - if (_pendingDeliveryTags is null) - { - throw new InvalidOperationException(InternalConstants.BugFound); - } - _pendingDeliveryTags.AddLast(_nextPublishSeqNo); + publisherConfirmationTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + _confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs; } _nextPublishSeqNo++; } - finally - { - _confirmSemaphore.Release(); - } - } - try - { + await EnforceFlowControlAsync(cancellationToken) + .ConfigureAwait(false); + var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); + using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners ? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length) : default; - if (sendActivity != null) + BasicProperties? props = PopulateBasicPropertiesHeaders(basicProperties, sendActivity, publishSequenceNumber); + if (props is null) { - BasicProperties? props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); - if (props is null) - { - await EnforceFlowControlAsync(cancellationToken) - .ConfigureAwait(false); - await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) - .ConfigureAwait(false); - } - else - { - await EnforceFlowControlAsync(cancellationToken) - .ConfigureAwait(false); - await ModelSendAsync(in cmd, in props, body, cancellationToken) - .ConfigureAwait(false); - } + await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) + .ConfigureAwait(false); } else { - await EnforceFlowControlAsync(cancellationToken) - .ConfigureAwait(false); - await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken) + await ModelSendAsync(in cmd, in props, body, cancellationToken) .ConfigureAwait(false); } } - catch + catch (Exception ex) { - if (ConfirmsAreEnabled) + if (_publisherConfirmationsEnabled) { - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); - try + _nextPublishSeqNo--; + if (_publisherConfirmationTrackingEnabled) { - _nextPublishSeqNo--; - if (_trackConfirmations && _pendingDeliveryTags is not null) - { - _pendingDeliveryTags.RemoveLast(); - } - } - finally - { - _confirmSemaphore.Release(); + _confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _); } } - throw; + if (publisherConfirmationTcs is not null) + { + publisherConfirmationTcs.SetException(ex); + } + else + { + throw; + } + } + finally + { + if (_publisherConfirmationsEnabled) + { + _confirmSemaphore.Release(); + } + } + + if (publisherConfirmationTcs is not null) + { + await publisherConfirmationTcs.Task.WaitAsync(cancellationToken) + .ConfigureAwait(false); } } @@ -1210,51 +1232,6 @@ await ModelSendAsync(in method, k.CancellationToken) } } - public async Task ConfirmSelectAsync(bool trackConfirmations = true, CancellationToken cancellationToken = default) - { - _trackConfirmations = trackConfirmations; - bool enqueued = false; - var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken); - - await _rpcSemaphore.WaitAsync(k.CancellationToken) - .ConfigureAwait(false); - try - { - if (_nextPublishSeqNo == 0UL) - { - if (_trackConfirmations) - { - _pendingDeliveryTags = new LinkedList(); - _confirmsTaskCompletionSources = new List>(); - } - _nextPublishSeqNo = 1; - } - - enqueued = Enqueue(k); - - var method = new ConfirmSelect(false); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - - bool result = await k; - Debug.Assert(result); - - // Note: - // Non-null means confirms are enabled - _confirmSemaphore = new SemaphoreSlim(1, 1); - - return; - } - finally - { - if (false == enqueued) - { - k.Dispose(); - } - _rpcSemaphore.Release(); - } - } - public async Task ExchangeBindAsync(string destination, string source, string routingKey, IDictionary? arguments, bool noWait, CancellationToken cancellationToken) @@ -1739,244 +1716,168 @@ await ModelSendAsync(in method, k.CancellationToken) } } - public async Task WaitForConfirmsAsync(CancellationToken cancellationToken = default) + // NOTE: _rpcSemaphore is held + private async Task ConfirmSelectAsync(bool publisherConfirmationTrackingEnablefd = false, + CancellationToken cancellationToken = default) { - if (false == ConfirmsAreEnabled) - { - throw new InvalidOperationException("Confirms not selected"); - } + _publisherConfirmationsEnabled = true; + _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnablefd; - if (false == _trackConfirmations) - { - throw new InvalidOperationException("Confirmation tracking is not enabled"); - } - - if (_pendingDeliveryTags is null) - { - throw new InvalidOperationException(InternalConstants.BugFound); - } + bool enqueued = false; + var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken); - TaskCompletionSource tcs; - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); try { - if (_pendingDeliveryTags.Count == 0) + if (_nextPublishSeqNo == 0UL) { - if (_onlyAcksReceived == false) + if (_publisherConfirmationTrackingEnabled) { - _onlyAcksReceived = true; - return false; + _confirmsTaskCompletionSources.Clear(); } - - return true; + _nextPublishSeqNo = 1; } - tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _confirmsTaskCompletionSources!.Add(tcs); - } - finally - { - _confirmSemaphore.Release(); - } - - bool rv; + enqueued = Enqueue(k); - if (false == cancellationToken.CanBeCanceled) - { - rv = await tcs.Task.ConfigureAwait(false); - } - else - { - rv = await WaitForConfirmsWithTokenAsync(tcs, cancellationToken) + var method = new ConfirmSelect(false); + await ModelSendAsync(in method, k.CancellationToken) .ConfigureAwait(false); - } - - return rv; - } - - public async Task WaitForConfirmsOrDieAsync(CancellationToken token = default) - { - if (false == ConfirmsAreEnabled) - { - throw new InvalidOperationException("Confirms not selected"); - } - if (false == _trackConfirmations) - { - throw new InvalidOperationException("Confirmation tracking is not enabled"); - } + bool result = await k; + Debug.Assert(result); - if (_pendingDeliveryTags is null) - { - throw new InvalidOperationException(InternalConstants.BugFound); + return; } - - try + finally { - bool onlyAcksReceived = await WaitForConfirmsAsync(token) - .ConfigureAwait(false); - - if (onlyAcksReceived) + if (false == enqueued) { - return; + k.Dispose(); } - - var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.ReplySuccess, "Nacks Received", new IOException("nack received")); - - await CloseAsync(ea, false, token) - .ConfigureAwait(false); - } - catch (OperationCanceledException ex) - { - const string msg = "timed out waiting for acks"; - var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.ReplySuccess, msg, ex); - - await CloseAsync(ea, false, token) - .ConfigureAwait(false); - - throw; - } - } - - private async Task WaitForConfirmsWithTokenAsync(TaskCompletionSource tcs, - CancellationToken cancellationToken) - { - if (false == ConfirmsAreEnabled) - { - throw new InvalidOperationException("Confirms not selected"); - } - - if (false == _trackConfirmations) - { - throw new InvalidOperationException("Confirmation tracking is not enabled"); - } - - if (_pendingDeliveryTags is null) - { - throw new InvalidOperationException(InternalConstants.BugFound); - } - - CancellationTokenRegistration tokenRegistration = -#if NET6_0_OR_GREATER - cancellationToken.UnsafeRegister( - state => ((TaskCompletionSource)state!).TrySetCanceled(), tcs); -#else - cancellationToken.Register( - state => ((TaskCompletionSource)state!).TrySetCanceled(), - state: tcs, useSynchronizationContext: false); -#endif - try - { - return await tcs.Task.ConfigureAwait(false); - } - finally - { -#if NET6_0_OR_GREATER - await tokenRegistration.DisposeAsync() - .ConfigureAwait(false); -#else - tokenRegistration.Dispose(); -#endif } } - // NOTE: this method is internal for its use in this test: - // TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout_ReturnFalse - internal async Task HandleAckNack(ulong deliveryTag, bool multiple, bool isNack, CancellationToken cancellationToken = default) + private Task HandleAck(ulong deliveryTag, bool multiple, CancellationToken cancellationToken = default) { - // Only do this if confirms are enabled *and* the library is tracking confirmations - if (ConfirmsAreEnabled && _trackConfirmations) + if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty) { - if (_pendingDeliveryTags is null) - { - throw new InvalidOperationException(InternalConstants.BugFound); - } - // let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); - try + if (multiple) { - // No need to do anything if there are no delivery tags in the list - if (_pendingDeliveryTags.Count > 0) + foreach (KeyValuePair> pair in _confirmsTaskCompletionSources) { - if (multiple) + if (pair.Key <= deliveryTag) { - while (_pendingDeliveryTags.First!.Value < deliveryTag) - { - _pendingDeliveryTags.RemoveFirst(); - } - - if (_pendingDeliveryTags.First.Value == deliveryTag) - { - _pendingDeliveryTags.RemoveFirst(); - } - } - else - { - _pendingDeliveryTags.Remove(deliveryTag); + pair.Value.SetResult(true); + _confirmsTaskCompletionSources.Remove(pair.Key, out _); } } + } + else + { + if (_confirmsTaskCompletionSources.TryRemove(deliveryTag, out TaskCompletionSource? tcs)) + { + tcs.SetResult(true); + } + } + } - _onlyAcksReceived = _onlyAcksReceived && !isNack; + return Task.CompletedTask; + } - if (_pendingDeliveryTags.Count == 0 && _confirmsTaskCompletionSources!.Count > 0) + private Task HandleNack(ulong deliveryTag, bool multiple, bool isReturn, + CancellationToken cancellationToken = default) + { + if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty) + { + if (multiple) + { + foreach (KeyValuePair> pair in _confirmsTaskCompletionSources) { - // Done, mark tasks - foreach (TaskCompletionSource confirmsTaskCompletionSource in _confirmsTaskCompletionSources) + if (pair.Key <= deliveryTag) { - confirmsTaskCompletionSource.TrySetResult(_onlyAcksReceived); + pair.Value.SetException(new PublishException(pair.Key, isReturn)); + _confirmsTaskCompletionSources.Remove(pair.Key, out _); } - - _confirmsTaskCompletionSources.Clear(); - _onlyAcksReceived = true; } } - finally + else { - _confirmSemaphore.Release(); + if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource? tcs)) + { + tcs.SetException(new PublishException(deliveryTag, isReturn)); + } } } + + return Task.CompletedTask; } - private static BasicProperties? PopulateActivityAndPropagateTraceId(TProperties basicProperties, - Activity sendActivity) where TProperties : IReadOnlyBasicProperties, IAmqpHeader + private BasicProperties? PopulateBasicPropertiesHeaders(TProperties basicProperties, + Activity? sendActivity, ulong publishSequenceNumber) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader { - // This activity is marked as recorded, so let's propagate the trace and span ids. - if (sendActivity.IsAllDataRequested) + /* + * Note: there is nothing to do in this method if *both* of these + * conditions are true: + * + * sendActivity is null - there is no activity to add as a header + * publisher confirmations are NOT enabled + */ + if (sendActivity is null && !_publisherConfirmationsEnabled) { - if (!string.IsNullOrEmpty(basicProperties.CorrelationId)) - { - sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, basicProperties.CorrelationId); - } + return null; } + bool newHeaders = false; IDictionary? headers = basicProperties.Headers; if (headers is null) { - return AddHeaders(basicProperties, sendActivity); + headers = new Dictionary(); + newHeaders = true; } + MaybeAddActivityToHeaders(headers, basicProperties.CorrelationId, sendActivity); + MaybeAddPublishSequenceNumberToHeaders(headers); - // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. - RabbitMQActivitySource.ContextInjector(sendActivity, headers); - return null; + switch (basicProperties) + { + case BasicProperties writableProperties: + if (newHeaders) + { + writableProperties.Headers = headers; + } + return null; + case EmptyBasicProperty: + return new BasicProperties { Headers = headers }; + default: + return new BasicProperties(basicProperties) { Headers = headers }; + } - static BasicProperties? AddHeaders(TProperties basicProperties, Activity sendActivity) + void MaybeAddActivityToHeaders(IDictionary headers, + string? correlationId, Activity? sendActivity) { - var headers = new Dictionary(); + if (sendActivity is not null) + { + // This activity is marked as recorded, so let's propagate the trace and span ids. + if (sendActivity.IsAllDataRequested) + { + if (!string.IsNullOrEmpty(correlationId)) + { + sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, correlationId); + } + } - // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. - RabbitMQActivitySource.ContextInjector(sendActivity, headers); + // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. + RabbitMQActivitySource.ContextInjector(sendActivity, headers); + } + } - switch (basicProperties) + void MaybeAddPublishSequenceNumberToHeaders(IDictionary headers) + { + if (_publisherConfirmationsEnabled) { - case BasicProperties writableProperties: - writableProperties.Headers = headers; - return null; - case EmptyBasicProperty: - return new BasicProperties { Headers = headers }; - default: - return new BasicProperties(basicProperties) { Headers = headers }; + byte[] publishSequenceNumberBytes = new byte[8]; + NetworkOrderSerializer.WriteUInt64(ref publishSequenceNumberBytes.GetStart(), publishSequenceNumber); + headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes; } } } diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index 4a733ac8c4..373f6deea2 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -264,13 +264,19 @@ await CloseAsync(ea, true, } } - public Task CreateChannelAsync(ushort? consumerDispatchConcurrency = null, + public async Task CreateChannelAsync(CreateChannelOptions? options = default, CancellationToken cancellationToken = default) { EnsureIsOpen(); + + options ??= CreateChannelOptions.Default; ISession session = CreateSession(); - var channel = new Channel(_config, session, consumerDispatchConcurrency); - return channel.OpenAsync(cancellationToken); + + // TODO channel CreateChannelAsync() to combine ctor and OpenAsync + var channel = new Channel(_config, session, options.ConsumerDispatchConcurrency); + IChannel ch = await channel.OpenAsync(options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cancellationToken) + .ConfigureAwait(false); + return ch; } internal ISession CreateSession() diff --git a/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs b/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs index ab1cefecbf..6aefc7bd3b 100644 --- a/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs +++ b/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs @@ -37,7 +37,7 @@ namespace RabbitMQ.Client.Impl { internal sealed class RecoveryAwareChannel : Channel { - public RecoveryAwareChannel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency) + public RecoveryAwareChannel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null) : base(config, session, consumerDispatchConcurrency) { ActiveDeliveryTagOffset = 0; @@ -50,7 +50,6 @@ public RecoveryAwareChannel(ConnectionConfig config, ISession session, ushort co internal void TakeOver(RecoveryAwareChannel other) { base.TakeOver(other); - ActiveDeliveryTagOffset = other.ActiveDeliveryTagOffset + other.MaxSeenDeliveryTag; MaxSeenDeliveryTag = 0; } diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index 9b4e290f26..e60ddc2b1d 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -278,8 +278,6 @@ RabbitMQ.Client.Exceptions.MalformedFrameException.MalformedFrameException(strin RabbitMQ.Client.Exceptions.MalformedFrameException.MalformedFrameException(string message, bool canShutdownCleanly) -> void RabbitMQ.Client.Exceptions.OperationInterruptedException RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException() -> void -RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(string message) -> void -RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(string message, System.Exception inner) -> void RabbitMQ.Client.Exceptions.OperationInterruptedException.ShutdownReason.set -> void RabbitMQ.Client.Exceptions.PacketNotRecognizedException RabbitMQ.Client.Exceptions.PacketNotRecognizedException.PacketNotRecognizedException(int transportHigh, int transportLow, int serverMajor, int serverMinor) -> void @@ -299,11 +297,9 @@ RabbitMQ.Client.Exceptions.ProtocolVersionMismatchException.ProtocolVersionMisma RabbitMQ.Client.Exceptions.ProtocolVersionMismatchException.ServerMajor.get -> int RabbitMQ.Client.Exceptions.ProtocolVersionMismatchException.ServerMinor.get -> int RabbitMQ.Client.Exceptions.ProtocolViolationException -RabbitMQ.Client.Exceptions.ProtocolViolationException.ProtocolViolationException() -> void RabbitMQ.Client.Exceptions.ProtocolViolationException.ProtocolViolationException(string message) -> void RabbitMQ.Client.Exceptions.ProtocolViolationException.ProtocolViolationException(string message, System.Exception inner) -> void RabbitMQ.Client.Exceptions.RabbitMQClientException -RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException() -> void RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException(string message) -> void RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException(string message, System.Exception innerException) -> void RabbitMQ.Client.Exceptions.SyntaxErrorException @@ -756,8 +752,6 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void ~RabbitMQ.Client.IChannel.TxCommitAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IChannel.TxRollbackAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IChannel.TxSelectAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task -~RabbitMQ.Client.IChannel.WaitForConfirmsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task -~RabbitMQ.Client.IChannel.WaitForConfirmsOrDieAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IConnection.CloseAsync(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IConnection.UpdateSecretAsync(string newSecret, string reason, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task @@ -785,7 +779,6 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, string! consumerTag, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, string! consumerTag, System.Collections.Generic.IDictionary? arguments, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel! channel, ushort replyCode, string! replyText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary? arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IChannelExtensions.QueueDeclareAsync(this RabbitMQ.Client.IChannel! channel, string! queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, System.Collections.Generic.IDictionary? arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! @@ -819,16 +812,8 @@ RabbitMQ.Client.ICredentialsProvider.GetCredentialsAsync(System.Threading.Cancel RabbitMQ.Client.ICredentialsProvider.Name.get -> string! RabbitMQ.Client.PlainMechanism.HandleChallengeAsync(byte[]? challenge, RabbitMQ.Client.ConnectionConfig! config, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! readonly RabbitMQ.Client.ConnectionConfig.CredentialsProvider -> RabbitMQ.Client.ICredentialsProvider! -RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask -RabbitMQ.Client.IChannel.BasicPublishAsync(string! exchange, string! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask -RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort -RabbitMQ.Client.IConnection.CreateChannelAsync(ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! RabbitMQ.Client.IConnection.CallbackExceptionAsync -> RabbitMQ.Client.Events.AsyncEventHandler! RabbitMQ.Client.IConnection.ConnectionBlockedAsync -> RabbitMQ.Client.Events.AsyncEventHandler! RabbitMQ.Client.IConnection.ConnectionRecoveryErrorAsync -> RabbitMQ.Client.Events.AsyncEventHandler! @@ -888,8 +873,6 @@ RabbitMQ.Client.Events.ShutdownEventArgs.ShutdownEventArgs(RabbitMQ.Client.Shutd RabbitMQ.Client.Events.ShutdownEventArgs.ShutdownEventArgs(RabbitMQ.Client.ShutdownInitiator initiator, ushort replyCode, string! replyText, System.Exception! exception, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void RabbitMQ.Client.Events.ShutdownEventArgs.ShutdownEventArgs(RabbitMQ.Client.ShutdownInitiator initiator, ushort replyCode, string! replyText, ushort classId, ushort methodId, object? cause = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void RabbitMQ.Client.Exceptions.AlreadyClosedException.AlreadyClosedException(RabbitMQ.Client.Events.ShutdownEventArgs! reason) -> void -RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs? reason) -> void -RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs? reason, string! prefix) -> void RabbitMQ.Client.Exceptions.OperationInterruptedException.ShutdownReason.get -> RabbitMQ.Client.Events.ShutdownEventArgs? RabbitMQ.Client.IAsyncBasicConsumer.HandleChannelShutdownAsync(object! channel, RabbitMQ.Client.Events.ShutdownEventArgs! reason) -> System.Threading.Tasks.Task! RabbitMQ.Client.IChannel.ChannelShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler! diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index e69de29bb2..e9f6253692 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -0,0 +1,26 @@ +const RabbitMQ.Client.Constants.PublishSequenceNumberHeader = "x-dotnet-pub-seq-no" -> string! +RabbitMQ.Client.CreateChannelOptions +RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.get -> ushort? +RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.set -> void +RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions() -> void +RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.get -> bool +RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.set -> void +RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.get -> bool +RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.set -> void +RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs! reason) -> void +RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs! reason, string! prefix) -> void +RabbitMQ.Client.Exceptions.ProtocolViolationException.ProtocolViolationException() -> void +RabbitMQ.Client.Exceptions.PublishException +RabbitMQ.Client.Exceptions.PublishException.IsReturn.get -> bool +RabbitMQ.Client.Exceptions.PublishException.PublishException(ulong publishSequenceNumber, bool isReturn) -> void +RabbitMQ.Client.Exceptions.PublishException.PublishSequenceNumber.get -> ulong +RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException() -> void +RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +RabbitMQ.Client.IChannel.BasicPublishAsync(string! exchange, string! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +RabbitMQ.Client.IConnection.CreateChannelAsync(RabbitMQ.Client.CreateChannelOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.CreateChannelOptions.Default.get -> RabbitMQ.Client.CreateChannelOptions! +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask \ No newline at end of file diff --git a/projects/Test/Applications/CreateChannel/Program.cs b/projects/Test/Applications/CreateChannel/Program.cs index 378eb4f3c8..69d96e2570 100644 --- a/projects/Test/Applications/CreateChannel/Program.cs +++ b/projects/Test/Applications/CreateChannel/Program.cs @@ -1,4 +1,35 @@ -using System; +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +using System; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; @@ -36,7 +67,7 @@ public static async Task Main() for (int j = 0; j < channels.Length; j++) { - channels[j].Dispose(); + await channels[j].DisposeAsync(); } } diff --git a/projects/Test/Applications/GH-1647/Program.cs b/projects/Test/Applications/GH-1647/Program.cs index c110641de4..18ac9e57df 100644 --- a/projects/Test/Applications/GH-1647/Program.cs +++ b/projects/Test/Applications/GH-1647/Program.cs @@ -1,4 +1,35 @@ -#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task using System.Text; using RabbitMQ.Client; @@ -9,6 +40,12 @@ Password = "guest" }; +var channelOptions = new CreateChannelOptions +{ + PublisherConfirmationsEnabled = true, + PublisherConfirmationTrackingEnabled = true +}; + var props = new BasicProperties(); byte[] msg = Encoding.UTF8.GetBytes("test"); await using var connection = await connectionFactory.CreateConnectionAsync(); @@ -16,11 +53,18 @@ { try { - await using var channel = await connection.CreateChannelAsync(); // New channel for each message + await using var channel = await connection.CreateChannelAsync(channelOptions); // New channel for each message await Task.Delay(1000); - await channel.BasicPublishAsync(exchange: string.Empty, routingKey: string.Empty, - mandatory: false, basicProperties: props, body: msg); - Console.WriteLine($"Sent message {i}"); + try + { + await channel.BasicPublishAsync(exchange: string.Empty, routingKey: string.Empty, + mandatory: false, basicProperties: props, body: msg); + Console.WriteLine($"Sent message {i}"); + } + catch (Exception ex) + { + Console.Error.WriteLine($"[ERROR] message {i} not acked: {ex}"); + } } catch (Exception ex) { diff --git a/projects/Test/Applications/MassPublish/Program.cs b/projects/Test/Applications/MassPublish/Program.cs index 2d6ef5e63e..3e074e3ffc 100644 --- a/projects/Test/Applications/MassPublish/Program.cs +++ b/projects/Test/Applications/MassPublish/Program.cs @@ -137,20 +137,24 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer publishTasks.Add(Task.Run(async () => { - using IChannel publishChannel = await publishConnection.CreateChannelAsync(); + using IChannel publishChannel = await publishConnection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); publishChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync; - await publishChannel.ConfirmSelectAsync(); - for (int i = 0; i < ItemsPerBatch; i++) { - await publishChannel.BasicPublishAsync(exchange: ExchangeName, routingKey: RoutingKey, - basicProperties: s_properties, body: s_payload, mandatory: true); - Interlocked.Increment(ref s_messagesSent); + try + { + await publishChannel.BasicPublishAsync(exchange: ExchangeName, routingKey: RoutingKey, + basicProperties: s_properties, body: s_payload, mandatory: true); + Interlocked.Increment(ref s_messagesSent); + } + catch (Exception ex) + { + Console.Error.WriteLine("[ERROR] channel {0} saw nack, ex: {1}", + publishChannel.ChannelNumber, ex); + } } - await publishChannel.WaitForConfirmsOrDieAsync(); - if (s_debug) { Console.WriteLine("[DEBUG] channel {0} done publishing and waiting for confirms", publishChannel.ChannelNumber); diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs index 16b597459d..d2439f2561 100644 --- a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs +++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs @@ -1,4 +1,36 @@ -using System; +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Buffers.Binary; using System.Collections.Generic; using System.Diagnostics; using System.Text; @@ -9,6 +41,8 @@ const int MESSAGE_COUNT = 50_000; bool debug = false; +#pragma warning disable CS8321 // Local function is declared but never used + await PublishMessagesIndividuallyAsync(); await PublishMessagesInBatchAsync(); await HandlePublishConfirmsAsynchronously(); @@ -21,15 +55,14 @@ static Task CreateConnectionAsync() static async Task PublishMessagesIndividuallyAsync() { - Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages individually and handling confirms all at once"); + Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms per-message"); await using IConnection connection = await CreateConnectionAsync(); - await using IChannel channel = await connection.CreateChannelAsync(); + await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); // declare a server-named queue QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); string queueName = queueDeclareResult.QueueName; - await channel.ConfirmSelectAsync(); var sw = new Stopwatch(); sw.Start(); @@ -37,11 +70,16 @@ static async Task PublishMessagesIndividuallyAsync() for (int i = 0; i < MESSAGE_COUNT; i++) { byte[] body = Encoding.UTF8.GetBytes(i.ToString()); - await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body); + try + { + await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body); + } + catch (Exception ex) + { + Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: {ex}"); + } } - await channel.WaitForConfirmsOrDieAsync(); - sw.Stop(); Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages individually in {sw.ElapsedMilliseconds:N0} ms"); @@ -52,41 +90,58 @@ static async Task PublishMessagesInBatchAsync() Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms in batches"); await using IConnection connection = await CreateConnectionAsync(); - await using IChannel channel = await connection.CreateChannelAsync(); + await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); // declare a server-named queue QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); string queueName = queueDeclareResult.QueueName; - await channel.ConfirmSelectAsync(); - int batchSize = 100; + int batchSize = 1000; int outstandingMessageCount = 0; var sw = new Stopwatch(); sw.Start(); - var publishTasks = new List(); + var publishTasks = new List(); for (int i = 0; i < MESSAGE_COUNT; i++) { byte[] body = Encoding.UTF8.GetBytes(i.ToString()); - publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body).AsTask()); + publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body)); outstandingMessageCount++; if (outstandingMessageCount == batchSize) { - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); - await Task.WhenAll(publishTasks).WaitAsync(cts.Token); + foreach (ValueTask pt in publishTasks) + { + try + { + await pt; + } + catch (Exception ex) + { + Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'"); + } + } publishTasks.Clear(); - - await channel.WaitForConfirmsOrDieAsync(cts.Token); outstandingMessageCount = 0; } } - if (outstandingMessageCount > 0) + if (publishTasks.Count > 0) { - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); - await channel.WaitForConfirmsOrDieAsync(cts.Token); + foreach (ValueTask pt in publishTasks) + { + try + { + await pt; + } + catch (Exception ex) + { + Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'"); + } + } + publishTasks.Clear(); + outstandingMessageCount = 0; } sw.Stop(); @@ -98,20 +153,22 @@ async Task HandlePublishConfirmsAsynchronously() Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms asynchronously"); await using IConnection connection = await CreateConnectionAsync(); - await using IChannel channel = await connection.CreateChannelAsync(); + + var channelOptions = new CreateChannelOptions + { + PublisherConfirmationsEnabled = true, + PublisherConfirmationTrackingEnabled = false + }; + await using IChannel channel = await connection.CreateChannelAsync(channelOptions); // declare a server-named queue QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); string queueName = queueDeclareResult.QueueName; - // NOTE: setting trackConfirmations to false because this program - // is tracking them itself. - await channel.ConfirmSelectAsync(trackConfirmations: false); - - bool publishingCompleted = false; var allMessagesConfirmedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var outstandingConfirms = new LinkedList(); var semaphore = new SemaphoreSlim(1, 1); + int confirmedCount = 0; async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple) { if (debug) @@ -140,10 +197,13 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple) { break; } + + confirmedCount++; } while (true); } else { + confirmedCount++; outstandingConfirms.Remove(deliveryTag); } } @@ -152,12 +212,30 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple) semaphore.Release(); } - if (publishingCompleted && outstandingConfirms.Count == 0) + if (outstandingConfirms.Count == 0 || confirmedCount == MESSAGE_COUNT) { allMessagesConfirmedTcs.SetResult(true); } } + channel.BasicReturnAsync += (sender, ea) => + { + ulong sequenceNumber = 0; + + IReadOnlyBasicProperties props = ea.BasicProperties; + if (props.Headers is not null) + { + object? maybeSeqNum = props.Headers[Constants.PublishSequenceNumberHeader]; + if (maybeSeqNum is not null) + { + sequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum); + } + } + + Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number {sequenceNumber} has been basic.return-ed"); + return CleanOutstandingConfirms(sequenceNumber, false); + }; + channel.BasicAcksAsync += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple); channel.BasicNacksAsync += (sender, ea) => { @@ -168,7 +246,7 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple) var sw = new Stopwatch(); sw.Start(); - var publishTasks = new List(); + var publishTasks = new List>(); for (int i = 0; i < MESSAGE_COUNT; i++) { string msg = i.ToString(); @@ -187,12 +265,31 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple) { semaphore.Release(); } - publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body).AsTask()); + + string rk = queueName; + if (i % 1000 == 0) + { + // This will cause a basic.return, for fun + rk = Guid.NewGuid().ToString(); + } + (ulong, ValueTask) data = + (nextPublishSeqNo, channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true)); + publishTasks.Add(data); } using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await Task.WhenAll(publishTasks).WaitAsync(cts.Token); - publishingCompleted = true; + // await Task.WhenAll(publishTasks).WaitAsync(cts.Token); + foreach ((ulong SeqNo, ValueTask PublishTask) datum in publishTasks) + { + try + { + await datum.PublishTask; + } + catch (Exception ex) + { + Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack, seqNo: '{datum.SeqNo}', ex: '{ex}'"); + } + } try { diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj index fd865eccf4..9e089be82f 100644 --- a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj +++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj @@ -13,7 +13,7 @@ - + diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs index c6d6265d3f..70fa16adbc 100644 --- a/projects/Test/Common/IntegrationFixture.cs +++ b/projects/Test/Common/IntegrationFixture.cs @@ -153,7 +153,7 @@ public virtual async Task InitializeAsync() if (_openChannel) { - _channel = await _conn.CreateChannelAsync(); + _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); } if (IsVerbose) @@ -190,8 +190,14 @@ public virtual async Task DisposeAsync() finally { _eventListener?.Dispose(); - _channel?.Dispose(); - _conn?.Dispose(); + if (_channel is not null) + { + await _channel.DisposeAsync(); + } + if (_conn is not null) + { + await _conn.DisposeAsync(); + } _channel = null; _conn = null; } diff --git a/projects/Test/Common/TestConnectionRecoveryBase.cs b/projects/Test/Common/TestConnectionRecoveryBase.cs index 8b2e7edc30..bbd65519f2 100644 --- a/projects/Test/Common/TestConnectionRecoveryBase.cs +++ b/projects/Test/Common/TestConnectionRecoveryBase.cs @@ -71,17 +71,14 @@ protected async Task AssertConsumerCountAsync(IChannel ch, string q, uint count) Assert.Equal(count, ok.ConsumerCount); } - protected async Task AssertExchangeRecoveryAsync(IChannel m, string x) + protected async Task AssertExchangeRecoveryAsync(IChannel ch, string x) { - await m.ConfirmSelectAsync(); - await WithTemporaryNonExclusiveQueueAsync(m, async (_, q) => + await WithTemporaryNonExclusiveQueueAsync(ch, async (_, q) => { string rk = "routing-key"; - await m.QueueBindAsync(q, x, rk); - await m.BasicPublishAsync(x, rk, _messageBody); - - Assert.True(await WaitForConfirmsWithCancellationAsync(m)); - await m.ExchangeDeclarePassiveAsync(x); + await ch.QueueBindAsync(q, x, rk); + await ch.BasicPublishAsync(x, rk, _messageBody); + await ch.ExchangeDeclarePassiveAsync(x); }); } @@ -92,15 +89,13 @@ protected Task AssertExclusiveQueueRecoveryAsync(IChannel m, string q) protected async Task AssertQueueRecoveryAsync(IChannel ch, string q, bool exclusive, IDictionary arguments = null) { - await ch.ConfirmSelectAsync(); - await ch.QueueDeclareAsync(queue: q, passive: true, durable: false, exclusive: false, autoDelete: false, arguments: null); + await ch.QueueDeclarePassiveAsync(q); RabbitMQ.Client.QueueDeclareOk ok1 = await ch.QueueDeclareAsync(queue: q, passive: false, durable: false, exclusive: exclusive, autoDelete: false, arguments: arguments); Assert.Equal(0u, ok1.MessageCount); await ch.BasicPublishAsync("", q, _messageBody); - Assert.True(await WaitForConfirmsWithCancellationAsync(ch)); RabbitMQ.Client.QueueDeclareOk ok2 = await ch.QueueDeclareAsync(queue: q, passive: false, durable: false, exclusive: exclusive, autoDelete: false, arguments: arguments); @@ -204,10 +199,8 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName) { using (AutorecoveringConnection publishingConn = await CreateAutorecoveringConnectionAsync()) { - using (IChannel publishingChannel = await publishingConn.CreateChannelAsync()) + using (IChannel publishingChannel = await publishingConn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true })) { - await publishingChannel.ConfirmSelectAsync(); - for (ushort i = 0; i < TotalMessageCount; i++) { if (i == CloseAtCount) @@ -216,7 +209,6 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName) } await publishingChannel.BasicPublishAsync(string.Empty, queueName, _messageBody); - await publishingChannel.WaitForConfirmsOrDieAsync(); } await publishingChannel.CloseAsync(); @@ -238,14 +230,6 @@ protected static TaskCompletionSource PrepareForShutdown(IConnection conn) return tcs; } - protected static Task WaitForConfirmsWithCancellationAsync(IChannel channel) - { - using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(4))) - { - return channel.WaitForConfirmsAsync(cts.Token); - } - } - protected Task WaitForShutdownAsync() { TaskCompletionSource tcs = PrepareForShutdown(_conn); @@ -358,10 +342,8 @@ public virtual Task PostHandleDeliveryAsync(ulong deliveryTag, protected static async Task SendAndConsumeMessageAsync(IConnection conn, string queue, string exchange, string routingKey) { - using (IChannel ch = await conn.CreateChannelAsync()) + using (IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true })) { - await ch.ConfirmSelectAsync(); - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var consumer = new AckingBasicConsumer(ch, 1, tcs); @@ -371,8 +353,6 @@ protected static async Task SendAndConsumeMessageAsync(IConnection conn, s await ch.BasicPublishAsync(exchange: exchange, routingKey: routingKey, body: _encoding.GetBytes("test message"), mandatory: true); - await ch.WaitForConfirmsOrDieAsync(); - try { await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); diff --git a/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs b/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs index c3da0f2913..b45c7271e8 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs @@ -154,7 +154,6 @@ public async Task TestBasicAckAfterBasicGetAndChannelRecovery() [Fact] public async Task TestBasicAckEventHandlerRecovery() { - await _channel.ConfirmSelectAsync(); var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); ((AutorecoveringChannel)_channel).BasicAcksAsync += (m, args) => { diff --git a/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs index 266144a04a..3fcb978b26 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs @@ -75,7 +75,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName, await _channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: routingKey); await _channel.CloseAsync(); - _channel.Dispose(); + await _channel.DisposeAsync(); _channel = null; _channel = await _conn.CreateChannelAsync(); @@ -96,7 +96,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName, consumer.ReceivedAsync += MessageReceived; await _channel.BasicConsumeAsync(queueName, true, consumer); - await using (IChannel pubCh = await _conn.CreateChannelAsync()) + await using (IChannel pubCh = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true })) { await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: routingKey, body: body); await pubCh.CloseAsync(); @@ -106,7 +106,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName, await CloseAndWaitForRecoveryAsync(); - await using (IChannel pubCh = await _conn.CreateChannelAsync()) + await using (IChannel pubCh = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true })) { await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: "unused", body: body); await pubCh.CloseAsync(); diff --git a/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs index 0cdfe8e174..a2aac17c35 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs @@ -55,8 +55,6 @@ public async Task TestExchangeRecoveryTest() [Fact] public async Task TestExchangeToExchangeBindingRecovery() { - await _channel.ConfirmSelectAsync(); - string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName; string ex_source = GenerateExchangeName(); @@ -73,7 +71,6 @@ public async Task TestExchangeToExchangeBindingRecovery() await CloseAndWaitForRecoveryAsync(); Assert.True(_channel.IsOpen); await _channel.BasicPublishAsync(ex_source, "", body: _encoding.GetBytes("msg"), mandatory: true); - await _channel.WaitForConfirmsOrDieAsync(); await AssertMessageCountAsync(q, 1); } finally diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index f6b17f3ba6..773238ff69 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -213,7 +213,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages() }); return Task.CompletedTask; }; - await using (IChannel publishChannel = await publishConn.CreateChannelAsync()) + await using (IChannel publishChannel = await publishConn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true })) { AddCallbackExceptionHandlers(publishConn, publishChannel); publishChannel.DefaultConsumer = new DefaultAsyncConsumer(publishChannel, @@ -226,15 +226,15 @@ public async Task TestBasicRoundtripConcurrentManyMessages() }); return Task.CompletedTask; }; - await publishChannel.ConfirmSelectAsync(); + var publishTasks = new List(); for (int i = 0; i < publish_total; i++) { - await publishChannel.BasicPublishAsync(string.Empty, queueName, body1); - await publishChannel.BasicPublishAsync(string.Empty, queueName, body2); - await publishChannel.WaitForConfirmsOrDieAsync(); + publishTasks.Add(publishChannel.BasicPublishAsync(string.Empty, queueName, body1).AsTask()); + publishTasks.Add(publishChannel.BasicPublishAsync(string.Empty, queueName, body2).AsTask()); } + await Task.WhenAll(publishTasks).WaitAsync(WaitSpan); await publishChannel.CloseAsync(); } @@ -463,8 +463,6 @@ public async Task TestBasicAckAsync() return Task.CompletedTask; }; - await _channel.ConfirmSelectAsync(); - var consumer = new AsyncEventingBasicConsumer(_channel); consumer.ReceivedAsync += async (object sender, BasicDeliverEventArgs args) => { @@ -491,7 +489,6 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, { byte[] _body = _encoding.GetBytes(Guid.NewGuid().ToString()); await _channel.BasicPublishAsync(string.Empty, queueName, _body); - await _channel.WaitForConfirmsOrDieAsync(); } return true; @@ -649,12 +646,10 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650() var consumer1 = new AsyncEventingBasicConsumer(_channel); consumer1.ReceivedAsync += async (sender, args) => { - await using IChannel innerChannel = await _conn.CreateChannelAsync(); - await innerChannel.ConfirmSelectAsync(); + await using IChannel innerChannel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); await innerChannel.BasicPublishAsync(exchangeName, queue2Name, mandatory: true, body: Encoding.ASCII.GetBytes(nameof(TestCreateChannelWithinAsyncConsumerCallback_GH650))); - await innerChannel.WaitForConfirmsOrDieAsync(); await innerChannel.CloseAsync(); }; await _channel.BasicConsumeAsync(queue1Name, autoAck: true, consumer1); @@ -667,9 +662,7 @@ await innerChannel.BasicPublishAsync(exchangeName, queue2Name, }; await _channel.BasicConsumeAsync(queue2Name, autoAck: true, consumer2); - await _channel.ConfirmSelectAsync(); await _channel.BasicPublishAsync(exchangeName, queue1Name, body: GetRandomBody(1024)); - await _channel.WaitForConfirmsOrDieAsync(); Assert.True(await tcs.Task); } @@ -689,9 +682,9 @@ public async Task TestCloseWithinEventHandler_GH1567() { await _channel.BasicCancelAsync(eventArgs.ConsumerTag); #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - _channel.CloseAsync().ContinueWith((_) => + _channel.CloseAsync().ContinueWith(async (_) => { - _channel.Dispose(); + await _channel.DisposeAsync(); _channel = null; }); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed @@ -715,7 +708,7 @@ private async Task ValidateConsumerDispatchConcurrency() Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency); Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency); await using IChannel ch = await _conn.CreateChannelAsync( - consumerDispatchConcurrency: expectedConsumerDispatchConcurrency); + new CreateChannelOptions { ConsumerDispatchConcurrency = expectedConsumerDispatchConcurrency }); AutorecoveringChannel ach = (AutorecoveringChannel)ch; Assert.Equal(expectedConsumerDispatchConcurrency, ach.ConsumerDispatcher.Concurrency); } diff --git a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs index 5cc8648884..cf3c59d316 100644 --- a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs +++ b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs @@ -105,7 +105,7 @@ public async Task TestAsyncEventingBasicConsumer_GH1038() await _channel.BasicConsumeAsync(queueName, false, consumer); //publisher - await using IChannel publisherChannel = await _conn.CreateChannelAsync(); + await using IChannel publisherChannel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!"); var props = new BasicProperties(); await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty, diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs index 3cbf98ca1b..107460fd86 100644 --- a/projects/Test/Integration/TestBasicPublish.cs +++ b/projects/Test/Integration/TestBasicPublish.cs @@ -35,6 +35,7 @@ using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; +using RabbitMQ.Client.Exceptions; using Xunit; using Xunit.Abstractions; using Xunit.Sdk; @@ -59,7 +60,7 @@ public override Task InitializeAsync() public async Task TestBasicRoundtripArray() { _conn = await _connFactory.CreateConnectionAsync(); - _channel = await _conn.CreateChannelAsync(); + _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); QueueDeclareOk q = await _channel.QueueDeclareAsync(); var bp = new BasicProperties(); @@ -87,7 +88,7 @@ public async Task TestBasicRoundtripArray() public async Task TestBasicRoundtripCachedString() { _conn = await _connFactory.CreateConnectionAsync(); - _channel = await _conn.CreateChannelAsync(); + _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); CachedString exchangeName = new CachedString(string.Empty); CachedString queueName = new CachedString((await _channel.QueueDeclareAsync()).QueueName); @@ -115,7 +116,7 @@ public async Task TestBasicRoundtripCachedString() public async Task TestBasicRoundtripReadOnlyMemory() { _conn = await _connFactory.CreateConnectionAsync(); - _channel = await _conn.CreateChannelAsync(); + _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); QueueDeclareOk q = await _channel.QueueDeclareAsync(); byte[] sendBody = _encoding.GetBytes("hi"); @@ -142,7 +143,7 @@ public async Task TestBasicRoundtripReadOnlyMemory() public async Task CanNotModifyPayloadAfterPublish() { _conn = await _connFactory.CreateConnectionAsync(); - _channel = await _conn.CreateChannelAsync(); + _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); QueueDeclareOk q = await _channel.QueueDeclareAsync(); byte[] sendBody = new byte[1000]; @@ -203,7 +204,7 @@ public async Task TestMaxInboundMessageBodySize() Assert.Equal(maxMsgSize, cf.Endpoint.MaxInboundMessageBodySize); Assert.Equal(maxMsgSize, conn.Endpoint.MaxInboundMessageBodySize); - await using (IChannel channel = await conn.CreateChannelAsync()) + await using (IChannel channel = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true })) { channel.ChannelShutdownAsync += (o, a) => { @@ -247,7 +248,9 @@ public async Task TestMaxInboundMessageBodySize() string tag = await channel.BasicConsumeAsync(q.QueueName, true, consumer); await channel.BasicPublishAsync("", q.QueueName, msg0); - await channel.BasicPublishAsync("", q.QueueName, msg1); + AlreadyClosedException ex = await Assert.ThrowsAsync(() => + channel.BasicPublishAsync("", q.QueueName, msg1).AsTask()); + Assert.IsType(ex.InnerException); Assert.True(await tcs.Task); Assert.Equal(1, count); @@ -259,6 +262,7 @@ public async Task TestMaxInboundMessageBodySize() try { await channel.CloseAsync(); + await channel.DisposeAsync(); } catch (Exception chex) { @@ -272,6 +276,7 @@ public async Task TestMaxInboundMessageBodySize() try { await conn.CloseAsync(); + await conn.DisposeAsync(); } catch (Exception connex) { @@ -286,7 +291,7 @@ public async Task TestMaxInboundMessageBodySize() public async Task TestPropertiesRoundtrip_Headers() { _conn = await _connFactory.CreateConnectionAsync(); - _channel = await _conn.CreateChannelAsync(); + _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); var subject = new BasicProperties { diff --git a/projects/Test/Integration/TestBasicPublishAsync.cs b/projects/Test/Integration/TestBasicPublishAsync.cs index ad4650e2a5..072e2a6323 100644 --- a/projects/Test/Integration/TestBasicPublishAsync.cs +++ b/projects/Test/Integration/TestBasicPublishAsync.cs @@ -49,7 +49,6 @@ public async Task TestQueuePurgeAsync() var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - await _channel.ConfirmSelectAsync(); QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true); @@ -60,7 +59,6 @@ public async Task TestQueuePurgeAsync() { await _channel.BasicPublishAsync(string.Empty, q, body); } - await _channel.WaitForConfirmsOrDieAsync(); publishSyncSource.SetResult(true); }); diff --git a/projects/Test/Integration/TestChannelAllocation.cs b/projects/Test/Integration/TestChannelAllocation.cs index 42b495c491..ddd43b4872 100644 --- a/projects/Test/Integration/TestChannelAllocation.cs +++ b/projects/Test/Integration/TestChannelAllocation.cs @@ -73,7 +73,7 @@ public async Task AllocateInOrder() foreach (IChannel channel in channels) { await channel.CloseAsync(); - channel.Dispose(); + await channel.DisposeAsync(); } } @@ -90,7 +90,7 @@ public async Task AllocateInOrderOnlyUsingDispose() foreach (IChannel channel in channels) { - channel.Dispose(); + await channel.DisposeAsync(); } } @@ -100,12 +100,12 @@ public async Task AllocateAfterFreeingLast() IChannel ch0 = await _c.CreateChannelAsync(); Assert.Equal(1, ChannelNumber(ch0)); await ch0.CloseAsync(); - ch0.Dispose(); + await ch0.DisposeAsync(); IChannel ch1 = await _c.CreateChannelAsync(); Assert.Equal(1, ChannelNumber(ch1)); await ch1.CloseAsync(); - ch1.Dispose(); + await ch1.DisposeAsync(); } [Fact] @@ -121,6 +121,7 @@ public async Task AllocateAfterFreeingMany() foreach (IChannel channel in channels) { await channel.CloseAsync(); + await channel.DisposeAsync(); } channels.Clear(); @@ -139,6 +140,7 @@ public async Task AllocateAfterFreeingMany() { Assert.Equal(k++, ChannelNumber(channel)); await channel.CloseAsync(); + await channel.DisposeAsync(); } } diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs index 26f83eb347..71fd7bfaf3 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs @@ -37,6 +37,7 @@ using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; +using RabbitMQ.Client.Exceptions; using Xunit; using Xunit.Abstractions; @@ -54,25 +55,21 @@ public TestConcurrentAccessWithSharedChannel(ITestOutputHelper output) [Fact] public async Task ConcurrentPublishSingleChannel() { - int publishAckCount = 0; + int expectedTotalMessageCount = 0; + int expectedTotalReturnCount = 0; + int totalNackCount = 0; + int totalReturnCount = 0; + int totalReceivedCount = 0; _channel.BasicAcksAsync += (object sender, BasicAckEventArgs e) => { - Interlocked.Increment(ref publishAckCount); return Task.CompletedTask; }; - _channel.BasicNacksAsync += (object sender, BasicNackEventArgs e) => - { - _output.WriteLine($"channel #{_channel.ChannelNumber} saw a nack, deliveryTag: {e.DeliveryTag}, multiple: {e.Multiple}"); - return Task.CompletedTask; - }; - - await _channel.ConfirmSelectAsync(trackConfirmations: false); - await TestConcurrentOperationsAsync(async () => { - long receivedCount = 0; + long thisBatchReceivedCount = 0; + long thisBatchReturnedCount = 0; var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var msgTracker = new ConcurrentDictionary(); @@ -84,6 +81,7 @@ await TestConcurrentOperationsAsync(async () => consumer.ReceivedAsync += async (object sender, BasicDeliverEventArgs ea) => { + Interlocked.Increment(ref totalReceivedCount); try { System.Diagnostics.Debug.Assert(object.ReferenceEquals(sender, consumer)); @@ -95,7 +93,9 @@ await TestConcurrentOperationsAsync(async () => IChannel ch = cons.Channel; await ch.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false); - if (Interlocked.Increment(ref receivedCount) == _messageCount) + long receivedCountSoFar = Interlocked.Increment(ref thisBatchReceivedCount); + + if ((receivedCountSoFar + thisBatchReturnedCount) == _messageCount) { if (msgTracker.Values.Any(v => v == false)) { @@ -118,18 +118,59 @@ await TestConcurrentOperationsAsync(async () => var publishTasks = new List(); for (ushort i = 0; i < _messageCount; i++) { - msgTracker[i] = false; + Interlocked.Increment(ref expectedTotalMessageCount); + byte[] body = _encoding.GetBytes(i.ToString()); - publishTasks.Add(_channel.BasicPublishAsync("", q.QueueName, mandatory: true, body: body)); + + string routingKey = q.QueueName; + if (i % 5 == 0) + { + routingKey = Guid.NewGuid().ToString(); + Interlocked.Increment(ref thisBatchReturnedCount); + Interlocked.Increment(ref expectedTotalReturnCount); + } + else + { + msgTracker[i] = false; + } + + publishTasks.Add(_channel.BasicPublishAsync("", routingKey, mandatory: true, body: body)); } foreach (ValueTask pt in publishTasks) { - await pt; + try + { + await pt; + } + catch (PublishException ex) + { + if (ex.IsReturn) + { + Interlocked.Increment(ref totalReturnCount); + } + else + { + Interlocked.Increment(ref totalNackCount); + } + } } Assert.True(await tcs.Task); }, Iterations); + + if (IsVerbose) + { + _output.WriteLine("expectedTotalMessageCount: {0}", expectedTotalMessageCount); + _output.WriteLine("expectedTotalReturnCount: {0}", expectedTotalReturnCount); + _output.WriteLine("totalReceivedCount: {0}", totalReceivedCount); + _output.WriteLine("totalReturnCount: {0}", totalReturnCount); + _output.WriteLine("totalNackCount: {0}", totalNackCount); + } + + Assert.Equal(expectedTotalReturnCount, totalReturnCount); + Assert.Equal(expectedTotalMessageCount, (totalReceivedCount + totalReturnCount)); + Assert.Equal(0, totalNackCount); } } } diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs index bea6920943..b1febe2a25 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs @@ -108,7 +108,7 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in try { - await using IChannel ch = await _conn.CreateChannelAsync(); + await using IChannel ch = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true }); ch.ChannelShutdownAsync += (o, ea) => { HandleChannelShutdown(ch, ea, (args) => @@ -121,7 +121,6 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in return Task.CompletedTask; }; - await ch.ConfirmSelectAsync(trackConfirmations: false); ch.BasicAcksAsync += (object sender, BasicAckEventArgs e) => { diff --git a/projects/Test/Integration/TestConfirmSelect.cs b/projects/Test/Integration/TestConfirmSelect.cs index d2b9a38a59..3db7e37a1f 100644 --- a/projects/Test/Integration/TestConfirmSelect.cs +++ b/projects/Test/Integration/TestConfirmSelect.cs @@ -52,14 +52,12 @@ ValueTask PublishAsync() routingKey: Guid.NewGuid().ToString(), _encoding.GetBytes("message")); } - await _channel.ConfirmSelectAsync(); Assert.Equal(1ul, await _channel.GetNextPublishSequenceNumberAsync()); await PublishAsync(); Assert.Equal(2ul, await _channel.GetNextPublishSequenceNumberAsync()); await PublishAsync(); Assert.Equal(3ul, await _channel.GetNextPublishSequenceNumberAsync()); - await _channel.ConfirmSelectAsync(); await PublishAsync(); Assert.Equal(4ul, await _channel.GetNextPublishSequenceNumberAsync()); await PublishAsync(); @@ -77,13 +75,11 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength) await _channel.ExchangeDeclareAsync("sample", "fanout", autoDelete: true); // _channel.BasicAcks += (s, e) => _output.WriteLine("Acked {0}", e.DeliveryTag); - await _channel.ConfirmSelectAsync(); var properties = new BasicProperties(); // _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync()); await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty, mandatory: false, basicProperties: properties, body: body); - await _channel.WaitForConfirmsOrDieAsync(); try { @@ -93,7 +89,6 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty, }; // _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync()); await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body); - await _channel.WaitForConfirmsOrDieAsync(); } catch { @@ -103,7 +98,6 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty, properties = new BasicProperties(); // _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync()); await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body); - await _channel.WaitForConfirmsOrDieAsync(); // _output.WriteLine("I'm done..."); } } diff --git a/projects/Test/Integration/TestConfirmSelectAsync.cs b/projects/Test/Integration/TestConfirmSelectAsync.cs index 0681ef05a7..0502185e2d 100644 --- a/projects/Test/Integration/TestConfirmSelectAsync.cs +++ b/projects/Test/Integration/TestConfirmSelectAsync.cs @@ -48,23 +48,21 @@ public TestConfirmSelectAsync(ITestOutputHelper output) : base(output) [Fact] public async Task TestConfirmSelectIdempotency() { - await _channel.ConfirmSelectAsync(); Assert.Equal(1ul, await _channel.GetNextPublishSequenceNumberAsync()); - await Publish(); + await PublishAsync(); Assert.Equal(2ul, await _channel.GetNextPublishSequenceNumberAsync()); - await Publish(); + await PublishAsync(); Assert.Equal(3ul, await _channel.GetNextPublishSequenceNumberAsync()); - await _channel.ConfirmSelectAsync(); - await Publish(); + await PublishAsync(); Assert.Equal(4ul, await _channel.GetNextPublishSequenceNumberAsync()); - await Publish(); + await PublishAsync(); Assert.Equal(5ul, await _channel.GetNextPublishSequenceNumberAsync()); - await Publish(); + await PublishAsync(); Assert.Equal(6ul, await _channel.GetNextPublishSequenceNumberAsync()); } - private ValueTask Publish() + private ValueTask PublishAsync() { return _channel.BasicPublishAsync(exchange: "", routingKey: Guid.NewGuid().ToString(), _message); diff --git a/projects/Test/Integration/TestConnectionFactory.cs b/projects/Test/Integration/TestConnectionFactory.cs index 46deccdbee..dd5a5cd236 100644 --- a/projects/Test/Integration/TestConnectionFactory.cs +++ b/projects/Test/Integration/TestConnectionFactory.cs @@ -241,7 +241,7 @@ public async Task TestCreateConnectionWithoutAutoRecoverySelectsAHostFromTheList IConnection conn = await cf.CreateConnectionAsync(new List { "localhost" }); await conn.CloseAsync(); - conn.Dispose(); + await conn.DisposeAsync(); Assert.Equal("not_localhost", cf.HostName); Assert.Equal("localhost", conn.Endpoint.HostName); } diff --git a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs index 8addefa191..ed5bc466c4 100644 --- a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs +++ b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs @@ -289,10 +289,8 @@ public async Task TestTopologyRecoveryConsumerFilter() return Task.CompletedTask; }; - await using (IChannel ch = await conn.CreateChannelAsync()) + await using (IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true })) { - await ch.ConfirmSelectAsync(); - await ch.ExchangeDeclareAsync(exchange, "direct"); await ch.QueueDeclareAsync(queueWithRecoveredConsumer, false, false, false); await ch.QueueDeclareAsync(queueWithIgnoredConsumer, false, false, false); @@ -327,7 +325,6 @@ public async Task TestTopologyRecoveryConsumerFilter() Assert.True(ch.IsOpen); await ch.BasicPublishAsync(exchange, binding1, _encoding.GetBytes("test message")); await ch.BasicPublishAsync(exchange, binding2, _encoding.GetBytes("test message")); - await WaitForConfirmsWithCancellationAsync(ch); await consumerRecoveryTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.True(await consumerRecoveryTcs.Task); diff --git a/projects/Test/Integration/TestConnectionShutdown.cs b/projects/Test/Integration/TestConnectionShutdown.cs index daa26af1a8..78abffd0ab 100644 --- a/projects/Test/Integration/TestConnectionShutdown.cs +++ b/projects/Test/Integration/TestConnectionShutdown.cs @@ -107,7 +107,7 @@ public async Task TestDisposedWithSocketClosedOutOfBand() try { - _conn.Dispose(); + await _conn.DisposeAsync(); await WaitAsync(tcs, WaitSpan, "channel shutdown"); await frameHandlerCloseTask.AsTask().WaitAsync(WaitSpan); } @@ -145,7 +145,7 @@ public async Task TestShutdownSignalPropagationToChannelsUsingDispose() return Task.CompletedTask; }; - _conn.Dispose(); + await _conn.DisposeAsync(); _conn = null; await WaitAsync(tcs, TimeSpan.FromSeconds(3), "channel shutdown"); @@ -172,7 +172,7 @@ public async Task TestConsumerDispatcherShutdown() public async Task TestDisposeAfterAbort_GH825() { await _channel.AbortAsync(); - _channel.Dispose(); + await _channel.DisposeAsync(); } } } diff --git a/projects/Test/Integration/TestConnectionTopologyRecovery.cs b/projects/Test/Integration/TestConnectionTopologyRecovery.cs index ffe7d7847c..c05032190f 100644 --- a/projects/Test/Integration/TestConnectionTopologyRecovery.cs +++ b/projects/Test/Integration/TestConnectionTopologyRecovery.cs @@ -104,7 +104,7 @@ public async Task TestTopologyRecoveryQueueFilter() tcs.SetResult(true); return Task.CompletedTask; }; - IChannel ch = await conn.CreateChannelAsync(); + IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); await ch.QueueDeclareAsync(queueToRecover, false, false, false); await ch.QueueDeclareAsync(queueToIgnore, false, false, false); @@ -131,8 +131,8 @@ public async Task TestTopologyRecoveryQueueFilter() { await ch.CloseAsync(); await conn.CloseAsync(); - ch.Dispose(); - conn.Dispose(); + await ch.DisposeAsync(); + await conn.DisposeAsync(); } } @@ -155,7 +155,7 @@ public async Task TestTopologyRecoveryExchangeFilter() tcs.SetResult(true); return Task.CompletedTask; }; - IChannel ch = await conn.CreateChannelAsync(); + IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); try { await ch.ExchangeDeclareAsync(exchangeToRecover, "topic", false, true); @@ -180,8 +180,8 @@ public async Task TestTopologyRecoveryExchangeFilter() { await ch.CloseAsync(); await conn.CloseAsync(); - ch.Dispose(); - conn.Dispose(); + await ch.DisposeAsync(); + await conn.DisposeAsync(); } } @@ -228,7 +228,9 @@ public async Task TestTopologyRecoveryBindingFilter() Assert.True(ch.IsOpen); Assert.True(await SendAndConsumeMessageAsync(_conn, queueWithRecoveredBinding, exchange, bindingToRecover)); - Assert.False(await SendAndConsumeMessageAsync(_conn, queueWithIgnoredBinding, exchange, bindingToIgnore)); + PublishException ex = await Assert.ThrowsAnyAsync(() => + SendAndConsumeMessageAsync(_conn, queueWithIgnoredBinding, exchange, bindingToIgnore)); + Assert.Equal((ulong)1, ex.PublishSequenceNumber); } finally { @@ -237,8 +239,8 @@ public async Task TestTopologyRecoveryBindingFilter() await ch.QueueDeleteAsync(queueWithIgnoredBinding); await ch.CloseAsync(); await conn.CloseAsync(); - ch.Dispose(); - conn.Dispose(); + await ch.DisposeAsync(); + await conn.DisposeAsync(); } } @@ -270,10 +272,9 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities() return Task.CompletedTask; }; - IChannel ch = await conn.CreateChannelAsync(); + IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); try { - await ch.ConfirmSelectAsync(); await ch.ExchangeDeclareAsync(exchange, "direct"); await ch.QueueDeclareAsync(queue1, false, false, false); @@ -315,7 +316,6 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities() var pt1 = ch.BasicPublishAsync(exchange, binding1, true, _encoding.GetBytes("test message")); var pt2 = ch.BasicPublishAsync(exchange, binding2, true, _encoding.GetBytes("test message")); - await WaitForConfirmsWithCancellationAsync(ch); await Task.WhenAll(pt1.AsTask(), pt2.AsTask()).WaitAsync(WaitSpan); await Task.WhenAll(consumerReceivedTcs1.Task, consumerReceivedTcs2.Task).WaitAsync(WaitSpan); @@ -326,8 +326,8 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities() { await ch.CloseAsync(); await conn.CloseAsync(); - ch.Dispose(); - conn.Dispose(); + await ch.DisposeAsync(); + await conn.DisposeAsync(); } } @@ -367,7 +367,7 @@ await channel.QueueDeclareAsync(rq.Name, false, false, false, tcs.SetResult(true); return Task.CompletedTask; }; - IChannel ch = await conn.CreateChannelAsync(); + IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); await ch.QueueDeclareAsync(queueToRecoverWithException, false, false, false); await ch.QueueDeclareAsync(queueToRecoverSuccessfully, false, false, false); @@ -391,8 +391,8 @@ await _channel.QueueDeclareAsync(queueToRecoverWithException, false, false, fals await _channel.QueueDeleteAsync(queueToRecoverWithException); await ch.CloseAsync(); await conn.CloseAsync(); - ch.Dispose(); - conn.Dispose(); + await ch.DisposeAsync(); + await conn.DisposeAsync(); } } @@ -450,8 +450,8 @@ public async Task TestTopologyRecoveryExchangeExceptionHandler() await ch.CloseAsync(); await conn.CloseAsync(); - ch.Dispose(); - conn.Dispose(); + await ch.DisposeAsync(); + await conn.DisposeAsync(); } } @@ -514,8 +514,8 @@ public async Task TestTopologyRecoveryBindingExceptionHandler() await ch.CloseAsync(); await conn.CloseAsync(); - ch.Dispose(); - conn.Dispose(); + await ch.DisposeAsync(); + await conn.DisposeAsync(); } [Fact] @@ -554,7 +554,8 @@ public async Task TestTopologyRecoveryConsumerExceptionHandler() IChannel ch = await conn.CreateChannelAsync(); try { - await ch.ConfirmSelectAsync(); + // Note: no need to enable publisher confirmations as they are + // automatically enabled for channels await _channel.QueueDeclareAsync(queueWithExceptionConsumer, false, false, false); await _channel.QueuePurgeAsync(queueWithExceptionConsumer); @@ -590,8 +591,8 @@ public async Task TestTopologyRecoveryConsumerExceptionHandler() { await ch.CloseAsync(); await conn.CloseAsync(); - ch.Dispose(); - conn.Dispose(); + await ch.DisposeAsync(); + await conn.DisposeAsync(); } } } diff --git a/projects/Test/Integration/TestExtensions.cs b/projects/Test/Integration/TestExtensions.cs index fd55296e6c..c72f49d4a4 100644 --- a/projects/Test/Integration/TestExtensions.cs +++ b/projects/Test/Integration/TestExtensions.cs @@ -43,28 +43,9 @@ public TestExtensions(ITestOutputHelper output) : base(output) { } - [Fact] - public async Task TestConfirmBarrier() - { - await _channel.ConfirmSelectAsync(); - for (int i = 0; i < 10; i++) - { - await _channel.BasicPublishAsync(string.Empty, string.Empty, Array.Empty()); - } - Assert.True(await _channel.WaitForConfirmsAsync()); - } - - [Fact] - public async Task TestConfirmBeforeWait() - { - await Assert.ThrowsAsync(() => _channel.WaitForConfirmsAsync()); - } - [Fact] public async Task TestExchangeBinding() { - await _channel.ConfirmSelectAsync(); - await _channel.ExchangeDeclareAsync("src", ExchangeType.Direct, false, false); await _channel.ExchangeDeclareAsync("dest", ExchangeType.Direct, false, false); string queue = await _channel.QueueDeclareAsync(string.Empty, false, false, true); @@ -73,12 +54,10 @@ public async Task TestExchangeBinding() await _channel.QueueBindAsync(queue, "dest", string.Empty); await _channel.BasicPublishAsync("src", string.Empty, Array.Empty()); - await _channel.WaitForConfirmsAsync(); Assert.NotNull(await _channel.BasicGetAsync(queue, true)); await _channel.ExchangeUnbindAsync("dest", "src", string.Empty); await _channel.BasicPublishAsync("src", string.Empty, Array.Empty()); - await _channel.WaitForConfirmsAsync(); Assert.Null(await _channel.BasicGetAsync(queue, true)); diff --git a/projects/Test/Integration/TestFloodPublishing.cs b/projects/Test/Integration/TestFloodPublishing.cs index 836aead2f1..337bd3d496 100644 --- a/projects/Test/Integration/TestFloodPublishing.cs +++ b/projects/Test/Integration/TestFloodPublishing.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; @@ -42,7 +43,7 @@ namespace Test.Integration { public class TestFloodPublishing : IntegrationFixture { - private static readonly TimeSpan TenSeconds = TimeSpan.FromSeconds(10); + private static readonly TimeSpan FiveSeconds = TimeSpan.FromSeconds(5); private readonly byte[] _body = GetRandomBody(2048); public TestFloodPublishing(ITestOutputHelper output) : base(output) @@ -64,7 +65,7 @@ public async Task TestUnthrottledFloodPublishing() _connFactory.AutomaticRecoveryEnabled = false; _conn = await _connFactory.CreateConnectionAsync(); Assert.IsNotType(_conn); - _channel = await _conn.CreateChannelAsync(); + _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); _conn.ConnectionShutdownAsync += (_, ea) => { @@ -91,21 +92,30 @@ public async Task TestUnthrottledFloodPublishing() return Task.CompletedTask; }; + var publishTasks = new List(); var stopwatch = Stopwatch.StartNew(); int i = 0; + int publishCount = 0; try { for (i = 0; i < 65535 * 64; i++) { if (i % 65536 == 0) { - if (stopwatch.Elapsed > TenSeconds) + if (stopwatch.Elapsed > FiveSeconds) { break; } } - await _channel.BasicPublishAsync(CachedString.Empty, CachedString.Empty, _body); + publishCount++; + publishTasks.Add(_channel.BasicPublishAsync(CachedString.Empty, CachedString.Empty, _body).AsTask()); + + if (i % 500 == 0) + { + await Task.WhenAll(publishTasks).WaitAsync(ShortSpan); + publishTasks.Clear(); + } } } finally @@ -115,6 +125,7 @@ public async Task TestUnthrottledFloodPublishing() Assert.True(_conn.IsOpen); Assert.False(sawUnexpectedShutdown); + _output.WriteLine("[INFO] published {0} messages in {1}", publishCount, stopwatch.Elapsed); } [Fact] @@ -181,9 +192,8 @@ public async Task TestMultithreadFloodPublishing() return Task.CompletedTask; }; - await using (IChannel publishChannel = await publishConnection.CreateChannelAsync()) + await using (IChannel publishChannel = await publishConnection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true })) { - await publishChannel.ConfirmSelectAsync(); publishChannel.ChannelShutdownAsync += (o, ea) => { @@ -198,12 +208,13 @@ public async Task TestMultithreadFloodPublishing() return Task.CompletedTask; }; + var publishTasks = new List(); for (int i = 0; i < publishCount && false == stop; i++) { - await publishChannel.BasicPublishAsync(string.Empty, queueName, true, sendBody); + publishTasks.Add(publishChannel.BasicPublishAsync(string.Empty, queueName, true, sendBody).AsTask()); } - await publishChannel.WaitForConfirmsOrDieAsync(); + await Task.WhenAll(publishTasks).WaitAsync(ShortSpan); await publishChannel.CloseAsync(); } diff --git a/projects/Test/Integration/TestMessageCount.cs b/projects/Test/Integration/TestMessageCount.cs index beca50f6d2..363328374a 100644 --- a/projects/Test/Integration/TestMessageCount.cs +++ b/projects/Test/Integration/TestMessageCount.cs @@ -45,13 +45,11 @@ public TestMessageCount(ITestOutputHelper output) : base(output) [Fact] public async Task TestMessageCountMethod() { - await _channel.ConfirmSelectAsync(); string q = GenerateQueueName(); await _channel.QueueDeclareAsync(queue: q, passive: false, durable: false, exclusive: true, autoDelete: false, arguments: null); Assert.Equal(0u, await _channel.MessageCountAsync(q)); await _channel.BasicPublishAsync("", q, _encoding.GetBytes("msg")); - await _channel.WaitForConfirmsAsync(); Assert.Equal(1u, await _channel.MessageCountAsync(q)); } } diff --git a/projects/Test/Integration/TestPublisherConfirms.cs b/projects/Test/Integration/TestPublisherConfirms.cs index 80df6ea72b..301e30b0f5 100644 --- a/projects/Test/Integration/TestPublisherConfirms.cs +++ b/projects/Test/Integration/TestPublisherConfirms.cs @@ -29,11 +29,10 @@ // Copyright (c) 2007-2024 Broadcom. All Rights Reserved. //--------------------------------------------------------------------------- -using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; -using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; @@ -49,52 +48,8 @@ public TestPublisherConfirms(ITestOutputHelper output) _messageBody = GetRandomBody(4096); } - [Fact] - public Task TestWaitForConfirmsWithoutTimeoutAsync() - { - return TestWaitForConfirmsAsync(200, async (ch) => - { - Assert.True(await ch.WaitForConfirmsAsync()); - }); - } - - [Fact] - public Task TestWaitForConfirmsWithTimeout() - { - return TestWaitForConfirmsAsync(200, async (ch) => - { - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(4)); - Assert.True(await ch.WaitForConfirmsAsync(cts.Token)); - }); - } - - [Fact] - public async Task TestWaitForConfirmsWithTimeoutAsync_MightThrowTaskCanceledException() - { - bool waitResult = false; - bool sawException = false; - - Task t = TestWaitForConfirmsAsync(10000, async (ch) => - { - using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(1)); - try - { - waitResult = await ch.WaitForConfirmsAsync(cts.Token); - } - catch - { - sawException = true; - } - }); - - await t; - - if (waitResult == false && sawException == false) - { - Assert.Fail("test failed, both waitResult and sawException are still false"); - } - } - + /* + * TODO figure out how to actually test basic.nack and basic.return [Fact] public Task TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout_ReturnFalse() { @@ -106,13 +61,13 @@ public Task TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout Assert.False(await ch.WaitForConfirmsAsync(cts.Token)); }); } + */ [Fact] public async Task TestWaitForConfirmsWithEventsAsync() { string queueName = GenerateQueueName(); - await using IChannel ch = await _conn.CreateChannelAsync(); - await ch.ConfirmSelectAsync(); + await using IChannel ch = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, exclusive: true, autoDelete: false, arguments: null); @@ -128,12 +83,12 @@ await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, try { + var publishTasks = new List(); for (int i = 0; i < n; i++) { - await ch.BasicPublishAsync("", queueName, _encoding.GetBytes("msg")); + publishTasks.Add(ch.BasicPublishAsync("", queueName, _encoding.GetBytes("msg")).AsTask()); } - - await ch.WaitForConfirmsAsync(); + await Task.WhenAll(publishTasks).WaitAsync(ShortSpan); // Note: number of event invocations is not guaranteed // to be equal to N because acks can be batched, @@ -147,32 +102,5 @@ await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, await ch.CloseAsync(); } } - - private async Task TestWaitForConfirmsAsync(int numberOfMessagesToPublish, Func fn) - { - string queueName = GenerateQueueName(); - await using IChannel ch = await _conn.CreateChannelAsync(); - var props = new BasicProperties { Persistent = true }; - - await ch.ConfirmSelectAsync(); - await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, - exclusive: true, autoDelete: false, arguments: null); - - for (int i = 0; i < numberOfMessagesToPublish; i++) - { - await ch.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, - body: _messageBody, mandatory: true, basicProperties: props); - } - - try - { - await fn(ch); - } - finally - { - await ch.QueueDeleteAsync(queue: queueName, ifUnused: false, ifEmpty: false); - await ch.CloseAsync(); - } - } } } diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs index 725d7f7a5e..9c588f889c 100644 --- a/projects/Test/Integration/TestToxiproxy.cs +++ b/projects/Test/Integration/TestToxiproxy.cs @@ -125,28 +125,26 @@ public async Task TestCloseConnection() async Task PublishLoop() { - await using IChannel ch = await conn.CreateChannelAsync(); - await ch.ConfirmSelectAsync(); + await using IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); QueueDeclareOk q = await ch.QueueDeclareAsync(); while (conn.IsOpen) { - await ch.BasicPublishAsync("", q.QueueName, GetRandomBody()); - messagePublishedTcs.TrySetResult(true); /* - * Note: - * In this test, it is possible that the connection - * will be closed before the ack is returned, - * and this await will throw an exception - */ + * Note: + * In this test, it is possible that the connection + * will be closed before the ack is returned, + * and this await will throw an exception + */ try { - await ch.WaitForConfirmsAsync(); + await ch.BasicPublishAsync("", q.QueueName, GetRandomBody()); + messagePublishedTcs.TrySetResult(true); } catch (AlreadyClosedException ex) { if (IsVerbose) { - _output.WriteLine($"[WARNING] WaitForConfirmsAsync ex: {ex}"); + _output.WriteLine($"[WARNING] BasicPublishAsync ex: {ex}"); } } } @@ -206,13 +204,11 @@ public async Task TestThatStoppedSocketResultsInHeartbeatTimeout() Task pubTask = Task.Run(async () => { await using IConnection conn = await cf.CreateConnectionAsync(); - await using IChannel ch = await conn.CreateChannelAsync(); - await ch.ConfirmSelectAsync(); + await using IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); QueueDeclareOk q = await ch.QueueDeclareAsync(); while (conn.IsOpen) { await ch.BasicPublishAsync("", q.QueueName, GetRandomBody()); - await ch.WaitForConfirmsAsync(); await Task.Delay(TimeSpan.FromSeconds(1)); tcs.TrySetResult(true); } diff --git a/projects/Test/OAuth2/TestOAuth2.cs b/projects/Test/OAuth2/TestOAuth2.cs index c4abf5f947..347ad12ce7 100644 --- a/projects/Test/OAuth2/TestOAuth2.cs +++ b/projects/Test/OAuth2/TestOAuth2.cs @@ -116,13 +116,13 @@ public async Task DisposeAsync() if (_connection != null) { await _connection.CloseAsync(); + await _connection.DisposeAsync(); } } finally { _doneEvent.Dispose(); _producerCredentialsProvider?.Dispose(); - _connection?.Dispose(); _cancellationTokenSource.Dispose(); } } @@ -224,14 +224,13 @@ public async Task SecondConnectionCrashes_GH1429() Assert.NotNull(_connectionFactory); // https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1429 IConnection secondConnection = await _connectionFactory.CreateConnectionAsync(CancellationToken.None); - secondConnection.Dispose(); + await secondConnection.DisposeAsync(); } private async Task DeclarePublishChannelAsync() { Assert.NotNull(_connection); - IChannel publishChannel = await _connection.CreateChannelAsync(); - await publishChannel.ConfirmSelectAsync(); + IChannel publishChannel = await _connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); await publishChannel.ExchangeDeclareAsync("test_direct", ExchangeType.Direct, true, false); return publishChannel; } @@ -246,11 +245,9 @@ private async Task PublishAsync(IChannel publishChannel) AppId = "oauth2", }; - await publishChannel.BasicPublishAsync(exchange: Exchange, routingKey: "hello", false, basicProperties: properties, body: body); - _testOutputHelper.WriteLine("Sent message"); - - await publishChannel.WaitForConfirmsOrDieAsync(); - _testOutputHelper.WriteLine("Confirmed Sent message"); + await publishChannel.BasicPublishAsync(exchange: Exchange, routingKey: "hello", false, + basicProperties: properties, body: body); + _testOutputHelper.WriteLine("Sent and confirmed message"); } private async ValueTask DeclareConsumeChannelAsync() diff --git a/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs b/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs index cc7a962417..eaadbdd3b9 100644 --- a/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs +++ b/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs @@ -48,10 +48,12 @@ public async Task BlockAsync() await Task.Delay(TimeSpan.FromSeconds(1)); } - public async Task BlockAsync(IChannel channel) + public async Task BlockAndPublishAsync() { + // TODO fix publisher confirmation tracking to time out so this test succeeds + await using IChannel ch = await _conn.CreateChannelAsync(); await BlockAsync(); - await channel.BasicPublishAsync(exchange: "amq.direct", + await ch.BasicPublishAsync(exchange: "amq.direct", routingKey: Guid.NewGuid().ToString(), _encoding.GetBytes("message")); } diff --git a/projects/Test/SequentialIntegration/TestActivitySource.cs b/projects/Test/SequentialIntegration/TestActivitySource.cs index 107168f9f2..7ea58dc227 100644 --- a/projects/Test/SequentialIntegration/TestActivitySource.cs +++ b/projects/Test/SequentialIntegration/TestActivitySource.cs @@ -80,8 +80,6 @@ void AssertIntTagGreaterThanZero(Activity activity, string name) [InlineData(false)] public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOperationName) { - await _channel.ConfirmSelectAsync(); - RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var _activities = new List(); using ActivityListener activityListener = StartActivityListener(_activities); @@ -102,7 +100,6 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); - await _channel.WaitForConfirmsOrDieAsync(); await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.True(await consumerReceivedTcs.Task); @@ -117,8 +114,6 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera [InlineData(false)] public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool useRoutingKeyAsOperationName) { - await _channel.ConfirmSelectAsync(); - RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var _activities = new List(); using ActivityListener activityListener = StartActivityListener(_activities); @@ -141,7 +136,6 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool use CachedString exchange = new CachedString(""); CachedString routingKey = new CachedString(q.QueueName); await _channel.BasicPublishAsync(exchange, routingKey, true, sendBody); - await _channel.WaitForConfirmsOrDieAsync(); await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.True(await consumerReceivedTcs.Task); @@ -156,8 +150,6 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool use [InlineData(false)] public async Task TestPublisherWithPublicationAddressAndConsumerActivityTags(bool useRoutingKeyAsOperationName) { - await _channel.ConfirmSelectAsync(); - RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var _activities = new List(); using ActivityListener activityListener = StartActivityListener(_activities); @@ -179,7 +171,6 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTags(boo string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); PublicationAddress publicationAddress = new PublicationAddress(ExchangeType.Direct, "", q.QueueName); await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody); - await _channel.WaitForConfirmsOrDieAsync(); await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.True(await consumerReceivedTcs.Task); @@ -194,8 +185,6 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTags(boo [InlineData(false)] public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) { - await _channel.ConfirmSelectAsync(); - RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var activities = new List(); using ActivityListener activityListener = StartActivityListener(activities); @@ -217,7 +206,6 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); - await _channel.WaitForConfirmsOrDieAsync(); await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.True(await consumerReceivedTcs.Task); @@ -232,8 +220,6 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs [InlineData(false)] public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) { - await _channel.ConfirmSelectAsync(); - RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var activities = new List(); using ActivityListener activityListener = StartActivityListener(activities); @@ -257,7 +243,6 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo CachedString exchange = new CachedString(""); CachedString routingKey = new CachedString(q.QueueName); await _channel.BasicPublishAsync(exchange, routingKey, true, sendBody); - await _channel.WaitForConfirmsOrDieAsync(); await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.True(await consumerReceivedTcs.Task); @@ -272,8 +257,6 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo [InlineData(false)] public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) { - await _channel.ConfirmSelectAsync(); - RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var activities = new List(); using ActivityListener activityListener = StartActivityListener(activities); @@ -296,7 +279,6 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); var publicationAddress = new PublicationAddress(ExchangeType.Direct, "", q.QueueName); await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody); - await _channel.WaitForConfirmsOrDieAsync(); await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.True(await consumerReceivedTcs.Task); @@ -311,7 +293,6 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn [InlineData(false)] public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName) { - await _channel.ConfirmSelectAsync(); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var activities = new List(); using ActivityListener activityListener = StartActivityListener(activities); @@ -323,7 +304,6 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera { await _channel.QueueDeclareAsync(queue, false, false, false, null); await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg)); - await _channel.WaitForConfirmsOrDieAsync(); QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); Assert.Equal(1u, ok.MessageCount); BasicGetResult res = await _channel.BasicGetAsync(queue, true); @@ -344,7 +324,6 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera [InlineData(false)] public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool useRoutingKeyAsOperationName) { - await _channel.ConfirmSelectAsync(); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var activities = new List(); using ActivityListener activityListener = StartActivityListener(activities); @@ -358,7 +337,6 @@ public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool use CachedString routingKey = new CachedString(queue); await _channel.QueueDeclareAsync(queue, false, false, false, null); await _channel.BasicPublishAsync(exchange, routingKey, true, Encoding.UTF8.GetBytes(msg)); - await _channel.WaitForConfirmsOrDieAsync(); QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); Assert.Equal(1u, ok.MessageCount); BasicGetResult res = await _channel.BasicGetAsync(queue, true); @@ -379,7 +357,6 @@ public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool use [InlineData(false)] public async Task TestPublisherWithPublicationAddressAndBasicGetActivityTags(bool useRoutingKeyAsOperationName) { - await _channel.ConfirmSelectAsync(); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var activities = new List(); using ActivityListener activityListener = StartActivityListener(activities); @@ -393,7 +370,6 @@ public async Task TestPublisherWithPublicationAddressAndBasicGetActivityTags(boo await _channel.QueueDeclareAsync(queue, false, false, false, null); await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), Encoding.UTF8.GetBytes(msg)); - await _channel.WaitForConfirmsOrDieAsync(); QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); Assert.Equal(1u, ok.MessageCount); BasicGetResult res = await _channel.BasicGetAsync(queue, true); diff --git a/projects/Test/SequentialIntegration/TestConnectionBlocked.cs b/projects/Test/SequentialIntegration/TestConnectionBlocked.cs index d4ffaf6073..92c095768b 100644 --- a/projects/Test/SequentialIntegration/TestConnectionBlocked.cs +++ b/projects/Test/SequentialIntegration/TestConnectionBlocked.cs @@ -60,10 +60,10 @@ public override async Task DisposeAsync() public async Task TestConnectionBlockedNotification() { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _conn.ConnectionBlockedAsync += async (object sender, ConnectionBlockedEventArgs args) => + _conn.ConnectionBlockedAsync += (object sender, ConnectionBlockedEventArgs args) => { - // TODO should this continue to be doing fire and forget? - await UnblockAsync(); + UnblockAsync(); + return Task.CompletedTask; }; _conn.ConnectionUnblockedAsync += (object sender, AsyncEventArgs ea) => @@ -72,7 +72,7 @@ public async Task TestConnectionBlockedNotification() return Task.CompletedTask; }; - await BlockAsync(_channel); + await BlockAndPublishAsync(); await tcs.Task.WaitAsync(TimeSpan.FromSeconds(15)); Assert.True(await tcs.Task, "Unblock notification not received."); } @@ -82,14 +82,14 @@ public async Task TestDisposeOnBlockedConnectionDoesNotHang() { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - await BlockAsync(_channel); + await BlockAndPublishAsync(); Task disposeTask = Task.Run(async () => { try { await _conn.AbortAsync(); - _conn.Dispose(); + await _conn.DisposeAsync(); tcs.SetResult(true); } catch (Exception) diff --git a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs index abf8e238bf..4c25108d26 100644 --- a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs +++ b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs @@ -135,7 +135,7 @@ Task _channel_ChannelShutdownAsync(object sender, ShutdownEventArgs e) Assert.False(_channel.IsClosed); await _channel.CloseAsync(); - _channel.Dispose(); + await _channel.DisposeAsync(); Assert.True(_channel.IsClosed); _channel = null; @@ -155,11 +155,12 @@ public async Task TestBlockedListenersRecovery() }; await CloseAndWaitForRecoveryAsync(); await CloseAndWaitForRecoveryAsync(); - await BlockAsync(_channel); + await BlockAndPublishAsync(); await WaitAsync(tcs, "connection blocked"); } finally { + // NOTE: must unblock so that close succeeeds on test tear-down await UnblockAsync(); } } @@ -194,15 +195,12 @@ public async Task TestClientNamedTransientAutoDeleteQueueAndBindingRecovery() await RestartServerAndWaitForRecoveryAsync(); Assert.True(_channel.IsOpen); - await _channel.ConfirmSelectAsync(); QueueDeclareOk ok0 = await _channel.QueueDeclarePassiveAsync(queue: queueName); Assert.Equal(queueName, ok0.QueueName); await _channel.QueuePurgeAsync(queueName); await _channel.ExchangeDeclarePassiveAsync(exchange: exchangeName); await _channel.BasicPublishAsync(exchange: exchangeName, routingKey: "", body: _encoding.GetBytes("msg")); - await WaitForConfirmsWithCancellationAsync(_channel); - QueueDeclareOk ok1 = await _channel.QueueDeclarePassiveAsync(queue: queueName); Assert.Equal(1u, ok1.MessageCount); } @@ -240,10 +238,8 @@ public async Task TestServerNamedTransientAutoDeleteQueueAndBindingRecovery() Assert.True(_channel.IsOpen); Assert.NotEqual(nameBefore, nameAfter); - await _channel.ConfirmSelectAsync(); await _channel.ExchangeDeclareAsync(exchange: x, type: "fanout"); await _channel.BasicPublishAsync(exchange: x, routingKey: "", body: _encoding.GetBytes("msg")); - await WaitForConfirmsWithCancellationAsync(_channel); QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(nameAfter); Assert.Equal(1u, ok.MessageCount); @@ -345,7 +341,7 @@ public async Task TestUnblockedListenersRecovery() }; await CloseAndWaitForRecoveryAsync(); await CloseAndWaitForRecoveryAsync(); - await BlockAsync(_channel); + await BlockAndPublishAsync(); await UnblockAsync(); await WaitAsync(tcs, "connection unblocked"); } diff --git a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs index 7a99a00e63..2a2fb65670 100644 --- a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs +++ b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs @@ -95,7 +95,6 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera string baggageGuid = Guid.NewGuid().ToString(); Baggage.SetBaggage("TestItem", baggageGuid); Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); - await _channel.ConfirmSelectAsync(); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; await Task.Delay(500); @@ -125,7 +124,6 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); - await _channel.WaitForConfirmsOrDieAsync(); Baggage.ClearBaggage(); Assert.Null(Baggage.GetBaggage("TestItem")); @@ -150,7 +148,6 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs string baggageGuid = Guid.NewGuid().ToString(); Baggage.SetBaggage("TestItem", baggageGuid); Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); - await _channel.ConfirmSelectAsync(); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; await Task.Delay(500); @@ -181,7 +178,6 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); - await _channel.WaitForConfirmsOrDieAsync(); Baggage.ClearBaggage(); Assert.Null(Baggage.GetBaggage("TestItem")); @@ -206,7 +202,6 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn string baggageGuid = Guid.NewGuid().ToString(); Baggage.SetBaggage("TestItem", baggageGuid); Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); - await _channel.ConfirmSelectAsync(); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; await Task.Delay(500); @@ -238,7 +233,6 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); var publicationAddress = new PublicationAddress(ExchangeType.Direct, "", queueName); await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody); - await _channel.WaitForConfirmsOrDieAsync(); Baggage.ClearBaggage(); Assert.Null(Baggage.GetBaggage("TestItem")); @@ -263,7 +257,6 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo string baggageGuid = Guid.NewGuid().ToString(); Baggage.SetBaggage("TestItem", baggageGuid); Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); - await _channel.ConfirmSelectAsync(); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; await Task.Delay(500); @@ -296,7 +289,6 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo CachedString exchange = new CachedString(""); CachedString routingKey = new CachedString(queueName); await _channel.BasicPublishAsync(exchange, routingKey, sendBody); - await _channel.WaitForConfirmsOrDieAsync(); Baggage.ClearBaggage(); Assert.Null(Baggage.GetBaggage("TestItem")); @@ -321,7 +313,6 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera string baggageGuid = Guid.NewGuid().ToString(); Baggage.SetBaggage("TestItem", baggageGuid); Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); - await _channel.ConfirmSelectAsync(); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; await Task.Delay(500); string queue = $"queue-{Guid.NewGuid()}"; @@ -331,7 +322,6 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera { await _channel.QueueDeclareAsync(queue, false, false, false, null); await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg)); - await _channel.WaitForConfirmsOrDieAsync(); Baggage.ClearBaggage(); Assert.Null(Baggage.GetBaggage("TestItem")); QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue);