Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RabbitMQ delivery limit check documentation and upgrade guide #6960

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Snippets/Rabbit/Rabbit.sln
Original file line number Diff line number Diff line change
@@ -17,6 +17,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Rabbit_9.1", "Rabbit_9.1\Ra
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Rabbit_9.2", "Rabbit_9.2\Rabbit_9.2.csproj", "{2F242592-4E40-4C87-90A4-32055B11544F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rabbit_10", "Rabbit_10\Rabbit_10.csproj", "{F99C022C-1BAD-4C78-8069-C34C3EB9AE5F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -51,6 +53,10 @@ Global
{2F242592-4E40-4C87-90A4-32055B11544F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2F242592-4E40-4C87-90A4-32055B11544F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2F242592-4E40-4C87-90A4-32055B11544F}.Release|Any CPU.Build.0 = Release|Any CPU
{F99C022C-1BAD-4C78-8069-C34C3EB9AE5F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F99C022C-1BAD-4C78-8069-C34C3EB9AE5F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F99C022C-1BAD-4C78-8069-C34C3EB9AE5F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F99C022C-1BAD-4C78-8069-C34C3EB9AE5F}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
18 changes: 18 additions & 0 deletions Snippets/Rabbit/Rabbit_10/AccessToBasicDeliverEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Threading.Tasks;
using NServiceBus.Pipeline;
using RabbitMQ.Client.Events;

#region rabbitmq-access-to-event-args
class AccessToBasicDeliverEventArgs : Behavior<IIncomingContext>
{
public override Task Invoke(IIncomingContext context, Func<Task> next)
{
var userIdOnBroker = context.Extensions.Get<BasicDeliverEventArgs>().BasicProperties.UserId;

//do something useful

return next();
}
}
#endregion
42 changes: 42 additions & 0 deletions Snippets/Rabbit/Rabbit_10/MyRoutingTopology.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System;
using System.Collections.Generic;
using NServiceBus.Transport;
using NServiceBus.Transport.RabbitMQ;
using NServiceBus.Unicast.Messages;
using RabbitMQ.Client;

class MyRoutingTopology :
IRoutingTopology
{
public MyRoutingTopology(bool createDurableExchangesAndQueues)
{
}

public void SetupSubscription(IModel channel, MessageMetadata type, string subscriberName)
{
}

public void TeardownSubscription(IModel channel, MessageMetadata type, string subscriberName)
{
}

public void Publish(IModel channel, Type type, OutgoingMessage message, IBasicProperties properties)
{
}

public void Send(IModel channel, string address, OutgoingMessage message, IBasicProperties properties)
{
}

public void RawSendInCaseOfFailure(IModel channel, string address, ReadOnlyMemory<byte> body, IBasicProperties properties)
{
}

public void Initialize(IModel channel, IEnumerable<string> receivingAddresses, IEnumerable<string> sendingAddresses)
{
}

public void BindToDelayInfrastructure(IModel channel, string address, string deliveryExchange, string routingKey)
{
}
}
23 changes: 23 additions & 0 deletions Snippets/Rabbit/Rabbit_10/OutgoingNativeMessageCustomization.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using NServiceBus;

class OutgoingNativeMessageCustomization
{
OutgoingNativeMessageCustomization(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-customize-outgoing-message

var rabbitMqTransport = new RabbitMQTransport(
routingTopology: RoutingTopology.Conventional(QueueType.Classic),
connectionString: "host=localhost;username=rabbitmq;password=rabbitmq",
enableDelayedDelivery: false
);

rabbitMqTransport.OutgoingNativeMessageCustomization = (operation, properties) =>
{
//Set values on IBasicProperties
properties.ContentType = "application/my-type";
};

#endregion
}
}
19 changes: 19 additions & 0 deletions Snippets/Rabbit/Rabbit_10/RabbitMQNonPersistentDeliveryMode.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Threading.Tasks;
using NServiceBus;

class RabbitMQNonPersistentDeliveryMode
{
public async Task RequestNonPersistent(IMessageHandlerContext context)
{
#region rabbitmq-non-persistent-delivery-mode
var options = new SendOptions();

options.RouteToThisEndpoint();
options.UseNonPersistentDeliveryMode();

await context.Send(new MyMessage(), options);
#endregion
}

class MyMessage { }
}
12 changes: 12 additions & 0 deletions Snippets/Rabbit/Rabbit_10/Rabbit_10.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="NServiceBus.RabbitMQ" Version="10.0.0-alpha.0.92" />
<!--<PackageReference Include="NServiceBus.RabbitMQ" Version="9.1.*" />-->
</ItemGroup>

</Project>
229 changes: 229 additions & 0 deletions Snippets/Rabbit/Rabbit_10/Usage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
using System;
using System.Security.Cryptography.X509Certificates;

using NServiceBus;

class Usage
{
Usage(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-config-basic

endpointConfiguration.UseTransport<RabbitMQTransport>();

#endregion
}

void CustomConnectionString(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-config-connectionstring-in-code

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.ConnectionString("My custom connection string");

#endregion
}

void CustomIdStrategy(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-config-custom-id-strategy

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.CustomMessageIdStrategy(
customIdStrategy: deliveryArgs =>
{
var headers = deliveryArgs.BasicProperties.Headers;
return headers["MyCustomId"].ToString();
});

#endregion
}

void UseConventionalRoutingTopology(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-config-useconventionalroutingtopology

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.UseConventionalRoutingTopology(QueueType.Quorum);

#endregion
}

void UseDirectRoutingTopology(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-config-usedirectroutingtopology

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.UseDirectRoutingTopology(QueueType.Quorum);

#endregion
}

void UseDirectRoutingTopologyWithCustomConventions(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-config-usedirectroutingtopologywithcustomconventions

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.UseDirectRoutingTopology(
QueueType.Quorum,
routingKeyConvention: MyRoutingKeyConvention,
exchangeNameConvention: () => "MyTopic");

#endregion
}

string MyRoutingKeyConvention(Type type)
{
throw new NotImplementedException();
}

void UseRoutingTopology5_0(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-config-useroutingtopologyDelegate

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.UseCustomRoutingTopology(
topologyFactory: createDurableExchangesAndQueues =>
{
return new MyRoutingTopology(createDurableExchangesAndQueues);
});

#endregion
}

void UseCustomCircuitBreakerSettings(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-custom-breaker-settings-time-to-wait-before-triggering-code

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.TimeToWaitBeforeTriggeringCircuitBreaker(TimeSpan.FromMinutes(2));

#endregion
}

void PrefetchMultiplier(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-config-prefetch-multiplier

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.PrefetchMultiplier(4);

#endregion
}

void PrefetchCount(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-config-prefetch-count

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.PrefetchCount(100);

#endregion
}

void SetClientCertificateFile(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-client-certificate-file

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.SetClientCertificate("path", "password");

#endregion
}

void SetClientCertificate(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-client-certificate

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.SetClientCertificate(new X509Certificate2("/path/to/certificate"));

#endregion
}

void DisableRemoteCertificateValidation(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-disable-remote-certificate-validation

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.DisableRemoteCertificateValidation();

#endregion
}

void UseExternalAuthMechanism(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-external-auth-mechanism

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.UseExternalAuthMechanism();

#endregion
}

void ChangeRequestedHeartbeatForDebugging(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-debug-api

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.SetHeartbeatInterval(TimeSpan.FromMinutes(10));

#endregion
}

void ChangeHeartbeatInterval(EndpointConfiguration endpointConfiguration)
{
#region change-heartbeat-interval

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.SetHeartbeatInterval(TimeSpan.FromSeconds(30));

#endregion
}

void ChangeNetworkRecoveryInterval(EndpointConfiguration endpointConfiguration)
{
#region change-network-recovery-interval

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.SetNetworkRecoveryInterval(TimeSpan.FromSeconds(30));

#endregion
}

void DisableDurableExchangesAndQueues(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-disable-durable-exchanges

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.DisableDurableExchangesAndQueues();

#endregion
}

void AddClusterNode(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-add-cluster-node

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.AddClusterNode("node2", useTls: false);

#endregion

#region rabbitmq-add-cluster-node-with-port

transport.AddClusterNode("node2", 5675, useTls: true);

#endregion
}

void SetManagementApiUrl(EndpointConfiguration endpointConfiguration)
{
#region rabbitmq-management-api-url

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.ManagementApiUrl("http://{username}:{password}@{host}:{port}");

#endregion
}
}
Empty file.
3 changes: 3 additions & 0 deletions Snippets/Rabbit/Rabbit_10/rabbitmq-connection-tls.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
startcode rabbitmq-connection-tls
host=broker1;UseTls=true
endcode
2 changes: 2 additions & 0 deletions menu/menu.yaml
Original file line number Diff line number Diff line change
@@ -1426,6 +1426,8 @@
Articles:
- Title: Upgrade tips
Url: servicecontrol/upgrades
- Title: Version 6.2 to 6.3
Url: servicecontrol/upgrades/6.2to6.3
- Title: Version 5 to 6
Url: servicecontrol/upgrades/5to6
- Title: Version 5.1 to 5.2
5 changes: 5 additions & 0 deletions servicecontrol/transports.md
Original file line number Diff line number Diff line change
@@ -67,6 +67,11 @@ In addition to the [connection string options of the transport](/transports/rabb
* `UseExternalAuthMechanism=true|false(default)` - Specifies that an [external authentication mechanism should be used for client authentication](/transports/rabbitmq/connection-settings.md#transport-layer-security-support-external-authentication).
* `DisableRemoteCertificateValidation=true|false(default)` - Allows ServiceControl to connect to the broker [even if the remote server certificate is invalid](/transports/rabbitmq/connection-settings.md#transport-layer-security-support-remote-certificate-validation).

These options are available for only quorum queues in version 6.3 and above:

* `ManagementApiUri=<uri>` - The URI of the RabbitMQ management API. The default value is `http://guest:guest@localhost:15672/api/`
* `DisableManagemenApi=true|false(default)` - Disable the connection to the management API that is used to validate the delivery limit of quorum queues in RabbitMQ version 4 and above.

## SQL

In addition to the [connection string options of the transport](/transports/sql/connection-settings.md#connection-configuration) the following ServiceControl specific options are available in versions 4.4 and above:
11 changes: 11 additions & 0 deletions servicecontrol/upgrades/6.2to6.3.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
title: Upgrade ServiceControl from Version 6.2.x to Version 6.3.x
summary: Instructions on how to upgrade ServiceControl from version 6.2.x to 6.3.x
reviewed: 2025-01-28
isUpgradeGuide: true
component: ServiceControl
---

## RabbitMQ delivery limit validation

ServiceControl version 6.3 uses rabbitMQ transport version 10.x. When using RabbitMQ 4 and above the management API is required as part of the transport to validate the delivery limit of the queue to insure that it's set to unlimited (-1) for proper recoverability functionality. The [management API plugin](https://www.rabbitmq.com/docs/management#getting-started) is required to be installed on the node. If the configuration of the management API is different than the default values based on the broker connection string, then the connection can be configured using the [RabbitMQ transport configuration options](/servicecontrol/transports.md#rabbitmq).
2 changes: 2 additions & 0 deletions transports/rabbitmq/connection-settings.md
Original file line number Diff line number Diff line change
@@ -101,6 +101,8 @@ snippet: rabbitmq-external-auth-mechanism

partial: add-cluster-node

partial: management-api-url

## Controlling the prefetch count

When consuming messages from the broker, throughput can be improved by having the consumer [prefetch](https://www.rabbitmq.com/consumer-prefetch.html) additional messages.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
## Configuring RabbitMQ delivery limit validation

In RabbitMQ version 4.0 and above, queues are created with a default delivery limit of 20. However, for the NServiceBus recoverability process to function properly, the delivery limit should be set to unlimited (-1).

The RabbitMQ transport can verify and set the RabbitMQ delivery limit to unlimited (-1) using the management API. This requires the [rabbitmq management plugin](https://www.rabbitmq.com/docs/management#getting-started) to be enabled on the RabbitMQ node. The management API uses [basic access authentication](https://en.wikipedia.org/wiki/Basic_access_authentication) to connect.

To configure the management API and perform the delivery limit validation, set the URL details as follows:

snippet: rabbitmq-management-api-url

> [!NOTE]
> If the management API URL is not set, the transport will attempt to connect to the RabbitMQ management API with the default management API values based on the broker connection string. For example, if the broker connection string is `host=localhost;port=5671;useTls=true`, the management API would attempt to connect to `https://guest:guest@localhost:15671` .
### Handling existing delivery limit policies

If a queue already has a policy and the delivery limit is not set to unlimited (-1), the transport will raise an exception indicating that the RabbitMQ node already has a policy applied. The transport will not attempt to update the delivery limit of the existing policy or make a superseding delivery limit policy.

Manually updating the delivery limit of an existing policy to unlimited (-1) or removing the existing policy will allow the management API to validate the delivery limit. If it's not feasible to update the delivery limit of an existing policy, the use of the management API and the validation of the delivery limit can be disabled by calling the `DoNotUseManagementApi()` transport method.

> [!WARNING]
> Potential message loss can occur if the delivery limit is not set to unlimited on the queue. If the delivery limit is not set to unlimited, the recoverability process may not function as intended.
6 changes: 6 additions & 0 deletions transports/upgrades/rabbitmq-9to10.md
Original file line number Diff line number Diff line change
@@ -16,6 +16,12 @@ The transport now uses [RabbitMQ.Client v7.0.0](https://www.nuget.org/packages/R

For details, see the [RabbitMQ client changelog](https://github.com/rabbitmq/rabbitmq-dotnet-client/releases/tag/v7.0.0).

## RabbitMQ Management API

The transport has introduced a client connection to the HTTP-based RabbitMQ management API. This requires the [rabbitmq management plugin](https://www.rabbitmq.com/docs/management#getting-started) to be enabled on the node. The client uses [basic access authentication](https://en.wikipedia.org/wiki/Basic_access_authentication) to connect with the RabbitMQ management API and is configured via the `ManagementApiUrl` transport property.

For more details about the configuration, see [Configuring RabbitMQ delivery limit validation](/transports/rabbitmq/connection-settings.md?version=rabbit_10#configuring-rabbitmq-delivery-limit-validation).

### `IRoutingTopology` Updates

The following changes have been made to `IRoutingTopology`: