Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update RabbitMQ transport connection string options #4755

Draft
wants to merge 17 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<PackageVersion Include="Microsoft-WindowsAPICodePack-Shell" Version="1.1.5" />
<PackageVersion Include="Mindscape.Raygun4Net.NetCore" Version="11.2.1" />
<PackageVersion Include="NLog.Extensions.Logging" Version="5.3.15" />
<PackageVersion Include="NServiceBus" Version="9.2.3" />
<PackageVersion Include="NServiceBus" Version="9.2.4" />
<PackageVersion Include="NServiceBus.AcceptanceTesting" Version="9.2.3" />
<PackageVersion Include="NServiceBus.AmazonSQS" Version="7.1.1" />
<PackageVersion Include="NServiceBus.CustomChecks" Version="5.0.1" />
Expand All @@ -38,7 +38,8 @@
<PackageVersion Include="NServiceBus.Metrics" Version="5.0.1" />
<PackageVersion Include="NServiceBus.Metrics.ServiceControl" Version="5.0.0" />
<PackageVersion Include="NServiceBus.Persistence.NonDurable" Version="2.0.1" />
<PackageVersion Include="NServiceBus.RabbitMQ" Version="9.2.0" />
<PackageVersion Include="NServiceBus.RabbitMQ" Version="10.0.0-alpha.0.97" />
<!--<PackageVersion Include="NServiceBus.RabbitMQ" Version="9.2.0" />-->
<PackageVersion Include="NServiceBus.SagaAudit" Version="5.0.2" />
<PackageVersion Include="NServiceBus.Testing" Version="9.0.1" />
<PackageVersion Include="NServiceBus.Transport.AzureServiceBus" Version="4.2.4" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void Success()
return;
}

timer.Change(System.Threading.Timeout.Infinite, System.Threading.Timeout.Infinite);
timer.Change(Timeout.Infinite, Timeout.Infinite);
Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ static Dictionary<string, string> ParseAmqpConnectionString(string connectionStr
return dictionary;
}

static Dictionary<string, string> ParseNServiceBusConnectionString(string connectionString, StringBuilder invalidOptionsMessage)
internal static Dictionary<string, string> ParseNServiceBusConnectionString(string connectionString, StringBuilder invalidOptionsMessage)
{
var dictionary = new DbConnectionStringBuilder { ConnectionString = connectionString }
.OfType<KeyValuePair<string, object>>()
Expand Down
19 changes: 13 additions & 6 deletions src/ServiceControl.Transports.RabbitMQ/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
using System.Net.Security;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using global::RabbitMQ.Client;

class ConnectionFactory
{
readonly string endpointName;
readonly global::RabbitMQ.Client.ConnectionFactory connectionFactory;
readonly object lockObject = new object();
readonly SemaphoreSlim semaphoreSlim = new(1, 1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't forget to properly dispose the semaphore

Copy link
Member Author

@TravisNickels TravisNickels Jan 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the heads up. I'll take another look at this. I thought that the semaphoreSlim.Release() did the dispose, but maybe putting this in a using statement would be better so that the semaphoreSlim is always disposed?


public ConnectionFactory(string endpointName, ConnectionConfiguration connectionConfiguration,
X509Certificate2Collection clientCertificateCollection, bool disableRemoteCertificateValidation,
Expand Down Expand Up @@ -76,21 +78,26 @@ public ConnectionFactory(string endpointName, ConnectionConfiguration connection
}
}

public IConnection CreatePublishConnection() => CreateConnection($"{endpointName} Publish", false);
public async Task<IConnection> CreatePublishConnection(CancellationToken cancellationToken) => await CreateConnection($"{endpointName} Publish", false, cancellationToken);

public IConnection CreateAdministrationConnection() => CreateConnection($"{endpointName} Administration", false);
public Task<IConnection> CreateAdministrationConnection(CancellationToken cancellationToken) => CreateConnection($"{endpointName} Administration", false, cancellationToken);

public IConnection CreateConnection(string connectionName, bool automaticRecoveryEnabled = true)
public async Task<IConnection> CreateConnection(string connectionName, bool automaticRecoveryEnabled = true, CancellationToken cancellationToken = default)
{
lock (lockObject)
await semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
connectionFactory.AutomaticRecoveryEnabled = automaticRecoveryEnabled;
connectionFactory.ClientProperties["connected"] = DateTime.UtcNow.ToString("G");

var connection = connectionFactory.CreateConnection(connectionName);
var connection = await connectionFactory.CreateConnectionAsync(connectionName, cancellationToken);

return connection;
}
finally
{
_ = semaphoreSlim.Release();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace ServiceControl.Transports.RabbitMQ
{
using NServiceBus;

public interface IRabbitMQTransportExtensions
{
RabbitMQTransport GetTransport();
}
}
18 changes: 9 additions & 9 deletions src/ServiceControl.Transports.RabbitMQ/QueueLengthProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ async Task FetchQueueLengths(CancellationToken cancellationToken)
{
foreach (var endpointQueuePair in endpointQueues)
{
await queryExecutor.Execute(m =>
await queryExecutor.Execute(async m =>
{
var queueName = endpointQueuePair.Value;

try
{
var size = (int)m.MessageCount(queueName);
var size = (int)await m.MessageCountAsync(queueName, cancellationToken).ConfigureAwait(false);

sizes.AddOrUpdate(queueName, _ => size, (_, __) => size);
}
Expand Down Expand Up @@ -120,26 +120,26 @@ public void Initialize()
null); // value would come from config API in actual transport
}

public async Task Execute(Action<IModel> action, CancellationToken cancellationToken = default)
public async Task Execute(Action<IChannel> action, CancellationToken cancellationToken = default)
{
try
{
connection ??= connectionFactory.CreateConnection("queue length monitor");
connection ??= await connectionFactory.CreateConnection("queue length monitor", cancellationToken: cancellationToken);

//Connection implements reconnection logic
while (!connection.IsOpen)
{
await Task.Delay(ReconnectionDelay, cancellationToken);
}

if (model == null || model.IsClosed)
if (channel == null || channel.IsClosed)
{
model?.Dispose();
channel?.Dispose();

model = connection.CreateModel();
channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
}

action(model);
action(channel);
}
catch (OperationCanceledException)
{
Expand All @@ -154,7 +154,7 @@ public async Task Execute(Action<IModel> action, CancellationToken cancellationT
public void Dispose() => connection?.Dispose();

IConnection connection;
IModel model;
IChannel channel;
ConnectionFactory connectionFactory;

static readonly TimeSpan ReconnectionDelay = TimeSpan.FromSeconds(5);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
namespace ServiceControl.Transports.RabbitMQ
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using BrokerThroughput;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus;

public abstract class RabbitMQConventionalRoutingTransportCustomization(QueueType queueType)
: TransportCustomization<RabbitMQTransport>
: TransportCustomization<RabbitMQTransport>, IRabbitMQTransportExtensions
{

RabbitMQTransport rabbitMQTransport;

protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { }

protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { }
Expand All @@ -22,22 +27,64 @@ protected override RabbitMQTransport CreateTransport(TransportSettings transport
throw new InvalidOperationException("Connection string not configured");
}

var connectionConfiguration = ConnectionConfiguration.Create(transportSettings.ConnectionString, string.Empty);
var connectionStringDictionary = ConnectionConfiguration.ParseNServiceBusConnectionString(transportSettings.ConnectionString, new StringBuilder());

var disableManagementApiString = GetValue(connectionStringDictionary, "DisableManagementApi", "false");
if (!bool.TryParse(disableManagementApiString, out var disableManagementApi))
{
throw new ArgumentException("The value for 'DisableManagementApi' must be either 'true' or 'false'");
}

var transport = new RabbitMQTransport(RoutingTopology.Conventional(queueType), transportSettings.ConnectionString, enableDelayedDelivery: false);
transport.TransportTransactionMode = transport.GetSupportedTransactionModes().Contains(preferredTransactionMode) ? preferredTransactionMode : TransportTransactionMode.ReceiveOnly;
transport.UseManagementApi = !disableManagementApi;

if (!transport.UseManagementApi)
{
rabbitMQTransport = transport;
return transport;
}

var url = GetValue(connectionStringDictionary, "ManagementApiUrl", string.Empty);
var username = GetValue(connectionStringDictionary, "ManagementApiUserName", connectionConfiguration.UserName);
var password = GetValue(connectionStringDictionary, "ManagementApiPassword", connectionConfiguration.Password);

if (!string.IsNullOrEmpty(url))
{
if (!string.IsNullOrEmpty(username) && !string.IsNullOrEmpty(password))
{
transport.ManagementApiConfiguration = new ManagementApiConfiguration(url, username, password);
}
else
{
transport.ManagementApiConfiguration = new ManagementApiConfiguration(url);
}
}

rabbitMQTransport = transport;
return transport;
}

protected override void AddTransportForPrimaryCore(IServiceCollection services,
TransportSettings transportSettings)
{
services.AddSingleton<IBrokerThroughputQuery, RabbitMQQuery>();
}
protected override void AddTransportForPrimaryCore(IServiceCollection services, TransportSettings transportSettings)
=> services.AddSingleton<IBrokerThroughputQuery, RabbitMQQuery>();

protected sealed override void AddTransportForMonitoringCore(IServiceCollection services, TransportSettings transportSettings)
{
services.AddSingleton<IProvideQueueLength, QueueLengthProvider>();
services.AddHostedService(provider => provider.GetRequiredService<IProvideQueueLength>());
}

static string GetValue(Dictionary<string, string> dictionary, string key, string defaultValue)
=> dictionary.TryGetValue(key, out var value) ? value : defaultValue;

RabbitMQTransport IRabbitMQTransportExtensions.GetTransport()
{
if (rabbitMQTransport == null)
{
throw new InvalidOperationException("Transport instance has not been created yet. Make sure CreateTransport() is called before accessing the transport.");
};
return rabbitMQTransport;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
namespace ServiceControl.Transports.RabbitMQ
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using BrokerThroughput;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus;

public abstract class RabbitMQDirectRoutingTransportCustomization : TransportCustomization<RabbitMQTransport>
public abstract class RabbitMQDirectRoutingTransportCustomization(QueueType queueType)
: TransportCustomization<RabbitMQTransport>, IRabbitMQTransportExtensions
{
readonly QueueType queueType;

protected RabbitMQDirectRoutingTransportCustomization(QueueType queueType) => this.queueType = queueType;
RabbitMQTransport rabbitMQTransport;

protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { }

Expand All @@ -24,22 +25,64 @@ protected override RabbitMQTransport CreateTransport(TransportSettings transport
throw new InvalidOperationException("Connection string not configured");
}

var connectionConfiguration = ConnectionConfiguration.Create(transportSettings.ConnectionString, string.Empty);
var connectionStringDictionary = ConnectionConfiguration.ParseNServiceBusConnectionString(transportSettings.ConnectionString, new StringBuilder());

var disableManagementApiString = GetValue(connectionStringDictionary, "DisableManagementApi", "false");
if (!bool.TryParse(disableManagementApiString, out var disableManagementApi))
{
throw new ArgumentException("The value for 'DisableManagementApi' must be either 'true' or 'false'");
}

var transport = new RabbitMQTransport(RoutingTopology.Direct(queueType, routingKeyConvention: type => type.FullName.Replace(".", "-")), transportSettings.ConnectionString, enableDelayedDelivery: false);
transport.TransportTransactionMode = transport.GetSupportedTransactionModes().Contains(preferredTransactionMode) ? preferredTransactionMode : TransportTransactionMode.ReceiveOnly;
transport.UseManagementApi = !disableManagementApi;

if (!transport.UseManagementApi)
{
rabbitMQTransport = transport;
return transport;
}

var url = GetValue(connectionStringDictionary, "ManagementApiUrl", string.Empty);
var username = GetValue(connectionStringDictionary, "ManagementApiUserName", connectionConfiguration.UserName);
var password = GetValue(connectionStringDictionary, "ManagementApiPassword", connectionConfiguration.Password);

if (!string.IsNullOrEmpty(url))
{
if (!string.IsNullOrEmpty(username) && !string.IsNullOrEmpty(password))
{
transport.ManagementApiConfiguration = new ManagementApiConfiguration(url, username, password);
}
else
{
transport.ManagementApiConfiguration = new ManagementApiConfiguration(url);
}
}

rabbitMQTransport = transport;
return transport;
}

protected override void AddTransportForPrimaryCore(IServiceCollection services,
TransportSettings transportSettings)
{
services.AddSingleton<IBrokerThroughputQuery, RabbitMQQuery>();
}
protected override void AddTransportForPrimaryCore(IServiceCollection services, TransportSettings transportSettings)
=> services.AddSingleton<IBrokerThroughputQuery, RabbitMQQuery>();

protected sealed override void AddTransportForMonitoringCore(IServiceCollection services, TransportSettings transportSettings)
{
services.AddSingleton<IProvideQueueLength, QueueLengthProvider>();
services.AddHostedService(provider => provider.GetRequiredService<IProvideQueueLength>());
}

static string GetValue(Dictionary<string, string> dictionary, string key, string defaultValue)
=> dictionary.TryGetValue(key, out var value) ? value : defaultValue;

RabbitMQTransport IRabbitMQTransportExtensions.GetTransport()
{
if (rabbitMQTransport == null)
{
throw new InvalidOperationException("Transport instance has not been created yet. Make sure CreateTransport() is called before accessing the transport.");
};
return rabbitMQTransport;
}
}
}
Loading