diff --git a/projects/RabbitMQ.Client/Constants.cs b/projects/RabbitMQ.Client/Constants.cs
index 637d53a78..c6748c872 100644
--- a/projects/RabbitMQ.Client/Constants.cs
+++ b/projects/RabbitMQ.Client/Constants.cs
@@ -87,9 +87,15 @@ public static class Constants
///
/// The default consumer dispatch concurrency. See
/// to set this value for every channel created on a connection,
- /// and
+ /// and
/// for setting this value for a particular channel.
///
public const ushort DefaultConsumerDispatchConcurrency = 1;
+
+ ///
+ /// The message header used to track publish sequence numbers, to allow correlation when
+ /// basic.return is sent via the broker.
+ ///
+ public const string PublishSequenceNumberHeader = "x-dotnet-pub-seq-no";
}
}
diff --git a/projects/RabbitMQ.Client/CreateChannelOptions.cs b/projects/RabbitMQ.Client/CreateChannelOptions.cs
new file mode 100644
index 000000000..17a9d6105
--- /dev/null
+++ b/projects/RabbitMQ.Client/CreateChannelOptions.cs
@@ -0,0 +1,35 @@
+namespace RabbitMQ.Client
+{
+ ///
+ /// Channel creation options.
+ ///
+ public sealed class CreateChannelOptions
+ {
+ ///
+ /// Enable or disable publisher confirmations on this channel. Defaults to false
+ ///
+ public bool PublisherConfirmationsEnabled { get; set; } = false;
+
+ ///
+ /// Should this library track publisher confirmations for you? Defaults to false
+ ///
+ public bool PublisherConfirmationTrackingEnabled { get; set; } = false;
+
+ ///
+ /// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one
+ /// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
+ /// can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
+ ///
+ /// Defaults to null, which will use the value from
+ ///
+ /// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
+ /// In addition to that consumers need to be thread/concurrency safe.
+ ///
+ public ushort? ConsumerDispatchConcurrency { get; set; } = null;
+
+ ///
+ /// The default channel options.
+ ///
+ public static CreateChannelOptions Default { get; } = new CreateChannelOptions();
+ }
+}
diff --git a/projects/RabbitMQ.Client/Events/ShutdownEventArgs.cs b/projects/RabbitMQ.Client/Events/ShutdownEventArgs.cs
index cf8ed5b3c..f5b5bd282 100644
--- a/projects/RabbitMQ.Client/Events/ShutdownEventArgs.cs
+++ b/projects/RabbitMQ.Client/Events/ShutdownEventArgs.cs
@@ -72,7 +72,8 @@ public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string r
///
/// Construct a with the given parameters.
///
- public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, Exception exception, CancellationToken cancellationToken = default)
+ public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText,
+ Exception exception, CancellationToken cancellationToken = default)
: this(initiator, replyCode, replyText, 0, 0, cancellationToken: cancellationToken)
{
_exception = exception ?? throw new ArgumentNullException(nameof(exception));
diff --git a/projects/RabbitMQ.Client/Exceptions/OperationInterruptedException.cs b/projects/RabbitMQ.Client/Exceptions/OperationInterruptedException.cs
index e1cb60e59..78e2238d6 100644
--- a/projects/RabbitMQ.Client/Exceptions/OperationInterruptedException.cs
+++ b/projects/RabbitMQ.Client/Exceptions/OperationInterruptedException.cs
@@ -45,35 +45,30 @@ namespace RabbitMQ.Client.Exceptions
public class OperationInterruptedException
: RabbitMQClientException
{
- ///Construct an OperationInterruptedException with
- ///the passed-in explanation, if any.
- public OperationInterruptedException(ShutdownEventArgs? reason)
- : base(reason is null ? "The AMQP operation was interrupted" :
- $"The AMQP operation was interrupted: {reason}")
+ ///
+ ///Construct an OperationInterruptedException
+ ///
+ public OperationInterruptedException() : base("The AMQP operation was interrupted")
{
- ShutdownReason = reason;
- }
- ///Construct an OperationInterruptedException with
- ///the passed-in explanation and prefix, if any.
- public OperationInterruptedException(ShutdownEventArgs? reason, string prefix)
- : base(reason is null ? $"{prefix}: The AMQP operation was interrupted" :
- $"{prefix}: The AMQP operation was interrupted: {reason}")
- {
- ShutdownReason = reason;
}
-
- protected OperationInterruptedException()
+ ///
+ ///Construct an OperationInterruptedException with
+ ///the passed-in explanation, if any.
+ ///
+ public OperationInterruptedException(ShutdownEventArgs reason)
+ : base($"The AMQP operation was interrupted: {reason}", reason.Exception)
{
+ ShutdownReason = reason;
}
- protected OperationInterruptedException(string message) : base(message)
- {
- }
- protected OperationInterruptedException(string message, Exception inner)
- : base(message, inner)
+ ///Construct an OperationInterruptedException with
+ ///the passed-in explanation and prefix, if any.
+ public OperationInterruptedException(ShutdownEventArgs reason, string prefix)
+ : base($"{prefix}: The AMQP operation was interrupted: {reason}", reason.Exception)
{
+ ShutdownReason = reason;
}
///Retrieves the explanation for the shutdown. May
diff --git a/projects/RabbitMQ.Client/Exceptions/ProtocolViolationException.cs b/projects/RabbitMQ.Client/Exceptions/ProtocolViolationException.cs
index 249ab78b6..2e973f5f4 100644
--- a/projects/RabbitMQ.Client/Exceptions/ProtocolViolationException.cs
+++ b/projects/RabbitMQ.Client/Exceptions/ProtocolViolationException.cs
@@ -36,13 +36,15 @@ namespace RabbitMQ.Client.Exceptions
[Serializable]
public class ProtocolViolationException : RabbitMQClientException
{
- public ProtocolViolationException(string message) : base(message)
+ public ProtocolViolationException() : base()
{
}
- public ProtocolViolationException(string message, Exception inner) : base(message, inner)
+
+ public ProtocolViolationException(string message) : base(message)
{
}
- public ProtocolViolationException()
+
+ public ProtocolViolationException(string message, Exception inner) : base(message, inner)
{
}
}
diff --git a/projects/RabbitMQ.Client/Exceptions/PublishException.cs b/projects/RabbitMQ.Client/Exceptions/PublishException.cs
new file mode 100644
index 000000000..ac404f660
--- /dev/null
+++ b/projects/RabbitMQ.Client/Exceptions/PublishException.cs
@@ -0,0 +1,66 @@
+// This source code is dual-licensed under the Apache License, version
+// 2.0, and the Mozilla Public License, version 2.0.
+//
+// The APL v2.0:
+//
+//---------------------------------------------------------------------------
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//---------------------------------------------------------------------------
+//
+// The MPL v2.0:
+//
+//---------------------------------------------------------------------------
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+//
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
+//---------------------------------------------------------------------------
+
+using System;
+
+namespace RabbitMQ.Client.Exceptions
+{
+ ///
+ /// Class for exceptions related to publisher confirmations
+ /// or the mandatory flag.
+ ///
+ public class PublishException : RabbitMQClientException
+ {
+ private bool _isReturn = false;
+ private ulong _publishSequenceNumber = ulong.MinValue;
+
+ public PublishException(ulong publishSequenceNumber, bool isReturn) : base()
+ {
+ if (publishSequenceNumber == ulong.MinValue)
+ {
+ throw new ArgumentException($"{nameof(publishSequenceNumber)} must not be 0");
+ }
+
+ _isReturn = isReturn;
+ _publishSequenceNumber = publishSequenceNumber;
+ }
+
+ ///
+ /// true if this exception is due to a basic.return
+ ///
+ public bool IsReturn => _isReturn;
+
+ ///
+ /// Retrieve the publish sequence number.
+ ///
+ public ulong PublishSequenceNumber => _publishSequenceNumber;
+ }
+}
diff --git a/projects/RabbitMQ.Client/Exceptions/RabbitMQClientException.cs b/projects/RabbitMQ.Client/Exceptions/RabbitMQClientException.cs
index c8a537115..97e540399 100644
--- a/projects/RabbitMQ.Client/Exceptions/RabbitMQClientException.cs
+++ b/projects/RabbitMQ.Client/Exceptions/RabbitMQClientException.cs
@@ -37,24 +37,21 @@ namespace RabbitMQ.Client.Exceptions
public abstract class RabbitMQClientException : Exception
{
/// Initializes a new instance of the class.
- protected RabbitMQClientException()
+ protected RabbitMQClientException() : base()
{
-
}
/// Initializes a new instance of the class with a specified error message.
/// The message that describes the error.
protected RabbitMQClientException(string message) : base(message)
{
-
}
/// Initializes a new instance of the class with a specified error message and a reference to the inner exception that is the cause of this exception.
/// The error message that explains the reason for the exception.
/// The exception that is the cause of the current exception, or a null reference (Nothing in Visual Basic) if no inner exception is specified.
- protected RabbitMQClientException(string message, Exception innerException) : base(message, innerException)
+ protected RabbitMQClientException(string message, Exception? innerException) : base(message, innerException)
{
-
}
}
}
diff --git a/projects/RabbitMQ.Client/Exceptions/UnexpectedMethodException.cs b/projects/RabbitMQ.Client/Exceptions/UnexpectedMethodException.cs
index 5e925053d..7da8804e9 100644
--- a/projects/RabbitMQ.Client/Exceptions/UnexpectedMethodException.cs
+++ b/projects/RabbitMQ.Client/Exceptions/UnexpectedMethodException.cs
@@ -35,6 +35,7 @@
namespace RabbitMQ.Client.Exceptions
{
///
+ /// TODO WHY IS THIS UNREFERENCED?
/// Thrown when the channel receives an RPC reply that it wasn't expecting.
///
[Serializable]
diff --git a/projects/RabbitMQ.Client/Framing/Channel.cs b/projects/RabbitMQ.Client/Framing/Channel.cs
index ddb1394bd..a6c809d36 100644
--- a/projects/RabbitMQ.Client/Framing/Channel.cs
+++ b/projects/RabbitMQ.Client/Framing/Channel.cs
@@ -35,6 +35,7 @@
namespace RabbitMQ.Client.Framing
{
+ // TODO merge into ChannelBase
internal class Channel : ChannelBase
{
public Channel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null)
diff --git a/projects/RabbitMQ.Client/IChannel.cs b/projects/RabbitMQ.Client/IChannel.cs
index 63fc72756..852b7e8ee 100644
--- a/projects/RabbitMQ.Client/IChannel.cs
+++ b/projects/RabbitMQ.Client/IChannel.cs
@@ -204,6 +204,7 @@ Task BasicConsumeAsync(string queue, bool autoAck, string consumerTag, b
/// CancellationToken for this operation.
///
/// Routing key must be shorter than 255 bytes.
+ /// Throws if a nack or basic.return is returned for the message.
///
ValueTask BasicPublishAsync(string exchange, string routingKey,
bool mandatory, TProperties basicProperties, ReadOnlyMemory body,
@@ -221,6 +222,7 @@ ValueTask BasicPublishAsync(string exchange, string routingKey,
/// CancellationToken for this operation.
///
/// Routing key must be shorter than 255 bytes.
+ /// Throws if a nack or basic.return is returned for the message.
///
ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey,
bool mandatory, TProperties basicProperties, ReadOnlyMemory body,
@@ -265,14 +267,6 @@ Task CloseAsync(ushort replyCode, string replyText, bool abort,
Task CloseAsync(ShutdownEventArgs reason, bool abort,
CancellationToken cancellationToken = default);
- ///
- /// Asynchronously enable publisher confirmations.
- ///
- /// Set to false if tracking via and yourself.
- /// CancellationToken for this operation.
- Task ConfirmSelectAsync(bool trackConfirmations = true,
- CancellationToken cancellationToken = default);
-
/// Asynchronously declare an exchange.
/// The name of the exchange.
/// The type of the exchange.
@@ -451,32 +445,6 @@ Task QueueUnbindAsync(string queue, string exchange, string routingKey,
/// The cancellation token.
Task TxSelectAsync(CancellationToken cancellationToken = default);
- ///
- /// Asynchronously wait until all published messages on this channel have been confirmed.
- ///
- /// True if no nacks were received within the timeout, otherwise false.
- /// The cancellation token.
- ///
- /// Waits until all messages published on this channel since the last call have
- /// been either ack'd or nack'd by the server. Returns whether
- /// all the messages were ack'd (and none were nack'd).
- /// Throws an exception when called on a channel
- /// that does not have publisher confirms enabled.
- ///
- Task WaitForConfirmsAsync(CancellationToken cancellationToken = default);
-
- ///
- /// Wait until all published messages on this channel have been confirmed.
- ///
- /// The cancellation token.
- ///
- /// Waits until all messages published on this channel since the last call have
- /// been ack'd by the server. If a nack is received or the timeout
- /// elapses, throws an IOException exception immediately and closes
- /// the channel.
- ///
- Task WaitForConfirmsOrDieAsync(CancellationToken cancellationToken = default);
-
///
/// Amount of time protocol operations (e.g. queue.declare
) are allowed to take before
/// timing out.
diff --git a/projects/RabbitMQ.Client/IConnection.cs b/projects/RabbitMQ.Client/IConnection.cs
index c562307a7..8217013e3 100644
--- a/projects/RabbitMQ.Client/IConnection.cs
+++ b/projects/RabbitMQ.Client/IConnection.cs
@@ -240,18 +240,11 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
///
/// Asynchronously create and return a fresh channel, session, and channel.
///
- ///
- /// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one
- /// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
- /// can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
- ///
- /// Defaults to null, which will use the value from
- ///
- /// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
- /// In addition to that consumers need to be thread/concurrency safe.
+ ///
+ /// The channel creation options.
///
/// Cancellation token
- Task CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
+ Task CreateChannelAsync(CreateChannelOptions? options = default,
CancellationToken cancellationToken = default);
}
}
diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
index 6e88ee907..2a2efec63 100644
--- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
+++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
@@ -49,8 +49,8 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
private ushort _prefetchCountConsumer;
private ushort _prefetchCountGlobal;
- private bool _usesPublisherConfirms;
- private bool _tracksPublisherConfirmations;
+ private bool _publisherConfirmationsEnabled = false;
+ private bool _publisherConfirmationTrackingEnabled = false;
private bool _usesTransactions;
private ushort _consumerDispatchConcurrency;
@@ -72,11 +72,13 @@ public TimeSpan ContinuationTimeout
}
public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel,
- ushort consumerDispatchConcurrency)
+ ushort consumerDispatchConcurrency, bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled)
{
_connection = conn;
_innerChannel = innerChannel;
_consumerDispatchConcurrency = consumerDispatchConcurrency;
+ _publisherConfirmationsEnabled = publisherConfirmationsEnabled;
+ _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
}
public event AsyncEventHandler BasicAcksAsync
@@ -161,8 +163,9 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection con
_connection = conn;
- RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_consumerDispatchConcurrency,
- cancellationToken: cancellationToken)
+ RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(
+ _publisherConfirmationsEnabled, _publisherConfirmationTrackingEnabled,
+ _consumerDispatchConcurrency, cancellationToken)
.ConfigureAwait(false);
newChannel.TakeOver(_innerChannel);
@@ -178,12 +181,6 @@ await newChannel.BasicQosAsync(0, _prefetchCountGlobal, true, cancellationToken)
.ConfigureAwait(false);
}
- if (_usesPublisherConfirms)
- {
- await newChannel.ConfirmSelectAsync(_tracksPublisherConfirmations, cancellationToken)
- .ConfigureAwait(false);
- }
-
if (_usesTransactions)
{
await newChannel.TxSelectAsync(cancellationToken)
@@ -347,14 +344,6 @@ public Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global,
return _innerChannel.BasicQosAsync(prefetchSize, prefetchCount, global, cancellationToken);
}
- public async Task ConfirmSelectAsync(bool trackConfirmations = true, CancellationToken cancellationToken = default)
- {
- await InnerChannel.ConfirmSelectAsync(trackConfirmations, cancellationToken)
- .ConfigureAwait(false);
- _usesPublisherConfirms = true;
- _tracksPublisherConfirmations = trackConfirmations;
- }
-
public async Task ExchangeBindAsync(string destination, string source, string routingKey,
IDictionary? arguments, bool noWait,
CancellationToken cancellationToken)
@@ -487,12 +476,6 @@ public Task TxSelectAsync(CancellationToken cancellationToken)
return InnerChannel.TxSelectAsync(cancellationToken);
}
- public Task WaitForConfirmsAsync(CancellationToken token = default)
- => InnerChannel.WaitForConfirmsAsync(token);
-
- public Task WaitForConfirmsOrDieAsync(CancellationToken token = default)
- => InnerChannel.WaitForConfirmsOrDieAsync(token);
-
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ThrowIfDisposed()
{
diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
index dc99e33fc..5cee094c3 100644
--- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
+++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
@@ -184,12 +184,17 @@ public event AsyncEventHandler RecoveringConsumerAs
public IProtocol Protocol => Endpoint.Protocol;
- public async ValueTask CreateNonRecoveringChannelAsync(ushort consumerDispatchConcurrency,
+ public async ValueTask CreateNonRecoveringChannelAsync(
+ bool publisherConfirmationsEnabled = false,
+ bool publisherConfirmationTrackingEnabled = false,
+ ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default)
{
ISession session = InnerConnection.CreateSession();
var result = new RecoveryAwareChannel(_config, session, consumerDispatchConcurrency);
- return (RecoveryAwareChannel)await result.OpenAsync(cancellationToken).ConfigureAwait(false);
+ return (RecoveryAwareChannel)await result.OpenAsync(
+ publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled, cancellationToken)
+ .ConfigureAwait(false);
}
public override string ToString()
@@ -251,17 +256,24 @@ await CloseInnerConnectionAsync()
}
}
- public async Task CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
+ public async Task CreateChannelAsync(CreateChannelOptions? options = default,
CancellationToken cancellationToken = default)
{
EnsureIsOpen();
- ushort cdc = consumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);
- RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cdc, cancellationToken)
+
+ options ??= CreateChannelOptions.Default;
+
+ ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);
+
+ RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(
+ options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cdc, cancellationToken)
.ConfigureAwait(false);
- AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc);
- await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
+
+ var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc,
+ options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled);
+ await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);
- return channel;
+ return autorecoveringChannel;
}
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
diff --git a/projects/RabbitMQ.Client/Impl/ChannelBase.cs b/projects/RabbitMQ.Client/Impl/ChannelBase.cs
index 517cf6b8b..6ba9ed60c 100644
--- a/projects/RabbitMQ.Client/Impl/ChannelBase.cs
+++ b/projects/RabbitMQ.Client/Impl/ChannelBase.cs
@@ -30,6 +30,8 @@
//---------------------------------------------------------------------------
using System;
+using System.Buffers.Binary;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
@@ -42,6 +44,7 @@
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing;
+using RabbitMQ.Client.Util;
namespace RabbitMQ.Client.Impl
{
@@ -57,21 +60,18 @@ internal abstract class ChannelBase : IChannel, IRecoverable
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
private readonly AsyncManualResetEvent _flowControlBlock = new AsyncManualResetEvent(true);
- private ulong _nextPublishSeqNo;
- private SemaphoreSlim? _confirmSemaphore;
- private bool _trackConfirmations;
- private LinkedList? _pendingDeliveryTags;
- private List>? _confirmsTaskCompletionSources;
-
- private bool _onlyAcksReceived = true;
+ private bool _publisherConfirmationsEnabled = false;
+ private bool _publisherConfirmationTrackingEnabled = false;
+ private ulong _nextPublishSeqNo = 0;
+ private readonly SemaphoreSlim _confirmSemaphore = new(1, 1);
+ private readonly ConcurrentDictionary> _confirmsTaskCompletionSources = new();
private ShutdownEventArgs? _closeReason;
public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason);
internal readonly IConsumerDispatcher ConsumerDispatcher;
- protected ChannelBase(ConnectionConfig config, ISession session,
- ushort? perChannelConsumerDispatchConcurrency = null)
+ protected ChannelBase(ConnectionConfig config, ISession session, ushort? perChannelConsumerDispatchConcurrency = null)
{
ContinuationTimeout = config.ContinuationTimeout;
ConsumerDispatcher = new AsyncConsumerDispatcher(this,
@@ -371,8 +371,13 @@ protected bool Enqueue(IRpcContinuation k)
}
}
- internal async Task OpenAsync(CancellationToken cancellationToken)
+ internal async Task OpenAsync(bool publisherConfirmationsEnabled = false,
+ bool publisherConfirmationTrackingEnabled = false,
+ CancellationToken cancellationToken = default)
{
+ _publisherConfirmationsEnabled = publisherConfirmationsEnabled;
+ _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
+
bool enqueued = false;
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
@@ -388,7 +393,12 @@ await ModelSendAsync(in method, k.CancellationToken)
bool result = await k;
Debug.Assert(result);
- return this;
+
+ if (_publisherConfirmationsEnabled)
+ {
+ await ConfirmSelectAsync(publisherConfirmationTrackingEnabled, cancellationToken)
+ .ConfigureAwait(false);
+ }
}
finally
{
@@ -398,6 +408,8 @@ await ModelSendAsync(in method, k.CancellationToken)
}
_rpcSemaphore.Release();
}
+
+ return this;
}
internal async Task FinishCloseAsync(CancellationToken cancellationToken)
@@ -412,9 +424,6 @@ await Session.CloseAsync(reason)
m_connectionStartCell?.TrySetResult(null);
}
- [MemberNotNullWhen(true, nameof(_confirmSemaphore))]
- private bool ConfirmsAreEnabled => _confirmSemaphore != null;
-
private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
/*
@@ -489,16 +498,16 @@ private async Task OnChannelShutdownAsync(ShutdownEventArgs reason)
await _channelShutdownAsyncWrapper.InvokeAsync(this, reason)
.ConfigureAwait(false);
- if (ConfirmsAreEnabled)
+ if (_publisherConfirmationsEnabled)
{
await _confirmSemaphore.WaitAsync(reason.CancellationToken)
.ConfigureAwait(false);
try
{
- if (_confirmsTaskCompletionSources?.Count > 0)
+ if (!_confirmsTaskCompletionSources.IsEmpty)
{
var exception = new AlreadyClosedException(reason);
- foreach (TaskCompletionSource confirmsTaskCompletionSource in _confirmsTaskCompletionSources)
+ foreach (TaskCompletionSource confirmsTaskCompletionSource in _confirmsTaskCompletionSources.Values)
{
confirmsTaskCompletionSource.TrySetException(exception);
}
@@ -594,7 +603,8 @@ public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heart
return ModelSendAsync(in method, cancellationToken).AsTask();
}
- protected async Task HandleBasicAck(IncomingCommand cmd, CancellationToken cancellationToken)
+ protected async Task HandleBasicAck(IncomingCommand cmd,
+ CancellationToken cancellationToken = default)
{
var ack = new BasicAck(cmd.MethodSpan);
if (!_basicAcksAsyncWrapper.IsEmpty)
@@ -604,12 +614,14 @@ await _basicAcksAsyncWrapper.InvokeAsync(this, args)
.ConfigureAwait(false);
}
- await HandleAckNack(ack._deliveryTag, ack._multiple, false, cancellationToken)
+ await HandleAck(ack._deliveryTag, ack._multiple, cancellationToken)
.ConfigureAwait(false);
+
return true;
}
- protected async Task HandleBasicNack(IncomingCommand cmd, CancellationToken cancellationToken)
+ protected async Task HandleBasicNack(IncomingCommand cmd,
+ CancellationToken cancellationToken = default)
{
var nack = new BasicNack(cmd.MethodSpan);
if (!_basicNacksAsyncWrapper.IsEmpty)
@@ -620,8 +632,40 @@ await _basicNacksAsyncWrapper.InvokeAsync(this, args)
.ConfigureAwait(false);
}
- await HandleAckNack(nack._deliveryTag, nack._multiple, true, cancellationToken)
+ await HandleNack(nack._deliveryTag, nack._multiple, false, cancellationToken)
.ConfigureAwait(false);
+
+ return true;
+ }
+
+ protected async Task HandleBasicReturn(IncomingCommand cmd, CancellationToken cancellationToken)
+ {
+ var basicReturn = new BasicReturn(cmd.MethodSpan);
+
+ var e = new BasicReturnEventArgs(basicReturn._replyCode, basicReturn._replyText,
+ basicReturn._exchange, basicReturn._routingKey,
+ new ReadOnlyBasicProperties(cmd.HeaderSpan), cmd.Body.Memory, cancellationToken);
+
+ if (!_basicReturnAsyncWrapper.IsEmpty)
+ {
+ await _basicReturnAsyncWrapper.InvokeAsync(this, e)
+ .ConfigureAwait(false);
+ }
+
+ if (_publisherConfirmationsEnabled)
+ {
+ ulong publishSequenceNumber = 0;
+ IReadOnlyBasicProperties props = e.BasicProperties;
+ object? maybeSeqNum = props.Headers?[Constants.PublishSequenceNumberHeader];
+ if (maybeSeqNum != null)
+ {
+ publishSequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum);
+ }
+
+ await HandleNack(publishSequenceNumber, multiple: false, isReturn: true, cancellationToken)
+ .ConfigureAwait(false);
+ }
+
return true;
}
@@ -658,20 +702,6 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag)
return deliveryTag;
}
- protected async Task HandleBasicReturn(IncomingCommand cmd, CancellationToken cancellationToken)
- {
- if (!_basicReturnAsyncWrapper.IsEmpty)
- {
- var basicReturn = new BasicReturn(cmd.MethodSpan);
- var e = new BasicReturnEventArgs(basicReturn._replyCode, basicReturn._replyText,
- basicReturn._exchange, basicReturn._routingKey,
- new ReadOnlyBasicProperties(cmd.HeaderSpan), cmd.Body.Memory, cancellationToken);
- await _basicReturnAsyncWrapper.InvokeAsync(this, e)
- .ConfigureAwait(false);
- }
- return true;
- }
-
protected async Task HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
var channelClose = new ChannelClose(cmd.MethodSpan);
@@ -825,7 +855,7 @@ await Session.Connection.HandleConnectionUnblockedAsync(cancellationToken)
public async ValueTask GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default)
{
- if (ConfirmsAreEnabled)
+ if (_publisherConfirmationsEnabled)
{
await _confirmSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
@@ -973,83 +1003,79 @@ public async ValueTask BasicPublishAsync(string exchange, string ro
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
- if (ConfirmsAreEnabled)
+ TaskCompletionSource? publisherConfirmationTcs = null;
+ ulong publishSequenceNumber = 0;
+ try
{
- await _confirmSemaphore.WaitAsync(cancellationToken)
- .ConfigureAwait(false);
- try
+ if (_publisherConfirmationsEnabled)
{
- if (_trackConfirmations)
+ await _confirmSemaphore.WaitAsync(cancellationToken)
+ .ConfigureAwait(false);
+
+ publishSequenceNumber = _nextPublishSeqNo;
+
+ if (_publisherConfirmationTrackingEnabled)
{
- if (_pendingDeliveryTags is null)
- {
- throw new InvalidOperationException(InternalConstants.BugFound);
- }
- _pendingDeliveryTags.AddLast(_nextPublishSeqNo);
+ publisherConfirmationTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
+ _confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs;
}
_nextPublishSeqNo++;
}
- finally
- {
- _confirmSemaphore.Release();
- }
- }
- try
- {
+ await EnforceFlowControlAsync(cancellationToken)
+ .ConfigureAwait(false);
+
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
+
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length)
: default;
- if (sendActivity != null)
+ BasicProperties? props = PopulateBasicPropertiesHeaders(basicProperties, sendActivity, publishSequenceNumber);
+ if (props is null)
{
- BasicProperties? props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity);
- if (props is null)
- {
- await EnforceFlowControlAsync(cancellationToken)
- .ConfigureAwait(false);
- await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken)
- .ConfigureAwait(false);
- }
- else
- {
- await EnforceFlowControlAsync(cancellationToken)
- .ConfigureAwait(false);
- await ModelSendAsync(in cmd, in props, body, cancellationToken)
- .ConfigureAwait(false);
- }
+ await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken)
+ .ConfigureAwait(false);
}
else
{
- await EnforceFlowControlAsync(cancellationToken)
- .ConfigureAwait(false);
- await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken)
+ await ModelSendAsync(in cmd, in props, body, cancellationToken)
.ConfigureAwait(false);
}
}
- catch
+ catch (Exception ex)
{
- if (ConfirmsAreEnabled)
+ if (_publisherConfirmationsEnabled)
{
- await _confirmSemaphore.WaitAsync(cancellationToken)
- .ConfigureAwait(false);
- try
- {
- _nextPublishSeqNo--;
- if (_trackConfirmations && _pendingDeliveryTags is not null)
- {
- _pendingDeliveryTags.RemoveLast();
- }
- }
- finally
+ _nextPublishSeqNo--;
+ if (_publisherConfirmationTrackingEnabled)
{
- _confirmSemaphore.Release();
+ _confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _);
}
}
- throw;
+ if (publisherConfirmationTcs is not null)
+ {
+ publisherConfirmationTcs.SetException(ex);
+ }
+ else
+ {
+ throw;
+ }
+ }
+ finally
+ {
+ if (_publisherConfirmationsEnabled)
+ {
+ _confirmSemaphore.Release();
+ }
+ }
+
+ if (publisherConfirmationTcs is not null)
+ {
+ await publisherConfirmationTcs.Task.WaitAsync(cancellationToken)
+ .ConfigureAwait(false);
}
}
@@ -1058,83 +1084,79 @@ public async ValueTask BasicPublishAsync(CachedString exchange, Cac
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
- if (ConfirmsAreEnabled)
+ TaskCompletionSource? publisherConfirmationTcs = null;
+ ulong publishSequenceNumber = 0;
+ try
{
- await _confirmSemaphore.WaitAsync(cancellationToken)
- .ConfigureAwait(false);
- try
+ if (_publisherConfirmationsEnabled)
{
- if (_trackConfirmations)
+ await _confirmSemaphore.WaitAsync(cancellationToken)
+ .ConfigureAwait(false);
+
+ publishSequenceNumber = _nextPublishSeqNo;
+
+ if (_publisherConfirmationTrackingEnabled)
{
- if (_pendingDeliveryTags is null)
- {
- throw new InvalidOperationException(InternalConstants.BugFound);
- }
- _pendingDeliveryTags.AddLast(_nextPublishSeqNo);
+ publisherConfirmationTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
+ _confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs;
}
_nextPublishSeqNo++;
}
- finally
- {
- _confirmSemaphore.Release();
- }
- }
- try
- {
+ await EnforceFlowControlAsync(cancellationToken)
+ .ConfigureAwait(false);
+
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
+
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
: default;
- if (sendActivity != null)
+ BasicProperties? props = PopulateBasicPropertiesHeaders(basicProperties, sendActivity, publishSequenceNumber);
+ if (props is null)
{
- BasicProperties? props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity);
- if (props is null)
- {
- await EnforceFlowControlAsync(cancellationToken)
- .ConfigureAwait(false);
- await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken)
- .ConfigureAwait(false);
- }
- else
- {
- await EnforceFlowControlAsync(cancellationToken)
- .ConfigureAwait(false);
- await ModelSendAsync(in cmd, in props, body, cancellationToken)
- .ConfigureAwait(false);
- }
+ await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken)
+ .ConfigureAwait(false);
}
else
{
- await EnforceFlowControlAsync(cancellationToken)
- .ConfigureAwait(false);
- await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken)
+ await ModelSendAsync(in cmd, in props, body, cancellationToken)
.ConfigureAwait(false);
}
}
- catch
+ catch (Exception ex)
{
- if (ConfirmsAreEnabled)
+ if (_publisherConfirmationsEnabled)
{
- await _confirmSemaphore.WaitAsync(cancellationToken)
- .ConfigureAwait(false);
- try
+ _nextPublishSeqNo--;
+ if (_publisherConfirmationTrackingEnabled)
{
- _nextPublishSeqNo--;
- if (_trackConfirmations && _pendingDeliveryTags is not null)
- {
- _pendingDeliveryTags.RemoveLast();
- }
- }
- finally
- {
- _confirmSemaphore.Release();
+ _confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _);
}
}
- throw;
+ if (publisherConfirmationTcs is not null)
+ {
+ publisherConfirmationTcs.SetException(ex);
+ }
+ else
+ {
+ throw;
+ }
+ }
+ finally
+ {
+ if (_publisherConfirmationsEnabled)
+ {
+ _confirmSemaphore.Release();
+ }
+ }
+
+ if (publisherConfirmationTcs is not null)
+ {
+ await publisherConfirmationTcs.Task.WaitAsync(cancellationToken)
+ .ConfigureAwait(false);
}
}
@@ -1210,51 +1232,6 @@ await ModelSendAsync(in method, k.CancellationToken)
}
}
- public async Task ConfirmSelectAsync(bool trackConfirmations = true, CancellationToken cancellationToken = default)
- {
- _trackConfirmations = trackConfirmations;
- bool enqueued = false;
- var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
-
- await _rpcSemaphore.WaitAsync(k.CancellationToken)
- .ConfigureAwait(false);
- try
- {
- if (_nextPublishSeqNo == 0UL)
- {
- if (_trackConfirmations)
- {
- _pendingDeliveryTags = new LinkedList();
- _confirmsTaskCompletionSources = new List>();
- }
- _nextPublishSeqNo = 1;
- }
-
- enqueued = Enqueue(k);
-
- var method = new ConfirmSelect(false);
- await ModelSendAsync(in method, k.CancellationToken)
- .ConfigureAwait(false);
-
- bool result = await k;
- Debug.Assert(result);
-
- // Note:
- // Non-null means confirms are enabled
- _confirmSemaphore = new SemaphoreSlim(1, 1);
-
- return;
- }
- finally
- {
- if (false == enqueued)
- {
- k.Dispose();
- }
- _rpcSemaphore.Release();
- }
- }
-
public async Task ExchangeBindAsync(string destination, string source, string routingKey,
IDictionary? arguments, bool noWait,
CancellationToken cancellationToken)
@@ -1739,244 +1716,168 @@ await ModelSendAsync(in method, k.CancellationToken)
}
}
- public async Task WaitForConfirmsAsync(CancellationToken cancellationToken = default)
+ // NOTE: _rpcSemaphore is held
+ private async Task ConfirmSelectAsync(bool publisherConfirmationTrackingEnablefd = false,
+ CancellationToken cancellationToken = default)
{
- if (false == ConfirmsAreEnabled)
- {
- throw new InvalidOperationException("Confirms not selected");
- }
+ _publisherConfirmationsEnabled = true;
+ _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnablefd;
- if (false == _trackConfirmations)
- {
- throw new InvalidOperationException("Confirmation tracking is not enabled");
- }
-
- if (_pendingDeliveryTags is null)
- {
- throw new InvalidOperationException(InternalConstants.BugFound);
- }
+ bool enqueued = false;
+ var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
- TaskCompletionSource tcs;
- await _confirmSemaphore.WaitAsync(cancellationToken)
- .ConfigureAwait(false);
try
{
- if (_pendingDeliveryTags.Count == 0)
+ if (_nextPublishSeqNo == 0UL)
{
- if (_onlyAcksReceived == false)
+ if (_publisherConfirmationTrackingEnabled)
{
- _onlyAcksReceived = true;
- return false;
+ _confirmsTaskCompletionSources.Clear();
}
-
- return true;
+ _nextPublishSeqNo = 1;
}
- tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- _confirmsTaskCompletionSources!.Add(tcs);
- }
- finally
- {
- _confirmSemaphore.Release();
- }
-
- bool rv;
+ enqueued = Enqueue(k);
- if (false == cancellationToken.CanBeCanceled)
- {
- rv = await tcs.Task.ConfigureAwait(false);
- }
- else
- {
- rv = await WaitForConfirmsWithTokenAsync(tcs, cancellationToken)
+ var method = new ConfirmSelect(false);
+ await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);
- }
-
- return rv;
- }
-
- public async Task WaitForConfirmsOrDieAsync(CancellationToken token = default)
- {
- if (false == ConfirmsAreEnabled)
- {
- throw new InvalidOperationException("Confirms not selected");
- }
- if (false == _trackConfirmations)
- {
- throw new InvalidOperationException("Confirmation tracking is not enabled");
- }
+ bool result = await k;
+ Debug.Assert(result);
- if (_pendingDeliveryTags is null)
- {
- throw new InvalidOperationException(InternalConstants.BugFound);
+ return;
}
-
- try
+ finally
{
- bool onlyAcksReceived = await WaitForConfirmsAsync(token)
- .ConfigureAwait(false);
-
- if (onlyAcksReceived)
+ if (false == enqueued)
{
- return;
+ k.Dispose();
}
-
- var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.ReplySuccess, "Nacks Received", new IOException("nack received"));
-
- await CloseAsync(ea, false, token)
- .ConfigureAwait(false);
- }
- catch (OperationCanceledException ex)
- {
- const string msg = "timed out waiting for acks";
- var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.ReplySuccess, msg, ex);
-
- await CloseAsync(ea, false, token)
- .ConfigureAwait(false);
-
- throw;
- }
- }
-
- private async Task WaitForConfirmsWithTokenAsync(TaskCompletionSource tcs,
- CancellationToken cancellationToken)
- {
- if (false == ConfirmsAreEnabled)
- {
- throw new InvalidOperationException("Confirms not selected");
- }
-
- if (false == _trackConfirmations)
- {
- throw new InvalidOperationException("Confirmation tracking is not enabled");
- }
-
- if (_pendingDeliveryTags is null)
- {
- throw new InvalidOperationException(InternalConstants.BugFound);
- }
-
- CancellationTokenRegistration tokenRegistration =
-#if NET6_0_OR_GREATER
- cancellationToken.UnsafeRegister(
- state => ((TaskCompletionSource)state!).TrySetCanceled(), tcs);
-#else
- cancellationToken.Register(
- state => ((TaskCompletionSource)state!).TrySetCanceled(),
- state: tcs, useSynchronizationContext: false);
-#endif
- try
- {
- return await tcs.Task.ConfigureAwait(false);
- }
- finally
- {
-#if NET6_0_OR_GREATER
- await tokenRegistration.DisposeAsync()
- .ConfigureAwait(false);
-#else
- tokenRegistration.Dispose();
-#endif
}
}
- // NOTE: this method is internal for its use in this test:
- // TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout_ReturnFalse
- internal async Task HandleAckNack(ulong deliveryTag, bool multiple, bool isNack, CancellationToken cancellationToken = default)
+ private Task HandleAck(ulong deliveryTag, bool multiple, CancellationToken cancellationToken = default)
{
- // Only do this if confirms are enabled *and* the library is tracking confirmations
- if (ConfirmsAreEnabled && _trackConfirmations)
+ if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty)
{
- if (_pendingDeliveryTags is null)
- {
- throw new InvalidOperationException(InternalConstants.BugFound);
- }
- // let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted
- await _confirmSemaphore.WaitAsync(cancellationToken)
- .ConfigureAwait(false);
- try
+ if (multiple)
{
- // No need to do anything if there are no delivery tags in the list
- if (_pendingDeliveryTags.Count > 0)
+ foreach (KeyValuePair> pair in _confirmsTaskCompletionSources)
{
- if (multiple)
+ if (pair.Key <= deliveryTag)
{
- while (_pendingDeliveryTags.First!.Value < deliveryTag)
- {
- _pendingDeliveryTags.RemoveFirst();
- }
-
- if (_pendingDeliveryTags.First.Value == deliveryTag)
- {
- _pendingDeliveryTags.RemoveFirst();
- }
- }
- else
- {
- _pendingDeliveryTags.Remove(deliveryTag);
+ pair.Value.SetResult(true);
+ _confirmsTaskCompletionSources.Remove(pair.Key, out _);
}
}
+ }
+ else
+ {
+ if (_confirmsTaskCompletionSources.TryRemove(deliveryTag, out TaskCompletionSource? tcs))
+ {
+ tcs.SetResult(true);
+ }
+ }
+ }
- _onlyAcksReceived = _onlyAcksReceived && !isNack;
+ return Task.CompletedTask;
+ }
- if (_pendingDeliveryTags.Count == 0 && _confirmsTaskCompletionSources!.Count > 0)
+ private Task HandleNack(ulong deliveryTag, bool multiple, bool isReturn,
+ CancellationToken cancellationToken = default)
+ {
+ if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty)
+ {
+ if (multiple)
+ {
+ foreach (KeyValuePair> pair in _confirmsTaskCompletionSources)
{
- // Done, mark tasks
- foreach (TaskCompletionSource confirmsTaskCompletionSource in _confirmsTaskCompletionSources)
+ if (pair.Key <= deliveryTag)
{
- confirmsTaskCompletionSource.TrySetResult(_onlyAcksReceived);
+ pair.Value.SetException(new PublishException(pair.Key, isReturn));
+ _confirmsTaskCompletionSources.Remove(pair.Key, out _);
}
-
- _confirmsTaskCompletionSources.Clear();
- _onlyAcksReceived = true;
}
}
- finally
+ else
{
- _confirmSemaphore.Release();
+ if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource? tcs))
+ {
+ tcs.SetException(new PublishException(deliveryTag, isReturn));
+ }
}
}
+
+ return Task.CompletedTask;
}
- private static BasicProperties? PopulateActivityAndPropagateTraceId(TProperties basicProperties,
- Activity sendActivity) where TProperties : IReadOnlyBasicProperties, IAmqpHeader
+ private BasicProperties? PopulateBasicPropertiesHeaders(TProperties basicProperties,
+ Activity? sendActivity, ulong publishSequenceNumber)
+ where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
- // This activity is marked as recorded, so let's propagate the trace and span ids.
- if (sendActivity.IsAllDataRequested)
+ /*
+ * Note: there is nothing to do in this method if *both* of these
+ * conditions are true:
+ *
+ * sendActivity is null - there is no activity to add as a header
+ * publisher confirmations are NOT enabled
+ */
+ if (sendActivity is null && !_publisherConfirmationsEnabled)
{
- if (!string.IsNullOrEmpty(basicProperties.CorrelationId))
- {
- sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, basicProperties.CorrelationId);
- }
+ return null;
}
+ bool newHeaders = false;
IDictionary? headers = basicProperties.Headers;
if (headers is null)
{
- return AddHeaders(basicProperties, sendActivity);
+ headers = new Dictionary();
+ newHeaders = true;
}
+ MaybeAddActivityToHeaders(headers, basicProperties.CorrelationId, sendActivity);
+ MaybeAddPublishSequenceNumberToHeaders(headers);
- // Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
- RabbitMQActivitySource.ContextInjector(sendActivity, headers);
- return null;
+ switch (basicProperties)
+ {
+ case BasicProperties writableProperties:
+ if (newHeaders)
+ {
+ writableProperties.Headers = headers;
+ }
+ return null;
+ case EmptyBasicProperty:
+ return new BasicProperties { Headers = headers };
+ default:
+ return new BasicProperties(basicProperties) { Headers = headers };
+ }
- static BasicProperties? AddHeaders(TProperties basicProperties, Activity sendActivity)
+ void MaybeAddActivityToHeaders(IDictionary headers,
+ string? correlationId, Activity? sendActivity)
{
- var headers = new Dictionary();
+ if (sendActivity is not null)
+ {
+ // This activity is marked as recorded, so let's propagate the trace and span ids.
+ if (sendActivity.IsAllDataRequested)
+ {
+ if (!string.IsNullOrEmpty(correlationId))
+ {
+ sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, correlationId);
+ }
+ }
- // Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
- RabbitMQActivitySource.ContextInjector(sendActivity, headers);
+ // Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
+ RabbitMQActivitySource.ContextInjector(sendActivity, headers);
+ }
+ }
- switch (basicProperties)
+ void MaybeAddPublishSequenceNumberToHeaders(IDictionary headers)
+ {
+ if (_publisherConfirmationsEnabled)
{
- case BasicProperties writableProperties:
- writableProperties.Headers = headers;
- return null;
- case EmptyBasicProperty:
- return new BasicProperties { Headers = headers };
- default:
- return new BasicProperties(basicProperties) { Headers = headers };
+ byte[] publishSequenceNumberBytes = new byte[8];
+ NetworkOrderSerializer.WriteUInt64(ref publishSequenceNumberBytes.GetStart(), publishSequenceNumber);
+ headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes;
}
}
}
diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs
index 4a733ac8c..373f6deea 100644
--- a/projects/RabbitMQ.Client/Impl/Connection.cs
+++ b/projects/RabbitMQ.Client/Impl/Connection.cs
@@ -264,13 +264,19 @@ await CloseAsync(ea, true,
}
}
- public Task CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
+ public async Task CreateChannelAsync(CreateChannelOptions? options = default,
CancellationToken cancellationToken = default)
{
EnsureIsOpen();
+
+ options ??= CreateChannelOptions.Default;
ISession session = CreateSession();
- var channel = new Channel(_config, session, consumerDispatchConcurrency);
- return channel.OpenAsync(cancellationToken);
+
+ // TODO channel CreateChannelAsync() to combine ctor and OpenAsync
+ var channel = new Channel(_config, session, options.ConsumerDispatchConcurrency);
+ IChannel ch = await channel.OpenAsync(options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cancellationToken)
+ .ConfigureAwait(false);
+ return ch;
}
internal ISession CreateSession()
diff --git a/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs b/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs
index ab1cefecb..6aefc7bd3 100644
--- a/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs
+++ b/projects/RabbitMQ.Client/Impl/RecoveryAwareChannel.cs
@@ -37,7 +37,7 @@ namespace RabbitMQ.Client.Impl
{
internal sealed class RecoveryAwareChannel : Channel
{
- public RecoveryAwareChannel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
+ public RecoveryAwareChannel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null)
: base(config, session, consumerDispatchConcurrency)
{
ActiveDeliveryTagOffset = 0;
@@ -50,7 +50,6 @@ public RecoveryAwareChannel(ConnectionConfig config, ISession session, ushort co
internal void TakeOver(RecoveryAwareChannel other)
{
base.TakeOver(other);
-
ActiveDeliveryTagOffset = other.ActiveDeliveryTagOffset + other.MaxSeenDeliveryTag;
MaxSeenDeliveryTag = 0;
}
diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt
index 9b4e290f2..e60ddc2b1 100644
--- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt
+++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt
@@ -278,8 +278,6 @@ RabbitMQ.Client.Exceptions.MalformedFrameException.MalformedFrameException(strin
RabbitMQ.Client.Exceptions.MalformedFrameException.MalformedFrameException(string message, bool canShutdownCleanly) -> void
RabbitMQ.Client.Exceptions.OperationInterruptedException
RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException() -> void
-RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(string message) -> void
-RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(string message, System.Exception inner) -> void
RabbitMQ.Client.Exceptions.OperationInterruptedException.ShutdownReason.set -> void
RabbitMQ.Client.Exceptions.PacketNotRecognizedException
RabbitMQ.Client.Exceptions.PacketNotRecognizedException.PacketNotRecognizedException(int transportHigh, int transportLow, int serverMajor, int serverMinor) -> void
@@ -299,11 +297,9 @@ RabbitMQ.Client.Exceptions.ProtocolVersionMismatchException.ProtocolVersionMisma
RabbitMQ.Client.Exceptions.ProtocolVersionMismatchException.ServerMajor.get -> int
RabbitMQ.Client.Exceptions.ProtocolVersionMismatchException.ServerMinor.get -> int
RabbitMQ.Client.Exceptions.ProtocolViolationException
-RabbitMQ.Client.Exceptions.ProtocolViolationException.ProtocolViolationException() -> void
RabbitMQ.Client.Exceptions.ProtocolViolationException.ProtocolViolationException(string message) -> void
RabbitMQ.Client.Exceptions.ProtocolViolationException.ProtocolViolationException(string message, System.Exception inner) -> void
RabbitMQ.Client.Exceptions.RabbitMQClientException
-RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException() -> void
RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException(string message) -> void
RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException(string message, System.Exception innerException) -> void
RabbitMQ.Client.Exceptions.SyntaxErrorException
@@ -756,8 +752,6 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~RabbitMQ.Client.IChannel.TxCommitAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.TxRollbackAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.TxSelectAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
-~RabbitMQ.Client.IChannel.WaitForConfirmsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
-~RabbitMQ.Client.IChannel.WaitForConfirmsOrDieAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IConnection.CloseAsync(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IConnection.UpdateSecretAsync(string newSecret, string reason, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
@@ -785,7 +779,6 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, string! consumerTag, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, string! consumerTag, System.Collections.Generic.IDictionary? arguments, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
-static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel! channel, ushort replyCode, string! replyText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary? arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
static RabbitMQ.Client.IChannelExtensions.QueueDeclareAsync(this RabbitMQ.Client.IChannel! channel, string! queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, System.Collections.Generic.IDictionary? arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
@@ -819,16 +812,8 @@ RabbitMQ.Client.ICredentialsProvider.GetCredentialsAsync(System.Threading.Cancel
RabbitMQ.Client.ICredentialsProvider.Name.get -> string!
RabbitMQ.Client.PlainMechanism.HandleChallengeAsync(byte[]? challenge, RabbitMQ.Client.ConnectionConfig! config, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
readonly RabbitMQ.Client.ConnectionConfig.CredentialsProvider -> RabbitMQ.Client.ICredentialsProvider!
-RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
-RabbitMQ.Client.IChannel.BasicPublishAsync(string! exchange, string! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
-static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
-static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
-static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
-static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
-RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
-RabbitMQ.Client.IConnection.CreateChannelAsync(ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
RabbitMQ.Client.IConnection.CallbackExceptionAsync -> RabbitMQ.Client.Events.AsyncEventHandler!
RabbitMQ.Client.IConnection.ConnectionBlockedAsync -> RabbitMQ.Client.Events.AsyncEventHandler!
RabbitMQ.Client.IConnection.ConnectionRecoveryErrorAsync -> RabbitMQ.Client.Events.AsyncEventHandler!
@@ -888,8 +873,6 @@ RabbitMQ.Client.Events.ShutdownEventArgs.ShutdownEventArgs(RabbitMQ.Client.Shutd
RabbitMQ.Client.Events.ShutdownEventArgs.ShutdownEventArgs(RabbitMQ.Client.ShutdownInitiator initiator, ushort replyCode, string! replyText, System.Exception! exception, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void
RabbitMQ.Client.Events.ShutdownEventArgs.ShutdownEventArgs(RabbitMQ.Client.ShutdownInitiator initiator, ushort replyCode, string! replyText, ushort classId, ushort methodId, object? cause = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void
RabbitMQ.Client.Exceptions.AlreadyClosedException.AlreadyClosedException(RabbitMQ.Client.Events.ShutdownEventArgs! reason) -> void
-RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs? reason) -> void
-RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs? reason, string! prefix) -> void
RabbitMQ.Client.Exceptions.OperationInterruptedException.ShutdownReason.get -> RabbitMQ.Client.Events.ShutdownEventArgs?
RabbitMQ.Client.IAsyncBasicConsumer.HandleChannelShutdownAsync(object! channel, RabbitMQ.Client.Events.ShutdownEventArgs! reason) -> System.Threading.Tasks.Task!
RabbitMQ.Client.IChannel.ChannelShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler!
diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
index e69de29bb..e9f625369 100644
--- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
+++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
@@ -0,0 +1,26 @@
+const RabbitMQ.Client.Constants.PublishSequenceNumberHeader = "x-dotnet-pub-seq-no" -> string!
+RabbitMQ.Client.CreateChannelOptions
+RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.get -> ushort?
+RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.set -> void
+RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions() -> void
+RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.get -> bool
+RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.set -> void
+RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.get -> bool
+RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.set -> void
+RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs! reason) -> void
+RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs! reason, string! prefix) -> void
+RabbitMQ.Client.Exceptions.ProtocolViolationException.ProtocolViolationException() -> void
+RabbitMQ.Client.Exceptions.PublishException
+RabbitMQ.Client.Exceptions.PublishException.IsReturn.get -> bool
+RabbitMQ.Client.Exceptions.PublishException.PublishException(ulong publishSequenceNumber, bool isReturn) -> void
+RabbitMQ.Client.Exceptions.PublishException.PublishSequenceNumber.get -> ulong
+RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException() -> void
+RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
+RabbitMQ.Client.IChannel.BasicPublishAsync(string! exchange, string! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
+RabbitMQ.Client.IConnection.CreateChannelAsync(RabbitMQ.Client.CreateChannelOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
+static RabbitMQ.Client.CreateChannelOptions.Default.get -> RabbitMQ.Client.CreateChannelOptions!
+static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
+static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
+static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
+static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
+static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
\ No newline at end of file
diff --git a/projects/Test/Applications/CreateChannel/Program.cs b/projects/Test/Applications/CreateChannel/Program.cs
index 378eb4f3c..69d96e257 100644
--- a/projects/Test/Applications/CreateChannel/Program.cs
+++ b/projects/Test/Applications/CreateChannel/Program.cs
@@ -1,4 +1,35 @@
-using System;
+// This source code is dual-licensed under the Apache License, version
+// 2.0, and the Mozilla Public License, version 2.0.
+//
+// The APL v2.0:
+//
+//---------------------------------------------------------------------------
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//---------------------------------------------------------------------------
+//
+// The MPL v2.0:
+//
+//---------------------------------------------------------------------------
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+//
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
+//---------------------------------------------------------------------------
+
+using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
@@ -36,7 +67,7 @@ public static async Task Main()
for (int j = 0; j < channels.Length; j++)
{
- channels[j].Dispose();
+ await channels[j].DisposeAsync();
}
}
diff --git a/projects/Test/Applications/GH-1647/Program.cs b/projects/Test/Applications/GH-1647/Program.cs
index c110641de..18ac9e57d 100644
--- a/projects/Test/Applications/GH-1647/Program.cs
+++ b/projects/Test/Applications/GH-1647/Program.cs
@@ -1,4 +1,35 @@
-#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
+// This source code is dual-licensed under the Apache License, version
+// 2.0, and the Mozilla Public License, version 2.0.
+//
+// The APL v2.0:
+//
+//---------------------------------------------------------------------------
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//---------------------------------------------------------------------------
+//
+// The MPL v2.0:
+//
+//---------------------------------------------------------------------------
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+//
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
+//---------------------------------------------------------------------------
+
+#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
using System.Text;
using RabbitMQ.Client;
@@ -9,6 +40,12 @@
Password = "guest"
};
+var channelOptions = new CreateChannelOptions
+{
+ PublisherConfirmationsEnabled = true,
+ PublisherConfirmationTrackingEnabled = true
+};
+
var props = new BasicProperties();
byte[] msg = Encoding.UTF8.GetBytes("test");
await using var connection = await connectionFactory.CreateConnectionAsync();
@@ -16,11 +53,18 @@
{
try
{
- await using var channel = await connection.CreateChannelAsync(); // New channel for each message
+ await using var channel = await connection.CreateChannelAsync(channelOptions); // New channel for each message
await Task.Delay(1000);
- await channel.BasicPublishAsync(exchange: string.Empty, routingKey: string.Empty,
- mandatory: false, basicProperties: props, body: msg);
- Console.WriteLine($"Sent message {i}");
+ try
+ {
+ await channel.BasicPublishAsync(exchange: string.Empty, routingKey: string.Empty,
+ mandatory: false, basicProperties: props, body: msg);
+ Console.WriteLine($"Sent message {i}");
+ }
+ catch (Exception ex)
+ {
+ Console.Error.WriteLine($"[ERROR] message {i} not acked: {ex}");
+ }
}
catch (Exception ex)
{
diff --git a/projects/Test/Applications/MassPublish/Program.cs b/projects/Test/Applications/MassPublish/Program.cs
index 2d6ef5e63..3e074e3ff 100644
--- a/projects/Test/Applications/MassPublish/Program.cs
+++ b/projects/Test/Applications/MassPublish/Program.cs
@@ -137,20 +137,24 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer
publishTasks.Add(Task.Run(async () =>
{
- using IChannel publishChannel = await publishConnection.CreateChannelAsync();
+ using IChannel publishChannel = await publishConnection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
publishChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;
- await publishChannel.ConfirmSelectAsync();
-
for (int i = 0; i < ItemsPerBatch; i++)
{
- await publishChannel.BasicPublishAsync(exchange: ExchangeName, routingKey: RoutingKey,
- basicProperties: s_properties, body: s_payload, mandatory: true);
- Interlocked.Increment(ref s_messagesSent);
+ try
+ {
+ await publishChannel.BasicPublishAsync(exchange: ExchangeName, routingKey: RoutingKey,
+ basicProperties: s_properties, body: s_payload, mandatory: true);
+ Interlocked.Increment(ref s_messagesSent);
+ }
+ catch (Exception ex)
+ {
+ Console.Error.WriteLine("[ERROR] channel {0} saw nack, ex: {1}",
+ publishChannel.ChannelNumber, ex);
+ }
}
- await publishChannel.WaitForConfirmsOrDieAsync();
-
if (s_debug)
{
Console.WriteLine("[DEBUG] channel {0} done publishing and waiting for confirms", publishChannel.ChannelNumber);
diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs
index 16b597459..d2439f256 100644
--- a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs
+++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs
@@ -1,4 +1,36 @@
-using System;
+// This source code is dual-licensed under the Apache License, version
+// 2.0, and the Mozilla Public License, version 2.0.
+//
+// The APL v2.0:
+//
+//---------------------------------------------------------------------------
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//---------------------------------------------------------------------------
+//
+// The MPL v2.0:
+//
+//---------------------------------------------------------------------------
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+//
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
+//---------------------------------------------------------------------------
+
+using System;
+using System.Buffers.Binary;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
@@ -9,6 +41,8 @@
const int MESSAGE_COUNT = 50_000;
bool debug = false;
+#pragma warning disable CS8321 // Local function is declared but never used
+
await PublishMessagesIndividuallyAsync();
await PublishMessagesInBatchAsync();
await HandlePublishConfirmsAsynchronously();
@@ -21,15 +55,14 @@ static Task CreateConnectionAsync()
static async Task PublishMessagesIndividuallyAsync()
{
- Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages individually and handling confirms all at once");
+ Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms per-message");
await using IConnection connection = await CreateConnectionAsync();
- await using IChannel channel = await connection.CreateChannelAsync();
+ await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;
- await channel.ConfirmSelectAsync();
var sw = new Stopwatch();
sw.Start();
@@ -37,11 +70,16 @@ static async Task PublishMessagesIndividuallyAsync()
for (int i = 0; i < MESSAGE_COUNT; i++)
{
byte[] body = Encoding.UTF8.GetBytes(i.ToString());
- await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body);
+ try
+ {
+ await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body);
+ }
+ catch (Exception ex)
+ {
+ Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: {ex}");
+ }
}
- await channel.WaitForConfirmsOrDieAsync();
-
sw.Stop();
Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages individually in {sw.ElapsedMilliseconds:N0} ms");
@@ -52,41 +90,58 @@ static async Task PublishMessagesInBatchAsync()
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms in batches");
await using IConnection connection = await CreateConnectionAsync();
- await using IChannel channel = await connection.CreateChannelAsync();
+ await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;
- await channel.ConfirmSelectAsync();
- int batchSize = 100;
+ int batchSize = 1000;
int outstandingMessageCount = 0;
var sw = new Stopwatch();
sw.Start();
- var publishTasks = new List();
+ var publishTasks = new List();
for (int i = 0; i < MESSAGE_COUNT; i++)
{
byte[] body = Encoding.UTF8.GetBytes(i.ToString());
- publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body).AsTask());
+ publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body));
outstandingMessageCount++;
if (outstandingMessageCount == batchSize)
{
- using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- await Task.WhenAll(publishTasks).WaitAsync(cts.Token);
+ foreach (ValueTask pt in publishTasks)
+ {
+ try
+ {
+ await pt;
+ }
+ catch (Exception ex)
+ {
+ Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'");
+ }
+ }
publishTasks.Clear();
-
- await channel.WaitForConfirmsOrDieAsync(cts.Token);
outstandingMessageCount = 0;
}
}
- if (outstandingMessageCount > 0)
+ if (publishTasks.Count > 0)
{
- using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
- await channel.WaitForConfirmsOrDieAsync(cts.Token);
+ foreach (ValueTask pt in publishTasks)
+ {
+ try
+ {
+ await pt;
+ }
+ catch (Exception ex)
+ {
+ Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'");
+ }
+ }
+ publishTasks.Clear();
+ outstandingMessageCount = 0;
}
sw.Stop();
@@ -98,20 +153,22 @@ async Task HandlePublishConfirmsAsynchronously()
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms asynchronously");
await using IConnection connection = await CreateConnectionAsync();
- await using IChannel channel = await connection.CreateChannelAsync();
+
+ var channelOptions = new CreateChannelOptions
+ {
+ PublisherConfirmationsEnabled = true,
+ PublisherConfirmationTrackingEnabled = false
+ };
+ await using IChannel channel = await connection.CreateChannelAsync(channelOptions);
// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;
- // NOTE: setting trackConfirmations to false because this program
- // is tracking them itself.
- await channel.ConfirmSelectAsync(trackConfirmations: false);
-
- bool publishingCompleted = false;
var allMessagesConfirmedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var outstandingConfirms = new LinkedList();
var semaphore = new SemaphoreSlim(1, 1);
+ int confirmedCount = 0;
async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
{
if (debug)
@@ -140,10 +197,13 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
{
break;
}
+
+ confirmedCount++;
} while (true);
}
else
{
+ confirmedCount++;
outstandingConfirms.Remove(deliveryTag);
}
}
@@ -152,12 +212,30 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
semaphore.Release();
}
- if (publishingCompleted && outstandingConfirms.Count == 0)
+ if (outstandingConfirms.Count == 0 || confirmedCount == MESSAGE_COUNT)
{
allMessagesConfirmedTcs.SetResult(true);
}
}
+ channel.BasicReturnAsync += (sender, ea) =>
+ {
+ ulong sequenceNumber = 0;
+
+ IReadOnlyBasicProperties props = ea.BasicProperties;
+ if (props.Headers is not null)
+ {
+ object? maybeSeqNum = props.Headers[Constants.PublishSequenceNumberHeader];
+ if (maybeSeqNum is not null)
+ {
+ sequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum);
+ }
+ }
+
+ Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number {sequenceNumber} has been basic.return-ed");
+ return CleanOutstandingConfirms(sequenceNumber, false);
+ };
+
channel.BasicAcksAsync += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
channel.BasicNacksAsync += (sender, ea) =>
{
@@ -168,7 +246,7 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
var sw = new Stopwatch();
sw.Start();
- var publishTasks = new List();
+ var publishTasks = new List>();
for (int i = 0; i < MESSAGE_COUNT; i++)
{
string msg = i.ToString();
@@ -187,12 +265,31 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
{
semaphore.Release();
}
- publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body).AsTask());
+
+ string rk = queueName;
+ if (i % 1000 == 0)
+ {
+ // This will cause a basic.return, for fun
+ rk = Guid.NewGuid().ToString();
+ }
+ (ulong, ValueTask) data =
+ (nextPublishSeqNo, channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true));
+ publishTasks.Add(data);
}
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
- await Task.WhenAll(publishTasks).WaitAsync(cts.Token);
- publishingCompleted = true;
+ // await Task.WhenAll(publishTasks).WaitAsync(cts.Token);
+ foreach ((ulong SeqNo, ValueTask PublishTask) datum in publishTasks)
+ {
+ try
+ {
+ await datum.PublishTask;
+ }
+ catch (Exception ex)
+ {
+ Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack, seqNo: '{datum.SeqNo}', ex: '{ex}'");
+ }
+ }
try
{
diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj
index fd865eccf..9e089be82 100644
--- a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj
+++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj
@@ -13,7 +13,7 @@
-
+
diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs
index c6d6265d3..70fa16adb 100644
--- a/projects/Test/Common/IntegrationFixture.cs
+++ b/projects/Test/Common/IntegrationFixture.cs
@@ -153,7 +153,7 @@ public virtual async Task InitializeAsync()
if (_openChannel)
{
- _channel = await _conn.CreateChannelAsync();
+ _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
}
if (IsVerbose)
@@ -190,8 +190,14 @@ public virtual async Task DisposeAsync()
finally
{
_eventListener?.Dispose();
- _channel?.Dispose();
- _conn?.Dispose();
+ if (_channel is not null)
+ {
+ await _channel.DisposeAsync();
+ }
+ if (_conn is not null)
+ {
+ await _conn.DisposeAsync();
+ }
_channel = null;
_conn = null;
}
diff --git a/projects/Test/Common/TestConnectionRecoveryBase.cs b/projects/Test/Common/TestConnectionRecoveryBase.cs
index 8b2e7edc3..bbd65519f 100644
--- a/projects/Test/Common/TestConnectionRecoveryBase.cs
+++ b/projects/Test/Common/TestConnectionRecoveryBase.cs
@@ -71,17 +71,14 @@ protected async Task AssertConsumerCountAsync(IChannel ch, string q, uint count)
Assert.Equal(count, ok.ConsumerCount);
}
- protected async Task AssertExchangeRecoveryAsync(IChannel m, string x)
+ protected async Task AssertExchangeRecoveryAsync(IChannel ch, string x)
{
- await m.ConfirmSelectAsync();
- await WithTemporaryNonExclusiveQueueAsync(m, async (_, q) =>
+ await WithTemporaryNonExclusiveQueueAsync(ch, async (_, q) =>
{
string rk = "routing-key";
- await m.QueueBindAsync(q, x, rk);
- await m.BasicPublishAsync(x, rk, _messageBody);
-
- Assert.True(await WaitForConfirmsWithCancellationAsync(m));
- await m.ExchangeDeclarePassiveAsync(x);
+ await ch.QueueBindAsync(q, x, rk);
+ await ch.BasicPublishAsync(x, rk, _messageBody);
+ await ch.ExchangeDeclarePassiveAsync(x);
});
}
@@ -92,15 +89,13 @@ protected Task AssertExclusiveQueueRecoveryAsync(IChannel m, string q)
protected async Task AssertQueueRecoveryAsync(IChannel ch, string q, bool exclusive, IDictionary arguments = null)
{
- await ch.ConfirmSelectAsync();
- await ch.QueueDeclareAsync(queue: q, passive: true, durable: false, exclusive: false, autoDelete: false, arguments: null);
+ await ch.QueueDeclarePassiveAsync(q);
RabbitMQ.Client.QueueDeclareOk ok1 = await ch.QueueDeclareAsync(queue: q, passive: false,
durable: false, exclusive: exclusive, autoDelete: false, arguments: arguments);
Assert.Equal(0u, ok1.MessageCount);
await ch.BasicPublishAsync("", q, _messageBody);
- Assert.True(await WaitForConfirmsWithCancellationAsync(ch));
RabbitMQ.Client.QueueDeclareOk ok2 = await ch.QueueDeclareAsync(queue: q, passive: false,
durable: false, exclusive: exclusive, autoDelete: false, arguments: arguments);
@@ -204,10 +199,8 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName)
{
using (AutorecoveringConnection publishingConn = await CreateAutorecoveringConnectionAsync())
{
- using (IChannel publishingChannel = await publishingConn.CreateChannelAsync())
+ using (IChannel publishingChannel = await publishingConn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
{
- await publishingChannel.ConfirmSelectAsync();
-
for (ushort i = 0; i < TotalMessageCount; i++)
{
if (i == CloseAtCount)
@@ -216,7 +209,6 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName)
}
await publishingChannel.BasicPublishAsync(string.Empty, queueName, _messageBody);
- await publishingChannel.WaitForConfirmsOrDieAsync();
}
await publishingChannel.CloseAsync();
@@ -238,14 +230,6 @@ protected static TaskCompletionSource PrepareForShutdown(IConnection conn)
return tcs;
}
- protected static Task WaitForConfirmsWithCancellationAsync(IChannel channel)
- {
- using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(4)))
- {
- return channel.WaitForConfirmsAsync(cts.Token);
- }
- }
-
protected Task WaitForShutdownAsync()
{
TaskCompletionSource tcs = PrepareForShutdown(_conn);
@@ -358,10 +342,8 @@ public virtual Task PostHandleDeliveryAsync(ulong deliveryTag,
protected static async Task SendAndConsumeMessageAsync(IConnection conn, string queue, string exchange, string routingKey)
{
- using (IChannel ch = await conn.CreateChannelAsync())
+ using (IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
{
- await ch.ConfirmSelectAsync();
-
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var consumer = new AckingBasicConsumer(ch, 1, tcs);
@@ -371,8 +353,6 @@ protected static async Task SendAndConsumeMessageAsync(IConnection conn, s
await ch.BasicPublishAsync(exchange: exchange, routingKey: routingKey,
body: _encoding.GetBytes("test message"), mandatory: true);
- await ch.WaitForConfirmsOrDieAsync();
-
try
{
await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
diff --git a/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs b/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs
index c3da0f291..b45c7271e 100644
--- a/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs
+++ b/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs
@@ -154,7 +154,6 @@ public async Task TestBasicAckAfterBasicGetAndChannelRecovery()
[Fact]
public async Task TestBasicAckEventHandlerRecovery()
{
- await _channel.ConfirmSelectAsync();
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
((AutorecoveringChannel)_channel).BasicAcksAsync += (m, args) =>
{
diff --git a/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs
index 266144a04..3fcb978b2 100644
--- a/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs
+++ b/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs
@@ -75,7 +75,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName,
await _channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: routingKey);
await _channel.CloseAsync();
- _channel.Dispose();
+ await _channel.DisposeAsync();
_channel = null;
_channel = await _conn.CreateChannelAsync();
@@ -96,7 +96,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName,
consumer.ReceivedAsync += MessageReceived;
await _channel.BasicConsumeAsync(queueName, true, consumer);
- await using (IChannel pubCh = await _conn.CreateChannelAsync())
+ await using (IChannel pubCh = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
{
await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: routingKey, body: body);
await pubCh.CloseAsync();
@@ -106,7 +106,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName,
await CloseAndWaitForRecoveryAsync();
- await using (IChannel pubCh = await _conn.CreateChannelAsync())
+ await using (IChannel pubCh = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
{
await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: "unused", body: body);
await pubCh.CloseAsync();
diff --git a/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs
index 0cdfe8e17..a2aac17c3 100644
--- a/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs
+++ b/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs
@@ -55,8 +55,6 @@ public async Task TestExchangeRecoveryTest()
[Fact]
public async Task TestExchangeToExchangeBindingRecovery()
{
- await _channel.ConfirmSelectAsync();
-
string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName;
string ex_source = GenerateExchangeName();
@@ -73,7 +71,6 @@ public async Task TestExchangeToExchangeBindingRecovery()
await CloseAndWaitForRecoveryAsync();
Assert.True(_channel.IsOpen);
await _channel.BasicPublishAsync(ex_source, "", body: _encoding.GetBytes("msg"), mandatory: true);
- await _channel.WaitForConfirmsOrDieAsync();
await AssertMessageCountAsync(q, 1);
}
finally
diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs
index f6b17f3ba..773238ff6 100644
--- a/projects/Test/Integration/TestAsyncConsumer.cs
+++ b/projects/Test/Integration/TestAsyncConsumer.cs
@@ -213,7 +213,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
});
return Task.CompletedTask;
};
- await using (IChannel publishChannel = await publishConn.CreateChannelAsync())
+ await using (IChannel publishChannel = await publishConn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
{
AddCallbackExceptionHandlers(publishConn, publishChannel);
publishChannel.DefaultConsumer = new DefaultAsyncConsumer(publishChannel,
@@ -226,15 +226,15 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
});
return Task.CompletedTask;
};
- await publishChannel.ConfirmSelectAsync();
+ var publishTasks = new List();
for (int i = 0; i < publish_total; i++)
{
- await publishChannel.BasicPublishAsync(string.Empty, queueName, body1);
- await publishChannel.BasicPublishAsync(string.Empty, queueName, body2);
- await publishChannel.WaitForConfirmsOrDieAsync();
+ publishTasks.Add(publishChannel.BasicPublishAsync(string.Empty, queueName, body1).AsTask());
+ publishTasks.Add(publishChannel.BasicPublishAsync(string.Empty, queueName, body2).AsTask());
}
+ await Task.WhenAll(publishTasks).WaitAsync(WaitSpan);
await publishChannel.CloseAsync();
}
@@ -463,8 +463,6 @@ public async Task TestBasicAckAsync()
return Task.CompletedTask;
};
- await _channel.ConfirmSelectAsync();
-
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (object sender, BasicDeliverEventArgs args) =>
{
@@ -491,7 +489,6 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
{
byte[] _body = _encoding.GetBytes(Guid.NewGuid().ToString());
await _channel.BasicPublishAsync(string.Empty, queueName, _body);
- await _channel.WaitForConfirmsOrDieAsync();
}
return true;
@@ -649,12 +646,10 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
var consumer1 = new AsyncEventingBasicConsumer(_channel);
consumer1.ReceivedAsync += async (sender, args) =>
{
- await using IChannel innerChannel = await _conn.CreateChannelAsync();
- await innerChannel.ConfirmSelectAsync();
+ await using IChannel innerChannel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
await innerChannel.BasicPublishAsync(exchangeName, queue2Name,
mandatory: true,
body: Encoding.ASCII.GetBytes(nameof(TestCreateChannelWithinAsyncConsumerCallback_GH650)));
- await innerChannel.WaitForConfirmsOrDieAsync();
await innerChannel.CloseAsync();
};
await _channel.BasicConsumeAsync(queue1Name, autoAck: true, consumer1);
@@ -667,9 +662,7 @@ await innerChannel.BasicPublishAsync(exchangeName, queue2Name,
};
await _channel.BasicConsumeAsync(queue2Name, autoAck: true, consumer2);
- await _channel.ConfirmSelectAsync();
await _channel.BasicPublishAsync(exchangeName, queue1Name, body: GetRandomBody(1024));
- await _channel.WaitForConfirmsOrDieAsync();
Assert.True(await tcs.Task);
}
@@ -689,9 +682,9 @@ public async Task TestCloseWithinEventHandler_GH1567()
{
await _channel.BasicCancelAsync(eventArgs.ConsumerTag);
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
- _channel.CloseAsync().ContinueWith((_) =>
+ _channel.CloseAsync().ContinueWith(async (_) =>
{
- _channel.Dispose();
+ await _channel.DisposeAsync();
_channel = null;
});
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
@@ -715,7 +708,7 @@ private async Task ValidateConsumerDispatchConcurrency()
Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
await using IChannel ch = await _conn.CreateChannelAsync(
- consumerDispatchConcurrency: expectedConsumerDispatchConcurrency);
+ new CreateChannelOptions { ConsumerDispatchConcurrency = expectedConsumerDispatchConcurrency });
AutorecoveringChannel ach = (AutorecoveringChannel)ch;
Assert.Equal(expectedConsumerDispatchConcurrency, ach.ConsumerDispatcher.Concurrency);
}
diff --git a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs
index 5cc864888..cf3c59d31 100644
--- a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs
+++ b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs
@@ -105,7 +105,7 @@ public async Task TestAsyncEventingBasicConsumer_GH1038()
await _channel.BasicConsumeAsync(queueName, false, consumer);
//publisher
- await using IChannel publisherChannel = await _conn.CreateChannelAsync();
+ await using IChannel publisherChannel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
var props = new BasicProperties();
await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty,
diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs
index 3cbf98ca1..107460fd8 100644
--- a/projects/Test/Integration/TestBasicPublish.cs
+++ b/projects/Test/Integration/TestBasicPublish.cs
@@ -35,6 +35,7 @@
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
+using RabbitMQ.Client.Exceptions;
using Xunit;
using Xunit.Abstractions;
using Xunit.Sdk;
@@ -59,7 +60,7 @@ public override Task InitializeAsync()
public async Task TestBasicRoundtripArray()
{
_conn = await _connFactory.CreateConnectionAsync();
- _channel = await _conn.CreateChannelAsync();
+ _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
QueueDeclareOk q = await _channel.QueueDeclareAsync();
var bp = new BasicProperties();
@@ -87,7 +88,7 @@ public async Task TestBasicRoundtripArray()
public async Task TestBasicRoundtripCachedString()
{
_conn = await _connFactory.CreateConnectionAsync();
- _channel = await _conn.CreateChannelAsync();
+ _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
CachedString exchangeName = new CachedString(string.Empty);
CachedString queueName = new CachedString((await _channel.QueueDeclareAsync()).QueueName);
@@ -115,7 +116,7 @@ public async Task TestBasicRoundtripCachedString()
public async Task TestBasicRoundtripReadOnlyMemory()
{
_conn = await _connFactory.CreateConnectionAsync();
- _channel = await _conn.CreateChannelAsync();
+ _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
QueueDeclareOk q = await _channel.QueueDeclareAsync();
byte[] sendBody = _encoding.GetBytes("hi");
@@ -142,7 +143,7 @@ public async Task TestBasicRoundtripReadOnlyMemory()
public async Task CanNotModifyPayloadAfterPublish()
{
_conn = await _connFactory.CreateConnectionAsync();
- _channel = await _conn.CreateChannelAsync();
+ _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
QueueDeclareOk q = await _channel.QueueDeclareAsync();
byte[] sendBody = new byte[1000];
@@ -203,7 +204,7 @@ public async Task TestMaxInboundMessageBodySize()
Assert.Equal(maxMsgSize, cf.Endpoint.MaxInboundMessageBodySize);
Assert.Equal(maxMsgSize, conn.Endpoint.MaxInboundMessageBodySize);
- await using (IChannel channel = await conn.CreateChannelAsync())
+ await using (IChannel channel = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
{
channel.ChannelShutdownAsync += (o, a) =>
{
@@ -247,7 +248,9 @@ public async Task TestMaxInboundMessageBodySize()
string tag = await channel.BasicConsumeAsync(q.QueueName, true, consumer);
await channel.BasicPublishAsync("", q.QueueName, msg0);
- await channel.BasicPublishAsync("", q.QueueName, msg1);
+ AlreadyClosedException ex = await Assert.ThrowsAsync(() =>
+ channel.BasicPublishAsync("", q.QueueName, msg1).AsTask());
+ Assert.IsType(ex.InnerException);
Assert.True(await tcs.Task);
Assert.Equal(1, count);
@@ -259,6 +262,7 @@ public async Task TestMaxInboundMessageBodySize()
try
{
await channel.CloseAsync();
+ await channel.DisposeAsync();
}
catch (Exception chex)
{
@@ -272,6 +276,7 @@ public async Task TestMaxInboundMessageBodySize()
try
{
await conn.CloseAsync();
+ await conn.DisposeAsync();
}
catch (Exception connex)
{
@@ -286,7 +291,7 @@ public async Task TestMaxInboundMessageBodySize()
public async Task TestPropertiesRoundtrip_Headers()
{
_conn = await _connFactory.CreateConnectionAsync();
- _channel = await _conn.CreateChannelAsync();
+ _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
var subject = new BasicProperties
{
diff --git a/projects/Test/Integration/TestBasicPublishAsync.cs b/projects/Test/Integration/TestBasicPublishAsync.cs
index ad4650e2a..072e2a632 100644
--- a/projects/Test/Integration/TestBasicPublishAsync.cs
+++ b/projects/Test/Integration/TestBasicPublishAsync.cs
@@ -49,7 +49,6 @@ public async Task TestQueuePurgeAsync()
var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- await _channel.ConfirmSelectAsync();
QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true);
@@ -60,7 +59,6 @@ public async Task TestQueuePurgeAsync()
{
await _channel.BasicPublishAsync(string.Empty, q, body);
}
- await _channel.WaitForConfirmsOrDieAsync();
publishSyncSource.SetResult(true);
});
diff --git a/projects/Test/Integration/TestChannelAllocation.cs b/projects/Test/Integration/TestChannelAllocation.cs
index 42b495c49..ddd43b487 100644
--- a/projects/Test/Integration/TestChannelAllocation.cs
+++ b/projects/Test/Integration/TestChannelAllocation.cs
@@ -73,7 +73,7 @@ public async Task AllocateInOrder()
foreach (IChannel channel in channels)
{
await channel.CloseAsync();
- channel.Dispose();
+ await channel.DisposeAsync();
}
}
@@ -90,7 +90,7 @@ public async Task AllocateInOrderOnlyUsingDispose()
foreach (IChannel channel in channels)
{
- channel.Dispose();
+ await channel.DisposeAsync();
}
}
@@ -100,12 +100,12 @@ public async Task AllocateAfterFreeingLast()
IChannel ch0 = await _c.CreateChannelAsync();
Assert.Equal(1, ChannelNumber(ch0));
await ch0.CloseAsync();
- ch0.Dispose();
+ await ch0.DisposeAsync();
IChannel ch1 = await _c.CreateChannelAsync();
Assert.Equal(1, ChannelNumber(ch1));
await ch1.CloseAsync();
- ch1.Dispose();
+ await ch1.DisposeAsync();
}
[Fact]
@@ -121,6 +121,7 @@ public async Task AllocateAfterFreeingMany()
foreach (IChannel channel in channels)
{
await channel.CloseAsync();
+ await channel.DisposeAsync();
}
channels.Clear();
@@ -139,6 +140,7 @@ public async Task AllocateAfterFreeingMany()
{
Assert.Equal(k++, ChannelNumber(channel));
await channel.CloseAsync();
+ await channel.DisposeAsync();
}
}
diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs
index 26f83eb34..71fd7bfaf 100644
--- a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs
+++ b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs
@@ -37,6 +37,7 @@
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
+using RabbitMQ.Client.Exceptions;
using Xunit;
using Xunit.Abstractions;
@@ -54,25 +55,21 @@ public TestConcurrentAccessWithSharedChannel(ITestOutputHelper output)
[Fact]
public async Task ConcurrentPublishSingleChannel()
{
- int publishAckCount = 0;
+ int expectedTotalMessageCount = 0;
+ int expectedTotalReturnCount = 0;
+ int totalNackCount = 0;
+ int totalReturnCount = 0;
+ int totalReceivedCount = 0;
_channel.BasicAcksAsync += (object sender, BasicAckEventArgs e) =>
{
- Interlocked.Increment(ref publishAckCount);
return Task.CompletedTask;
};
- _channel.BasicNacksAsync += (object sender, BasicNackEventArgs e) =>
- {
- _output.WriteLine($"channel #{_channel.ChannelNumber} saw a nack, deliveryTag: {e.DeliveryTag}, multiple: {e.Multiple}");
- return Task.CompletedTask;
- };
-
- await _channel.ConfirmSelectAsync(trackConfirmations: false);
-
await TestConcurrentOperationsAsync(async () =>
{
- long receivedCount = 0;
+ long thisBatchReceivedCount = 0;
+ long thisBatchReturnedCount = 0;
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var msgTracker = new ConcurrentDictionary();
@@ -84,6 +81,7 @@ await TestConcurrentOperationsAsync(async () =>
consumer.ReceivedAsync += async (object sender, BasicDeliverEventArgs ea) =>
{
+ Interlocked.Increment(ref totalReceivedCount);
try
{
System.Diagnostics.Debug.Assert(object.ReferenceEquals(sender, consumer));
@@ -95,7 +93,9 @@ await TestConcurrentOperationsAsync(async () =>
IChannel ch = cons.Channel;
await ch.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
- if (Interlocked.Increment(ref receivedCount) == _messageCount)
+ long receivedCountSoFar = Interlocked.Increment(ref thisBatchReceivedCount);
+
+ if ((receivedCountSoFar + thisBatchReturnedCount) == _messageCount)
{
if (msgTracker.Values.Any(v => v == false))
{
@@ -118,18 +118,59 @@ await TestConcurrentOperationsAsync(async () =>
var publishTasks = new List();
for (ushort i = 0; i < _messageCount; i++)
{
- msgTracker[i] = false;
+ Interlocked.Increment(ref expectedTotalMessageCount);
+
byte[] body = _encoding.GetBytes(i.ToString());
- publishTasks.Add(_channel.BasicPublishAsync("", q.QueueName, mandatory: true, body: body));
+
+ string routingKey = q.QueueName;
+ if (i % 5 == 0)
+ {
+ routingKey = Guid.NewGuid().ToString();
+ Interlocked.Increment(ref thisBatchReturnedCount);
+ Interlocked.Increment(ref expectedTotalReturnCount);
+ }
+ else
+ {
+ msgTracker[i] = false;
+ }
+
+ publishTasks.Add(_channel.BasicPublishAsync("", routingKey, mandatory: true, body: body));
}
foreach (ValueTask pt in publishTasks)
{
- await pt;
+ try
+ {
+ await pt;
+ }
+ catch (PublishException ex)
+ {
+ if (ex.IsReturn)
+ {
+ Interlocked.Increment(ref totalReturnCount);
+ }
+ else
+ {
+ Interlocked.Increment(ref totalNackCount);
+ }
+ }
}
Assert.True(await tcs.Task);
}, Iterations);
+
+ if (IsVerbose)
+ {
+ _output.WriteLine("expectedTotalMessageCount: {0}", expectedTotalMessageCount);
+ _output.WriteLine("expectedTotalReturnCount: {0}", expectedTotalReturnCount);
+ _output.WriteLine("totalReceivedCount: {0}", totalReceivedCount);
+ _output.WriteLine("totalReturnCount: {0}", totalReturnCount);
+ _output.WriteLine("totalNackCount: {0}", totalNackCount);
+ }
+
+ Assert.Equal(expectedTotalReturnCount, totalReturnCount);
+ Assert.Equal(expectedTotalMessageCount, (totalReceivedCount + totalReturnCount));
+ Assert.Equal(0, totalNackCount);
}
}
}
diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs
index bea692094..b1febe2a2 100644
--- a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs
+++ b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs
@@ -108,7 +108,7 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in
try
{
- await using IChannel ch = await _conn.CreateChannelAsync();
+ await using IChannel ch = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true });
ch.ChannelShutdownAsync += (o, ea) =>
{
HandleChannelShutdown(ch, ea, (args) =>
@@ -121,7 +121,6 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in
return Task.CompletedTask;
};
- await ch.ConfirmSelectAsync(trackConfirmations: false);
ch.BasicAcksAsync += (object sender, BasicAckEventArgs e) =>
{
diff --git a/projects/Test/Integration/TestConfirmSelect.cs b/projects/Test/Integration/TestConfirmSelect.cs
index d2b9a38a5..3db7e37a1 100644
--- a/projects/Test/Integration/TestConfirmSelect.cs
+++ b/projects/Test/Integration/TestConfirmSelect.cs
@@ -52,14 +52,12 @@ ValueTask PublishAsync()
routingKey: Guid.NewGuid().ToString(), _encoding.GetBytes("message"));
}
- await _channel.ConfirmSelectAsync();
Assert.Equal(1ul, await _channel.GetNextPublishSequenceNumberAsync());
await PublishAsync();
Assert.Equal(2ul, await _channel.GetNextPublishSequenceNumberAsync());
await PublishAsync();
Assert.Equal(3ul, await _channel.GetNextPublishSequenceNumberAsync());
- await _channel.ConfirmSelectAsync();
await PublishAsync();
Assert.Equal(4ul, await _channel.GetNextPublishSequenceNumberAsync());
await PublishAsync();
@@ -77,13 +75,11 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength)
await _channel.ExchangeDeclareAsync("sample", "fanout", autoDelete: true);
// _channel.BasicAcks += (s, e) => _output.WriteLine("Acked {0}", e.DeliveryTag);
- await _channel.ConfirmSelectAsync();
var properties = new BasicProperties();
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
mandatory: false, basicProperties: properties, body: body);
- await _channel.WaitForConfirmsOrDieAsync();
try
{
@@ -93,7 +89,6 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
};
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body);
- await _channel.WaitForConfirmsOrDieAsync();
}
catch
{
@@ -103,7 +98,6 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
properties = new BasicProperties();
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body);
- await _channel.WaitForConfirmsOrDieAsync();
// _output.WriteLine("I'm done...");
}
}
diff --git a/projects/Test/Integration/TestConfirmSelectAsync.cs b/projects/Test/Integration/TestConfirmSelectAsync.cs
index 0681ef05a..0502185e2 100644
--- a/projects/Test/Integration/TestConfirmSelectAsync.cs
+++ b/projects/Test/Integration/TestConfirmSelectAsync.cs
@@ -48,23 +48,21 @@ public TestConfirmSelectAsync(ITestOutputHelper output) : base(output)
[Fact]
public async Task TestConfirmSelectIdempotency()
{
- await _channel.ConfirmSelectAsync();
Assert.Equal(1ul, await _channel.GetNextPublishSequenceNumberAsync());
- await Publish();
+ await PublishAsync();
Assert.Equal(2ul, await _channel.GetNextPublishSequenceNumberAsync());
- await Publish();
+ await PublishAsync();
Assert.Equal(3ul, await _channel.GetNextPublishSequenceNumberAsync());
- await _channel.ConfirmSelectAsync();
- await Publish();
+ await PublishAsync();
Assert.Equal(4ul, await _channel.GetNextPublishSequenceNumberAsync());
- await Publish();
+ await PublishAsync();
Assert.Equal(5ul, await _channel.GetNextPublishSequenceNumberAsync());
- await Publish();
+ await PublishAsync();
Assert.Equal(6ul, await _channel.GetNextPublishSequenceNumberAsync());
}
- private ValueTask Publish()
+ private ValueTask PublishAsync()
{
return _channel.BasicPublishAsync(exchange: "",
routingKey: Guid.NewGuid().ToString(), _message);
diff --git a/projects/Test/Integration/TestConnectionFactory.cs b/projects/Test/Integration/TestConnectionFactory.cs
index 46deccdbe..dd5a5cd23 100644
--- a/projects/Test/Integration/TestConnectionFactory.cs
+++ b/projects/Test/Integration/TestConnectionFactory.cs
@@ -241,7 +241,7 @@ public async Task TestCreateConnectionWithoutAutoRecoverySelectsAHostFromTheList
IConnection conn = await cf.CreateConnectionAsync(new List { "localhost" });
await conn.CloseAsync();
- conn.Dispose();
+ await conn.DisposeAsync();
Assert.Equal("not_localhost", cf.HostName);
Assert.Equal("localhost", conn.Endpoint.HostName);
}
diff --git a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs
index 8addefa19..ed5bc466c 100644
--- a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs
+++ b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs
@@ -289,10 +289,8 @@ public async Task TestTopologyRecoveryConsumerFilter()
return Task.CompletedTask;
};
- await using (IChannel ch = await conn.CreateChannelAsync())
+ await using (IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
{
- await ch.ConfirmSelectAsync();
-
await ch.ExchangeDeclareAsync(exchange, "direct");
await ch.QueueDeclareAsync(queueWithRecoveredConsumer, false, false, false);
await ch.QueueDeclareAsync(queueWithIgnoredConsumer, false, false, false);
@@ -327,7 +325,6 @@ public async Task TestTopologyRecoveryConsumerFilter()
Assert.True(ch.IsOpen);
await ch.BasicPublishAsync(exchange, binding1, _encoding.GetBytes("test message"));
await ch.BasicPublishAsync(exchange, binding2, _encoding.GetBytes("test message"));
- await WaitForConfirmsWithCancellationAsync(ch);
await consumerRecoveryTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
Assert.True(await consumerRecoveryTcs.Task);
diff --git a/projects/Test/Integration/TestConnectionShutdown.cs b/projects/Test/Integration/TestConnectionShutdown.cs
index daa26af1a..78abffd0a 100644
--- a/projects/Test/Integration/TestConnectionShutdown.cs
+++ b/projects/Test/Integration/TestConnectionShutdown.cs
@@ -107,7 +107,7 @@ public async Task TestDisposedWithSocketClosedOutOfBand()
try
{
- _conn.Dispose();
+ await _conn.DisposeAsync();
await WaitAsync(tcs, WaitSpan, "channel shutdown");
await frameHandlerCloseTask.AsTask().WaitAsync(WaitSpan);
}
@@ -145,7 +145,7 @@ public async Task TestShutdownSignalPropagationToChannelsUsingDispose()
return Task.CompletedTask;
};
- _conn.Dispose();
+ await _conn.DisposeAsync();
_conn = null;
await WaitAsync(tcs, TimeSpan.FromSeconds(3), "channel shutdown");
@@ -172,7 +172,7 @@ public async Task TestConsumerDispatcherShutdown()
public async Task TestDisposeAfterAbort_GH825()
{
await _channel.AbortAsync();
- _channel.Dispose();
+ await _channel.DisposeAsync();
}
}
}
diff --git a/projects/Test/Integration/TestConnectionTopologyRecovery.cs b/projects/Test/Integration/TestConnectionTopologyRecovery.cs
index ffe7d7847..c05032190 100644
--- a/projects/Test/Integration/TestConnectionTopologyRecovery.cs
+++ b/projects/Test/Integration/TestConnectionTopologyRecovery.cs
@@ -104,7 +104,7 @@ public async Task TestTopologyRecoveryQueueFilter()
tcs.SetResult(true);
return Task.CompletedTask;
};
- IChannel ch = await conn.CreateChannelAsync();
+ IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
await ch.QueueDeclareAsync(queueToRecover, false, false, false);
await ch.QueueDeclareAsync(queueToIgnore, false, false, false);
@@ -131,8 +131,8 @@ public async Task TestTopologyRecoveryQueueFilter()
{
await ch.CloseAsync();
await conn.CloseAsync();
- ch.Dispose();
- conn.Dispose();
+ await ch.DisposeAsync();
+ await conn.DisposeAsync();
}
}
@@ -155,7 +155,7 @@ public async Task TestTopologyRecoveryExchangeFilter()
tcs.SetResult(true);
return Task.CompletedTask;
};
- IChannel ch = await conn.CreateChannelAsync();
+ IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
try
{
await ch.ExchangeDeclareAsync(exchangeToRecover, "topic", false, true);
@@ -180,8 +180,8 @@ public async Task TestTopologyRecoveryExchangeFilter()
{
await ch.CloseAsync();
await conn.CloseAsync();
- ch.Dispose();
- conn.Dispose();
+ await ch.DisposeAsync();
+ await conn.DisposeAsync();
}
}
@@ -228,7 +228,9 @@ public async Task TestTopologyRecoveryBindingFilter()
Assert.True(ch.IsOpen);
Assert.True(await SendAndConsumeMessageAsync(_conn, queueWithRecoveredBinding, exchange, bindingToRecover));
- Assert.False(await SendAndConsumeMessageAsync(_conn, queueWithIgnoredBinding, exchange, bindingToIgnore));
+ PublishException ex = await Assert.ThrowsAnyAsync(() =>
+ SendAndConsumeMessageAsync(_conn, queueWithIgnoredBinding, exchange, bindingToIgnore));
+ Assert.Equal((ulong)1, ex.PublishSequenceNumber);
}
finally
{
@@ -237,8 +239,8 @@ public async Task TestTopologyRecoveryBindingFilter()
await ch.QueueDeleteAsync(queueWithIgnoredBinding);
await ch.CloseAsync();
await conn.CloseAsync();
- ch.Dispose();
- conn.Dispose();
+ await ch.DisposeAsync();
+ await conn.DisposeAsync();
}
}
@@ -270,10 +272,9 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities()
return Task.CompletedTask;
};
- IChannel ch = await conn.CreateChannelAsync();
+ IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
try
{
- await ch.ConfirmSelectAsync();
await ch.ExchangeDeclareAsync(exchange, "direct");
await ch.QueueDeclareAsync(queue1, false, false, false);
@@ -315,7 +316,6 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities()
var pt1 = ch.BasicPublishAsync(exchange, binding1, true, _encoding.GetBytes("test message"));
var pt2 = ch.BasicPublishAsync(exchange, binding2, true, _encoding.GetBytes("test message"));
- await WaitForConfirmsWithCancellationAsync(ch);
await Task.WhenAll(pt1.AsTask(), pt2.AsTask()).WaitAsync(WaitSpan);
await Task.WhenAll(consumerReceivedTcs1.Task, consumerReceivedTcs2.Task).WaitAsync(WaitSpan);
@@ -326,8 +326,8 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities()
{
await ch.CloseAsync();
await conn.CloseAsync();
- ch.Dispose();
- conn.Dispose();
+ await ch.DisposeAsync();
+ await conn.DisposeAsync();
}
}
@@ -367,7 +367,7 @@ await channel.QueueDeclareAsync(rq.Name, false, false, false,
tcs.SetResult(true);
return Task.CompletedTask;
};
- IChannel ch = await conn.CreateChannelAsync();
+ IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
await ch.QueueDeclareAsync(queueToRecoverWithException, false, false, false);
await ch.QueueDeclareAsync(queueToRecoverSuccessfully, false, false, false);
@@ -391,8 +391,8 @@ await _channel.QueueDeclareAsync(queueToRecoverWithException, false, false, fals
await _channel.QueueDeleteAsync(queueToRecoverWithException);
await ch.CloseAsync();
await conn.CloseAsync();
- ch.Dispose();
- conn.Dispose();
+ await ch.DisposeAsync();
+ await conn.DisposeAsync();
}
}
@@ -450,8 +450,8 @@ public async Task TestTopologyRecoveryExchangeExceptionHandler()
await ch.CloseAsync();
await conn.CloseAsync();
- ch.Dispose();
- conn.Dispose();
+ await ch.DisposeAsync();
+ await conn.DisposeAsync();
}
}
@@ -514,8 +514,8 @@ public async Task TestTopologyRecoveryBindingExceptionHandler()
await ch.CloseAsync();
await conn.CloseAsync();
- ch.Dispose();
- conn.Dispose();
+ await ch.DisposeAsync();
+ await conn.DisposeAsync();
}
[Fact]
@@ -554,7 +554,8 @@ public async Task TestTopologyRecoveryConsumerExceptionHandler()
IChannel ch = await conn.CreateChannelAsync();
try
{
- await ch.ConfirmSelectAsync();
+ // Note: no need to enable publisher confirmations as they are
+ // automatically enabled for channels
await _channel.QueueDeclareAsync(queueWithExceptionConsumer, false, false, false);
await _channel.QueuePurgeAsync(queueWithExceptionConsumer);
@@ -590,8 +591,8 @@ public async Task TestTopologyRecoveryConsumerExceptionHandler()
{
await ch.CloseAsync();
await conn.CloseAsync();
- ch.Dispose();
- conn.Dispose();
+ await ch.DisposeAsync();
+ await conn.DisposeAsync();
}
}
}
diff --git a/projects/Test/Integration/TestExtensions.cs b/projects/Test/Integration/TestExtensions.cs
index fd55296e6..c72f49d4a 100644
--- a/projects/Test/Integration/TestExtensions.cs
+++ b/projects/Test/Integration/TestExtensions.cs
@@ -43,28 +43,9 @@ public TestExtensions(ITestOutputHelper output) : base(output)
{
}
- [Fact]
- public async Task TestConfirmBarrier()
- {
- await _channel.ConfirmSelectAsync();
- for (int i = 0; i < 10; i++)
- {
- await _channel.BasicPublishAsync(string.Empty, string.Empty, Array.Empty());
- }
- Assert.True(await _channel.WaitForConfirmsAsync());
- }
-
- [Fact]
- public async Task TestConfirmBeforeWait()
- {
- await Assert.ThrowsAsync(() => _channel.WaitForConfirmsAsync());
- }
-
[Fact]
public async Task TestExchangeBinding()
{
- await _channel.ConfirmSelectAsync();
-
await _channel.ExchangeDeclareAsync("src", ExchangeType.Direct, false, false);
await _channel.ExchangeDeclareAsync("dest", ExchangeType.Direct, false, false);
string queue = await _channel.QueueDeclareAsync(string.Empty, false, false, true);
@@ -73,12 +54,10 @@ public async Task TestExchangeBinding()
await _channel.QueueBindAsync(queue, "dest", string.Empty);
await _channel.BasicPublishAsync("src", string.Empty, Array.Empty());
- await _channel.WaitForConfirmsAsync();
Assert.NotNull(await _channel.BasicGetAsync(queue, true));
await _channel.ExchangeUnbindAsync("dest", "src", string.Empty);
await _channel.BasicPublishAsync("src", string.Empty, Array.Empty());
- await _channel.WaitForConfirmsAsync();
Assert.Null(await _channel.BasicGetAsync(queue, true));
diff --git a/projects/Test/Integration/TestFloodPublishing.cs b/projects/Test/Integration/TestFloodPublishing.cs
index 836aead2f..337bd3d49 100644
--- a/projects/Test/Integration/TestFloodPublishing.cs
+++ b/projects/Test/Integration/TestFloodPublishing.cs
@@ -30,6 +30,7 @@
//---------------------------------------------------------------------------
using System;
+using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
@@ -42,7 +43,7 @@ namespace Test.Integration
{
public class TestFloodPublishing : IntegrationFixture
{
- private static readonly TimeSpan TenSeconds = TimeSpan.FromSeconds(10);
+ private static readonly TimeSpan FiveSeconds = TimeSpan.FromSeconds(5);
private readonly byte[] _body = GetRandomBody(2048);
public TestFloodPublishing(ITestOutputHelper output) : base(output)
@@ -64,7 +65,7 @@ public async Task TestUnthrottledFloodPublishing()
_connFactory.AutomaticRecoveryEnabled = false;
_conn = await _connFactory.CreateConnectionAsync();
Assert.IsNotType(_conn);
- _channel = await _conn.CreateChannelAsync();
+ _channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
_conn.ConnectionShutdownAsync += (_, ea) =>
{
@@ -91,21 +92,30 @@ public async Task TestUnthrottledFloodPublishing()
return Task.CompletedTask;
};
+ var publishTasks = new List();
var stopwatch = Stopwatch.StartNew();
int i = 0;
+ int publishCount = 0;
try
{
for (i = 0; i < 65535 * 64; i++)
{
if (i % 65536 == 0)
{
- if (stopwatch.Elapsed > TenSeconds)
+ if (stopwatch.Elapsed > FiveSeconds)
{
break;
}
}
- await _channel.BasicPublishAsync(CachedString.Empty, CachedString.Empty, _body);
+ publishCount++;
+ publishTasks.Add(_channel.BasicPublishAsync(CachedString.Empty, CachedString.Empty, _body).AsTask());
+
+ if (i % 500 == 0)
+ {
+ await Task.WhenAll(publishTasks).WaitAsync(ShortSpan);
+ publishTasks.Clear();
+ }
}
}
finally
@@ -115,6 +125,7 @@ public async Task TestUnthrottledFloodPublishing()
Assert.True(_conn.IsOpen);
Assert.False(sawUnexpectedShutdown);
+ _output.WriteLine("[INFO] published {0} messages in {1}", publishCount, stopwatch.Elapsed);
}
[Fact]
@@ -181,9 +192,8 @@ public async Task TestMultithreadFloodPublishing()
return Task.CompletedTask;
};
- await using (IChannel publishChannel = await publishConnection.CreateChannelAsync())
+ await using (IChannel publishChannel = await publishConnection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
{
- await publishChannel.ConfirmSelectAsync();
publishChannel.ChannelShutdownAsync += (o, ea) =>
{
@@ -198,12 +208,13 @@ public async Task TestMultithreadFloodPublishing()
return Task.CompletedTask;
};
+ var publishTasks = new List();
for (int i = 0; i < publishCount && false == stop; i++)
{
- await publishChannel.BasicPublishAsync(string.Empty, queueName, true, sendBody);
+ publishTasks.Add(publishChannel.BasicPublishAsync(string.Empty, queueName, true, sendBody).AsTask());
}
- await publishChannel.WaitForConfirmsOrDieAsync();
+ await Task.WhenAll(publishTasks).WaitAsync(ShortSpan);
await publishChannel.CloseAsync();
}
diff --git a/projects/Test/Integration/TestMessageCount.cs b/projects/Test/Integration/TestMessageCount.cs
index beca50f6d..363328374 100644
--- a/projects/Test/Integration/TestMessageCount.cs
+++ b/projects/Test/Integration/TestMessageCount.cs
@@ -45,13 +45,11 @@ public TestMessageCount(ITestOutputHelper output) : base(output)
[Fact]
public async Task TestMessageCountMethod()
{
- await _channel.ConfirmSelectAsync();
string q = GenerateQueueName();
await _channel.QueueDeclareAsync(queue: q, passive: false, durable: false, exclusive: true, autoDelete: false, arguments: null);
Assert.Equal(0u, await _channel.MessageCountAsync(q));
await _channel.BasicPublishAsync("", q, _encoding.GetBytes("msg"));
- await _channel.WaitForConfirmsAsync();
Assert.Equal(1u, await _channel.MessageCountAsync(q));
}
}
diff --git a/projects/Test/Integration/TestPublisherConfirms.cs b/projects/Test/Integration/TestPublisherConfirms.cs
index 80df6ea72..301e30b0f 100644
--- a/projects/Test/Integration/TestPublisherConfirms.cs
+++ b/projects/Test/Integration/TestPublisherConfirms.cs
@@ -29,11 +29,10 @@
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------
-using System;
+using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
-using RabbitMQ.Client.Impl;
using Xunit;
using Xunit.Abstractions;
@@ -49,52 +48,8 @@ public TestPublisherConfirms(ITestOutputHelper output)
_messageBody = GetRandomBody(4096);
}
- [Fact]
- public Task TestWaitForConfirmsWithoutTimeoutAsync()
- {
- return TestWaitForConfirmsAsync(200, async (ch) =>
- {
- Assert.True(await ch.WaitForConfirmsAsync());
- });
- }
-
- [Fact]
- public Task TestWaitForConfirmsWithTimeout()
- {
- return TestWaitForConfirmsAsync(200, async (ch) =>
- {
- using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(4));
- Assert.True(await ch.WaitForConfirmsAsync(cts.Token));
- });
- }
-
- [Fact]
- public async Task TestWaitForConfirmsWithTimeoutAsync_MightThrowTaskCanceledException()
- {
- bool waitResult = false;
- bool sawException = false;
-
- Task t = TestWaitForConfirmsAsync(10000, async (ch) =>
- {
- using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(1));
- try
- {
- waitResult = await ch.WaitForConfirmsAsync(cts.Token);
- }
- catch
- {
- sawException = true;
- }
- });
-
- await t;
-
- if (waitResult == false && sawException == false)
- {
- Assert.Fail("test failed, both waitResult and sawException are still false");
- }
- }
-
+ /*
+ * TODO figure out how to actually test basic.nack and basic.return
[Fact]
public Task TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout_ReturnFalse()
{
@@ -106,13 +61,13 @@ public Task TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout
Assert.False(await ch.WaitForConfirmsAsync(cts.Token));
});
}
+ */
[Fact]
public async Task TestWaitForConfirmsWithEventsAsync()
{
string queueName = GenerateQueueName();
- await using IChannel ch = await _conn.CreateChannelAsync();
- await ch.ConfirmSelectAsync();
+ await using IChannel ch = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false,
exclusive: true, autoDelete: false, arguments: null);
@@ -128,12 +83,12 @@ await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false,
try
{
+ var publishTasks = new List();
for (int i = 0; i < n; i++)
{
- await ch.BasicPublishAsync("", queueName, _encoding.GetBytes("msg"));
+ publishTasks.Add(ch.BasicPublishAsync("", queueName, _encoding.GetBytes("msg")).AsTask());
}
-
- await ch.WaitForConfirmsAsync();
+ await Task.WhenAll(publishTasks).WaitAsync(ShortSpan);
// Note: number of event invocations is not guaranteed
// to be equal to N because acks can be batched,
@@ -147,32 +102,5 @@ await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false,
await ch.CloseAsync();
}
}
-
- private async Task TestWaitForConfirmsAsync(int numberOfMessagesToPublish, Func fn)
- {
- string queueName = GenerateQueueName();
- await using IChannel ch = await _conn.CreateChannelAsync();
- var props = new BasicProperties { Persistent = true };
-
- await ch.ConfirmSelectAsync();
- await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false,
- exclusive: true, autoDelete: false, arguments: null);
-
- for (int i = 0; i < numberOfMessagesToPublish; i++)
- {
- await ch.BasicPublishAsync(exchange: string.Empty, routingKey: queueName,
- body: _messageBody, mandatory: true, basicProperties: props);
- }
-
- try
- {
- await fn(ch);
- }
- finally
- {
- await ch.QueueDeleteAsync(queue: queueName, ifUnused: false, ifEmpty: false);
- await ch.CloseAsync();
- }
- }
}
}
diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs
index 725d7f7a5..9c588f889 100644
--- a/projects/Test/Integration/TestToxiproxy.cs
+++ b/projects/Test/Integration/TestToxiproxy.cs
@@ -125,28 +125,26 @@ public async Task TestCloseConnection()
async Task PublishLoop()
{
- await using IChannel ch = await conn.CreateChannelAsync();
- await ch.ConfirmSelectAsync();
+ await using IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
QueueDeclareOk q = await ch.QueueDeclareAsync();
while (conn.IsOpen)
{
- await ch.BasicPublishAsync("", q.QueueName, GetRandomBody());
- messagePublishedTcs.TrySetResult(true);
/*
- * Note:
- * In this test, it is possible that the connection
- * will be closed before the ack is returned,
- * and this await will throw an exception
- */
+ * Note:
+ * In this test, it is possible that the connection
+ * will be closed before the ack is returned,
+ * and this await will throw an exception
+ */
try
{
- await ch.WaitForConfirmsAsync();
+ await ch.BasicPublishAsync("", q.QueueName, GetRandomBody());
+ messagePublishedTcs.TrySetResult(true);
}
catch (AlreadyClosedException ex)
{
if (IsVerbose)
{
- _output.WriteLine($"[WARNING] WaitForConfirmsAsync ex: {ex}");
+ _output.WriteLine($"[WARNING] BasicPublishAsync ex: {ex}");
}
}
}
@@ -206,13 +204,11 @@ public async Task TestThatStoppedSocketResultsInHeartbeatTimeout()
Task pubTask = Task.Run(async () =>
{
await using IConnection conn = await cf.CreateConnectionAsync();
- await using IChannel ch = await conn.CreateChannelAsync();
- await ch.ConfirmSelectAsync();
+ await using IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
QueueDeclareOk q = await ch.QueueDeclareAsync();
while (conn.IsOpen)
{
await ch.BasicPublishAsync("", q.QueueName, GetRandomBody());
- await ch.WaitForConfirmsAsync();
await Task.Delay(TimeSpan.FromSeconds(1));
tcs.TrySetResult(true);
}
diff --git a/projects/Test/OAuth2/TestOAuth2.cs b/projects/Test/OAuth2/TestOAuth2.cs
index c4abf5f94..347ad12ce 100644
--- a/projects/Test/OAuth2/TestOAuth2.cs
+++ b/projects/Test/OAuth2/TestOAuth2.cs
@@ -116,13 +116,13 @@ public async Task DisposeAsync()
if (_connection != null)
{
await _connection.CloseAsync();
+ await _connection.DisposeAsync();
}
}
finally
{
_doneEvent.Dispose();
_producerCredentialsProvider?.Dispose();
- _connection?.Dispose();
_cancellationTokenSource.Dispose();
}
}
@@ -224,14 +224,13 @@ public async Task SecondConnectionCrashes_GH1429()
Assert.NotNull(_connectionFactory);
// https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1429
IConnection secondConnection = await _connectionFactory.CreateConnectionAsync(CancellationToken.None);
- secondConnection.Dispose();
+ await secondConnection.DisposeAsync();
}
private async Task DeclarePublishChannelAsync()
{
Assert.NotNull(_connection);
- IChannel publishChannel = await _connection.CreateChannelAsync();
- await publishChannel.ConfirmSelectAsync();
+ IChannel publishChannel = await _connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
await publishChannel.ExchangeDeclareAsync("test_direct", ExchangeType.Direct, true, false);
return publishChannel;
}
@@ -246,11 +245,9 @@ private async Task PublishAsync(IChannel publishChannel)
AppId = "oauth2",
};
- await publishChannel.BasicPublishAsync(exchange: Exchange, routingKey: "hello", false, basicProperties: properties, body: body);
- _testOutputHelper.WriteLine("Sent message");
-
- await publishChannel.WaitForConfirmsOrDieAsync();
- _testOutputHelper.WriteLine("Confirmed Sent message");
+ await publishChannel.BasicPublishAsync(exchange: Exchange, routingKey: "hello", false,
+ basicProperties: properties, body: body);
+ _testOutputHelper.WriteLine("Sent and confirmed message");
}
private async ValueTask DeclareConsumeChannelAsync()
diff --git a/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs b/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs
index cc7a96241..eaadbdd3b 100644
--- a/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs
+++ b/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs
@@ -48,10 +48,12 @@ public async Task BlockAsync()
await Task.Delay(TimeSpan.FromSeconds(1));
}
- public async Task BlockAsync(IChannel channel)
+ public async Task BlockAndPublishAsync()
{
+ // TODO fix publisher confirmation tracking to time out so this test succeeds
+ await using IChannel ch = await _conn.CreateChannelAsync();
await BlockAsync();
- await channel.BasicPublishAsync(exchange: "amq.direct",
+ await ch.BasicPublishAsync(exchange: "amq.direct",
routingKey: Guid.NewGuid().ToString(), _encoding.GetBytes("message"));
}
diff --git a/projects/Test/SequentialIntegration/TestActivitySource.cs b/projects/Test/SequentialIntegration/TestActivitySource.cs
index 107168f9f..7ea58dc22 100644
--- a/projects/Test/SequentialIntegration/TestActivitySource.cs
+++ b/projects/Test/SequentialIntegration/TestActivitySource.cs
@@ -80,8 +80,6 @@ void AssertIntTagGreaterThanZero(Activity activity, string name)
[InlineData(false)]
public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOperationName)
{
- await _channel.ConfirmSelectAsync();
-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
var _activities = new List();
using ActivityListener activityListener = StartActivityListener(_activities);
@@ -102,7 +100,6 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
await _channel.BasicPublishAsync("", q.QueueName, true, sendBody);
- await _channel.WaitForConfirmsOrDieAsync();
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
Assert.True(await consumerReceivedTcs.Task);
@@ -117,8 +114,6 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera
[InlineData(false)]
public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool useRoutingKeyAsOperationName)
{
- await _channel.ConfirmSelectAsync();
-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
var _activities = new List();
using ActivityListener activityListener = StartActivityListener(_activities);
@@ -141,7 +136,6 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool use
CachedString exchange = new CachedString("");
CachedString routingKey = new CachedString(q.QueueName);
await _channel.BasicPublishAsync(exchange, routingKey, true, sendBody);
- await _channel.WaitForConfirmsOrDieAsync();
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
Assert.True(await consumerReceivedTcs.Task);
@@ -156,8 +150,6 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool use
[InlineData(false)]
public async Task TestPublisherWithPublicationAddressAndConsumerActivityTags(bool useRoutingKeyAsOperationName)
{
- await _channel.ConfirmSelectAsync();
-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
var _activities = new List();
using ActivityListener activityListener = StartActivityListener(_activities);
@@ -179,7 +171,6 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTags(boo
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
PublicationAddress publicationAddress = new PublicationAddress(ExchangeType.Direct, "", q.QueueName);
await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody);
- await _channel.WaitForConfirmsOrDieAsync();
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
Assert.True(await consumerReceivedTcs.Task);
@@ -194,8 +185,6 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTags(boo
[InlineData(false)]
public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName)
{
- await _channel.ConfirmSelectAsync();
-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
var activities = new List();
using ActivityListener activityListener = StartActivityListener(activities);
@@ -217,7 +206,6 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
await _channel.BasicPublishAsync("", q.QueueName, true, sendBody);
- await _channel.WaitForConfirmsOrDieAsync();
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
Assert.True(await consumerReceivedTcs.Task);
@@ -232,8 +220,6 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs
[InlineData(false)]
public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName)
{
- await _channel.ConfirmSelectAsync();
-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
var activities = new List();
using ActivityListener activityListener = StartActivityListener(activities);
@@ -257,7 +243,6 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo
CachedString exchange = new CachedString("");
CachedString routingKey = new CachedString(q.QueueName);
await _channel.BasicPublishAsync(exchange, routingKey, true, sendBody);
- await _channel.WaitForConfirmsOrDieAsync();
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
Assert.True(await consumerReceivedTcs.Task);
@@ -272,8 +257,6 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo
[InlineData(false)]
public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName)
{
- await _channel.ConfirmSelectAsync();
-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
var activities = new List();
using ActivityListener activityListener = StartActivityListener(activities);
@@ -296,7 +279,6 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
var publicationAddress = new PublicationAddress(ExchangeType.Direct, "", q.QueueName);
await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody);
- await _channel.WaitForConfirmsOrDieAsync();
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
Assert.True(await consumerReceivedTcs.Task);
@@ -311,7 +293,6 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn
[InlineData(false)]
public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName)
{
- await _channel.ConfirmSelectAsync();
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
var activities = new List();
using ActivityListener activityListener = StartActivityListener(activities);
@@ -323,7 +304,6 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
{
await _channel.QueueDeclareAsync(queue, false, false, false, null);
await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg));
- await _channel.WaitForConfirmsOrDieAsync();
QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue);
Assert.Equal(1u, ok.MessageCount);
BasicGetResult res = await _channel.BasicGetAsync(queue, true);
@@ -344,7 +324,6 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
[InlineData(false)]
public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool useRoutingKeyAsOperationName)
{
- await _channel.ConfirmSelectAsync();
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
var activities = new List();
using ActivityListener activityListener = StartActivityListener(activities);
@@ -358,7 +337,6 @@ public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool use
CachedString routingKey = new CachedString(queue);
await _channel.QueueDeclareAsync(queue, false, false, false, null);
await _channel.BasicPublishAsync(exchange, routingKey, true, Encoding.UTF8.GetBytes(msg));
- await _channel.WaitForConfirmsOrDieAsync();
QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue);
Assert.Equal(1u, ok.MessageCount);
BasicGetResult res = await _channel.BasicGetAsync(queue, true);
@@ -379,7 +357,6 @@ public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool use
[InlineData(false)]
public async Task TestPublisherWithPublicationAddressAndBasicGetActivityTags(bool useRoutingKeyAsOperationName)
{
- await _channel.ConfirmSelectAsync();
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
var activities = new List();
using ActivityListener activityListener = StartActivityListener(activities);
@@ -393,7 +370,6 @@ public async Task TestPublisherWithPublicationAddressAndBasicGetActivityTags(boo
await _channel.QueueDeclareAsync(queue, false, false, false, null);
await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(),
Encoding.UTF8.GetBytes(msg));
- await _channel.WaitForConfirmsOrDieAsync();
QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue);
Assert.Equal(1u, ok.MessageCount);
BasicGetResult res = await _channel.BasicGetAsync(queue, true);
diff --git a/projects/Test/SequentialIntegration/TestConnectionBlocked.cs b/projects/Test/SequentialIntegration/TestConnectionBlocked.cs
index d4ffaf607..92c095768 100644
--- a/projects/Test/SequentialIntegration/TestConnectionBlocked.cs
+++ b/projects/Test/SequentialIntegration/TestConnectionBlocked.cs
@@ -60,10 +60,10 @@ public override async Task DisposeAsync()
public async Task TestConnectionBlockedNotification()
{
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- _conn.ConnectionBlockedAsync += async (object sender, ConnectionBlockedEventArgs args) =>
+ _conn.ConnectionBlockedAsync += (object sender, ConnectionBlockedEventArgs args) =>
{
- // TODO should this continue to be doing fire and forget?
- await UnblockAsync();
+ UnblockAsync();
+ return Task.CompletedTask;
};
_conn.ConnectionUnblockedAsync += (object sender, AsyncEventArgs ea) =>
@@ -72,7 +72,7 @@ public async Task TestConnectionBlockedNotification()
return Task.CompletedTask;
};
- await BlockAsync(_channel);
+ await BlockAndPublishAsync();
await tcs.Task.WaitAsync(TimeSpan.FromSeconds(15));
Assert.True(await tcs.Task, "Unblock notification not received.");
}
@@ -82,14 +82,14 @@ public async Task TestDisposeOnBlockedConnectionDoesNotHang()
{
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- await BlockAsync(_channel);
+ await BlockAndPublishAsync();
Task disposeTask = Task.Run(async () =>
{
try
{
await _conn.AbortAsync();
- _conn.Dispose();
+ await _conn.DisposeAsync();
tcs.SetResult(true);
}
catch (Exception)
diff --git a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs
index abf8e238b..4c25108d2 100644
--- a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs
+++ b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs
@@ -135,7 +135,7 @@ Task _channel_ChannelShutdownAsync(object sender, ShutdownEventArgs e)
Assert.False(_channel.IsClosed);
await _channel.CloseAsync();
- _channel.Dispose();
+ await _channel.DisposeAsync();
Assert.True(_channel.IsClosed);
_channel = null;
@@ -155,11 +155,12 @@ public async Task TestBlockedListenersRecovery()
};
await CloseAndWaitForRecoveryAsync();
await CloseAndWaitForRecoveryAsync();
- await BlockAsync(_channel);
+ await BlockAndPublishAsync();
await WaitAsync(tcs, "connection blocked");
}
finally
{
+ // NOTE: must unblock so that close succeeeds on test tear-down
await UnblockAsync();
}
}
@@ -194,15 +195,12 @@ public async Task TestClientNamedTransientAutoDeleteQueueAndBindingRecovery()
await RestartServerAndWaitForRecoveryAsync();
Assert.True(_channel.IsOpen);
- await _channel.ConfirmSelectAsync();
QueueDeclareOk ok0 = await _channel.QueueDeclarePassiveAsync(queue: queueName);
Assert.Equal(queueName, ok0.QueueName);
await _channel.QueuePurgeAsync(queueName);
await _channel.ExchangeDeclarePassiveAsync(exchange: exchangeName);
await _channel.BasicPublishAsync(exchange: exchangeName, routingKey: "", body: _encoding.GetBytes("msg"));
- await WaitForConfirmsWithCancellationAsync(_channel);
-
QueueDeclareOk ok1 = await _channel.QueueDeclarePassiveAsync(queue: queueName);
Assert.Equal(1u, ok1.MessageCount);
}
@@ -240,10 +238,8 @@ public async Task TestServerNamedTransientAutoDeleteQueueAndBindingRecovery()
Assert.True(_channel.IsOpen);
Assert.NotEqual(nameBefore, nameAfter);
- await _channel.ConfirmSelectAsync();
await _channel.ExchangeDeclareAsync(exchange: x, type: "fanout");
await _channel.BasicPublishAsync(exchange: x, routingKey: "", body: _encoding.GetBytes("msg"));
- await WaitForConfirmsWithCancellationAsync(_channel);
QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(nameAfter);
Assert.Equal(1u, ok.MessageCount);
@@ -345,7 +341,7 @@ public async Task TestUnblockedListenersRecovery()
};
await CloseAndWaitForRecoveryAsync();
await CloseAndWaitForRecoveryAsync();
- await BlockAsync(_channel);
+ await BlockAndPublishAsync();
await UnblockAsync();
await WaitAsync(tcs, "connection unblocked");
}
diff --git a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs
index 7a99a00e6..2a2fb6567 100644
--- a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs
+++ b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs
@@ -95,7 +95,6 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera
string baggageGuid = Guid.NewGuid().ToString();
Baggage.SetBaggage("TestItem", baggageGuid);
Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem"));
- await _channel.ConfirmSelectAsync();
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
await Task.Delay(500);
@@ -125,7 +124,6 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
await _channel.BasicPublishAsync("", q.QueueName, true, sendBody);
- await _channel.WaitForConfirmsOrDieAsync();
Baggage.ClearBaggage();
Assert.Null(Baggage.GetBaggage("TestItem"));
@@ -150,7 +148,6 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs
string baggageGuid = Guid.NewGuid().ToString();
Baggage.SetBaggage("TestItem", baggageGuid);
Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem"));
- await _channel.ConfirmSelectAsync();
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
await Task.Delay(500);
@@ -181,7 +178,6 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
await _channel.BasicPublishAsync("", q.QueueName, true, sendBody);
- await _channel.WaitForConfirmsOrDieAsync();
Baggage.ClearBaggage();
Assert.Null(Baggage.GetBaggage("TestItem"));
@@ -206,7 +202,6 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn
string baggageGuid = Guid.NewGuid().ToString();
Baggage.SetBaggage("TestItem", baggageGuid);
Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem"));
- await _channel.ConfirmSelectAsync();
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
await Task.Delay(500);
@@ -238,7 +233,6 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
var publicationAddress = new PublicationAddress(ExchangeType.Direct, "", queueName);
await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody);
- await _channel.WaitForConfirmsOrDieAsync();
Baggage.ClearBaggage();
Assert.Null(Baggage.GetBaggage("TestItem"));
@@ -263,7 +257,6 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo
string baggageGuid = Guid.NewGuid().ToString();
Baggage.SetBaggage("TestItem", baggageGuid);
Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem"));
- await _channel.ConfirmSelectAsync();
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
await Task.Delay(500);
@@ -296,7 +289,6 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo
CachedString exchange = new CachedString("");
CachedString routingKey = new CachedString(queueName);
await _channel.BasicPublishAsync(exchange, routingKey, sendBody);
- await _channel.WaitForConfirmsOrDieAsync();
Baggage.ClearBaggage();
Assert.Null(Baggage.GetBaggage("TestItem"));
@@ -321,7 +313,6 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
string baggageGuid = Guid.NewGuid().ToString();
Baggage.SetBaggage("TestItem", baggageGuid);
Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem"));
- await _channel.ConfirmSelectAsync();
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
await Task.Delay(500);
string queue = $"queue-{Guid.NewGuid()}";
@@ -331,7 +322,6 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
{
await _channel.QueueDeclareAsync(queue, false, false, false, null);
await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg));
- await _channel.WaitForConfirmsOrDieAsync();
Baggage.ClearBaggage();
Assert.Null(Baggage.GetBaggage("TestItem"));
QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue);