Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump RabbitMQ.Client from 6.8.1 to 7.0.0 #1987

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<PackageVersion Include="MongoDB.Driver" Version="3.0.0" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.3" />
<PackageVersion Include="NLog" Version="5.3.4" />
<PackageVersion Include="RabbitMQ.Client" Version="6.8.1" />
<PackageVersion Include="RabbitMQ.Client" Version="7.0.0" />
<PackageVersion Include="Spectre.Console" Version="0.49.1" />
<PackageVersion Include="StackExchange.Redis" Version="2.8.22" />
<PackageVersion Include="System.IO.Abstractions" Version="21.1.3" />
Expand Down
2 changes: 1 addition & 1 deletion src/SmiServices/Common/IMessageBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public interface IMessageBroker

IProducerModel SetupProducer(ProducerOptions producerOptions, bool isBatch);

IModel GetModel(string connectionName);
IChannel GetModel(string connectionName);

void Shutdown(TimeSpan timeout);
public void Wait();
Expand Down
2 changes: 1 addition & 1 deletion src/SmiServices/Common/Messages/IMessageHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public interface IMessageHeader
/// </summary>
Guid[] Parents { get; }

void Populate(IDictionary<string, object> props);
void Populate(IDictionary<string, object?> props);
void Log(ILogger logger, LogLevel level, string message, Exception? ex = null);

bool IsDescendantOf(IMessageHeader other);
Expand Down
2 changes: 1 addition & 1 deletion src/SmiServices/Common/Messages/MessageHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public static MessageHeader FromDict(IDictionary<string, object> encodedHeaders,
/// Populates RabbitMQ header properties with the current MessageHeader
/// </summary>
/// <param name="headers"></param>
public void Populate(IDictionary<string, object> headers)
public void Populate(IDictionary<string, object?> headers)
{
headers.Add("MessageGuid", MessageGuid.ToString());
headers.Add("ProducerProcessID", ProducerProcessID);
Expand Down
8 changes: 5 additions & 3 deletions src/SmiServices/Common/Messaging/BatchProducerModel.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using RabbitMQ.Client;
using SmiServices.Common.Messages;
using System;
using System.Threading.Tasks;

namespace SmiServices.Common.Messaging
{
Expand All @@ -13,7 +14,7 @@ public class BatchProducerModel : ProducerModel
{
public BatchProducerModel(
string exchangeName,
IModel model,
IChannel model,
IBasicProperties properties,
int maxPublishAttempts = 1,
IBackoffProvider? backoffProvider = null,
Expand All @@ -26,14 +27,15 @@ public BatchProducerModel(


/// <summary>
/// Sends a message but does not wait for the server to confirm the publish. Manually call ProducerModel.WaitForConfirms()
/// Sends a message but does not wait for the server to confirm the publish. Manually await the Task
/// to check all previously unacknowledged messages have been sent.
/// </summary>
/// <param name="message"></param>
/// <param name="inResponseTo"></param>
/// <param name="routingKey"></param>
/// <returns></returns>
public override IMessageHeader SendMessage(IMessage message, IMessageHeader? inResponseTo = null, string? routingKey = null)
public override Task<IMessageHeader> SendMessage(IMessage message, IMessageHeader? inResponseTo = null,
string? routingKey = null)
{
return SendMessageImpl(message, inResponseTo, routingKey);
}
Expand Down
4 changes: 2 additions & 2 deletions src/SmiServices/Common/Messaging/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public abstract class Consumer<TMessage> : IConsumer where TMessage : IMessage
private readonly object _oConsumeLock = new();
private bool _exiting;

protected IModel? Model;
protected IChannel? Model;

public virtual void Shutdown()
{
Expand All @@ -72,7 +72,7 @@ protected Consumer()
}


public void SetModel(IModel model)
public void SetModel(IChannel model)
{
if (model.IsClosed)
throw new ArgumentException("Model is closed");
Expand Down
51 changes: 30 additions & 21 deletions src/SmiServices/Common/Messaging/ControlMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;

namespace SmiServices.Common.Messaging
Expand All @@ -29,6 +30,8 @@ public class ControlMessageConsumer : Consumer<IMessage>
private readonly string _processId;
private readonly IConnection _connection;

private readonly IChannel _channel;

private const string ControlQueueBindingKey = "smi.control.all.*";


Expand All @@ -43,13 +46,24 @@ public ControlMessageConsumer(
ArgumentNullException.ThrowIfNull(controlExchangeName);
ArgumentNullException.ThrowIfNull(stopEvent);

_connection = rabbitOptions.Connection;
if (rabbitOptions.RabbitMqVirtualHost is null || rabbitOptions.RabbitMqUserName is null || rabbitOptions.RabbitMqPassword is null)
throw new InvalidOperationException("RabbitOptions must have all fields set");

_processName = processName.ToLower();
_processId = processId.ToString();

ControlConsumerOptions.QueueName = $"Control.{_processName}{_processId}";

SetupControlQueueForHost(controlExchangeName);
_channel = new ConnectionFactory
{
HostName = rabbitOptions.RabbitMqHostName,
VirtualHost = rabbitOptions.RabbitMqVirtualHost,
Port = rabbitOptions.RabbitMqHostPort,
UserName = rabbitOptions.RabbitMqUserName,
Password = rabbitOptions.RabbitMqPassword
}.CreateConnectionAsync(CancellationToken.None).Result.CreateChannelAsync(null, CancellationToken.None).Result;

SetupControlQueueForHost(controlExchangeName).Wait();

StopHost += () => stopEvent("Control message stop");
}
Expand Down Expand Up @@ -82,7 +96,7 @@ public override void ProcessMessage(BasicDeliverEventArgs e)
string action = split[^1];

// If action contains a numeric and it's not our PID, then ignore
if (action.Any(char.IsDigit) && !action.EndsWith(_processId))
if (action.Any(char.IsDigit) && !action.EndsWith(_processId, StringComparison.Ordinal))
return;

// Ignore any messages not meant for us
Expand All @@ -94,22 +108,15 @@ public override void ProcessMessage(BasicDeliverEventArgs e)

// Handle any general actions - just stop and ping for now

if (action.StartsWith("stop"))
if (action.StartsWith("stop", StringComparison.Ordinal))
{
if (StopHost == null)
{
// This should never really happen
Logger.Info("Received stop command but no stop event registered");
return;
}

Logger.Info("Stop request received, raising StopHost event");
Task.Run(() => StopHost.Invoke());

return;
}

if (action.StartsWith("ping"))
if (action.StartsWith("ping", StringComparison.Ordinal))
{
Logger.Info("Pong!");
return;
Expand Down Expand Up @@ -142,9 +149,9 @@ public override void ProcessMessage(BasicDeliverEventArgs e)
/// </summary>
public override void Shutdown()
{
using var model = _connection.CreateModel();
Logger.Debug($"Deleting control queue: {ControlConsumerOptions.QueueName}");
model.QueueDelete(ControlConsumerOptions.QueueName);
if (ControlConsumerOptions.QueueName != null)
_channel.QueueDeleteAsync(ControlConsumerOptions.QueueName).Wait();
}

// NOTE(rkm 2020-05-12) Not used in this implementation
Expand All @@ -158,12 +165,11 @@ public override void Shutdown()
/// The connection is disposed and StartConsumer(...) can then be called on the parent MessageBroker with ControlConsumerOptions
/// </summary>
/// <param name="controlExchangeName"></param>
private void SetupControlQueueForHost(string controlExchangeName)
private async Task SetupControlQueueForHost(string controlExchangeName)
{
using var model = _connection.CreateModel();
try
{
model.ExchangeDeclarePassive(controlExchangeName);
await _channel.ExchangeDeclarePassiveAsync(controlExchangeName, CancellationToken.None);
}
catch (OperationInterruptedException e)
{
Expand All @@ -176,17 +182,20 @@ private void SetupControlQueueForHost(string controlExchangeName)
// durable = false (queue will not persist over restarts of the RabbitMq server)
// exclusive = false (queue won't be deleted when THIS connection closes)
// autoDelete = true (queue will be deleted after a consumer connects and then disconnects)
model.QueueDeclare(ControlConsumerOptions.QueueName, durable: false, exclusive: false, autoDelete: true);
await _channel.QueueDeclareAsync(
ControlConsumerOptions.QueueName ??
throw new InvalidOperationException(nameof(ControlConsumerOptions.QueueName)), durable: false,
exclusive: false, autoDelete: true, cancellationToken: CancellationToken.None);

// Binding for any control requests, i.e. "stop"
Logger.Debug($"Creating binding {controlExchangeName}->{ControlConsumerOptions.QueueName} with key {ControlQueueBindingKey}");
model.QueueBind(ControlConsumerOptions.QueueName, controlExchangeName, ControlQueueBindingKey);
await _channel.QueueBindAsync(ControlConsumerOptions.QueueName, controlExchangeName, ControlQueueBindingKey);

// Specific microservice binding key, ignoring the id at the end of the process name
string bindingKey = $"smi.control.{_processName}.*";
var bindingKey = $"smi.control.{_processName}.*";

Logger.Debug($"Creating binding {controlExchangeName}->{ControlConsumerOptions.QueueName} with key {bindingKey}");
model.QueueBind(ControlConsumerOptions.QueueName, controlExchangeName, bindingKey);
await _channel.QueueBindAsync(ControlConsumerOptions.QueueName, controlExchangeName, bindingKey);
}

private static string? GetBodyFromArgs(BasicDeliverEventArgs e)
Expand Down
8 changes: 4 additions & 4 deletions src/SmiServices/Common/Messaging/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ namespace SmiServices.Common.Messaging
public interface IConsumer
{
/// <summary>
/// Set the <see cref="IModel"/> which messages will be processed with
/// Set the <see cref="IChannel"/> which messages will be processed with
/// </summary>
/// <param name="model"></param>
void SetModel(IModel model);
/// <param name="model"></param>
void SetModel(IChannel model);

/// <summary>
/// Process a message received by the adapter.
Expand All @@ -38,7 +38,7 @@ public interface IConsumer
bool HoldUnprocessableMessages { get; set; }

/// <summary>
/// The BasicQos value configured on the <see cref="IModel"/>
/// The BasicQos value configured on the <see cref="IChannel"/>
/// </summary>
int QoSPrefetchCount { get; set; }
}
Expand Down
3 changes: 2 additions & 1 deletion src/SmiServices/Common/Messaging/IProducerModel.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Threading.Tasks;
using SmiServices.Common.Events;
using SmiServices.Common.Messages;

Expand All @@ -15,7 +16,7 @@ public interface IProducerModel
/// <param name="message">Message object to serialise and send.</param>
/// <param name="isInResponseTo">If you are responding to a message, pass that messages header in here (otherwise pass null)</param>
/// <param name="routingKey">Routing key for the exchange to direct the message.</param>
IMessageHeader SendMessage(IMessage message, IMessageHeader? isInResponseTo, string? routingKey);
Task<IMessageHeader> SendMessage(IMessage message, IMessageHeader? isInResponseTo, string? routingKey);

/// <summary>
/// Waits until all sent messages are confirmed by RabbitMQ
Expand Down
Loading
Loading