From d38e5b88541e89d0883903f87945a178b5213614 Mon Sep 17 00:00:00 2001 From: Yarin Omesi Date: Sat, 2 Sep 2023 15:11:08 +0300 Subject: [PATCH] WIP --- .../IRabbitMqQueueConfigurator.cs | 3 +- .../IRabbitMqReceiveEndpointConfigurator.cs | 3 +- .../RabbitMqTransport/BaseEntitySettings.cs | 42 +++++++ .../RabbitMqTransport/BaseReceiveSettings.cs | 77 +++++++++++++ .../ExchangeQueueBindingConfigurator.cs | 75 +++++++++++++ .../Configuration/QueueBindingConfigurator.cs | 39 ------- .../RabbitMqBusFactoryConfigurator.cs | 18 +-- .../RabbitMqHostConfiguration.cs | 3 +- .../RabbitMqQueueConfigurator.cs | 16 +-- .../RabbitMqReceiveEndpointConfiguration.cs | 16 +-- .../Configuration/RabbitMqReceiveSettings.cs | 40 ++++--- ...angeToQueueBindingTopologySpecification.cs | 47 ++++++++ ...ueueBindingConsumeTopologySpecification.cs | 21 +--- ...ueueBindingPublishTopologySpecification.cs | 23 +--- .../Topology/RabbitMqConsumeTopology.cs | 2 +- .../Topology/RabbitMqDeadLetterSettings.cs | 42 +++++-- .../Topology/RabbitMqErrorSettings.cs | 41 +++++-- .../RabbitMqMessagePublishTopology.cs | 2 +- ...RabbitMqQueueBindingConfiguratorAdapter.cs | 105 ++++++++++++++++++ .../Topology/RabbitMqSendTopology.cs | 4 +- 20 files changed, 472 insertions(+), 147 deletions(-) create mode 100644 src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/BaseEntitySettings.cs create mode 100644 src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/BaseReceiveSettings.cs create mode 100644 src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/ExchangeQueueBindingConfigurator.cs delete mode 100644 src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/QueueBindingConfigurator.cs create mode 100644 src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/Topology/BaseExchangeToQueueBindingTopologySpecification.cs create mode 100644 src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqQueueBindingConfiguratorAdapter.cs diff --git a/src/Transports/MassTransit.RabbitMqTransport/Configuration/IRabbitMqQueueConfigurator.cs b/src/Transports/MassTransit.RabbitMqTransport/Configuration/IRabbitMqQueueConfigurator.cs index 5d369bb38b5..2487703acd6 100644 --- a/src/Transports/MassTransit.RabbitMqTransport/Configuration/IRabbitMqQueueConfigurator.cs +++ b/src/Transports/MassTransit.RabbitMqTransport/Configuration/IRabbitMqQueueConfigurator.cs @@ -6,8 +6,7 @@ /// /// Configures a queue/exchange pair in RabbitMQ /// - public interface IRabbitMqQueueConfigurator : - IRabbitMqExchangeConfigurator + public interface IRabbitMqQueueConfigurator { /// /// Specify that the queue is exclusive to this process and cannot be accessed by other processes diff --git a/src/Transports/MassTransit.RabbitMqTransport/Configuration/IRabbitMqReceiveEndpointConfigurator.cs b/src/Transports/MassTransit.RabbitMqTransport/Configuration/IRabbitMqReceiveEndpointConfigurator.cs index 61c1b5ff7e9..21feada17cd 100644 --- a/src/Transports/MassTransit.RabbitMqTransport/Configuration/IRabbitMqReceiveEndpointConfigurator.cs +++ b/src/Transports/MassTransit.RabbitMqTransport/Configuration/IRabbitMqReceiveEndpointConfigurator.cs @@ -9,7 +9,8 @@ /// public interface IRabbitMqReceiveEndpointConfigurator : IReceiveEndpointConfigurator, - IRabbitMqQueueEndpointConfigurator + IRabbitMqQueueEndpointConfigurator, + IRabbitMqExchangeConfigurator { /// /// If false, deploys only exchange, without queue diff --git a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/BaseEntitySettings.cs b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/BaseEntitySettings.cs new file mode 100644 index 00000000000..716c8cf48a7 --- /dev/null +++ b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/BaseEntitySettings.cs @@ -0,0 +1,42 @@ +namespace MassTransit.RabbitMqTransport +{ + using System.Collections.Generic; + using Configuration; + + + public abstract class BaseEntitySettings : EntitySettings + { + readonly RabbitMqExchangeConfigurator _exchangeConfigurator; + + protected BaseEntitySettings(RabbitMqExchangeConfigurator exchangeConfigurator) + { + _exchangeConfigurator = exchangeConfigurator; + } + + public virtual bool Durable + { + get => _exchangeConfigurator.Durable; + set => _exchangeConfigurator.Durable = value; + } + + public virtual bool AutoDelete + { + get => _exchangeConfigurator.AutoDelete; + set => _exchangeConfigurator.AutoDelete = value; + } + + public IDictionary ExchangeArguments => _exchangeConfigurator.ExchangeArguments; + + public string ExchangeName + { + get => _exchangeConfigurator.ExchangeName; + set => _exchangeConfigurator.ExchangeName = value; + } + + public string ExchangeType + { + get => _exchangeConfigurator.ExchangeType; + set => _exchangeConfigurator.ExchangeType = value; + } + } +} diff --git a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/BaseReceiveSettings.cs b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/BaseReceiveSettings.cs new file mode 100644 index 00000000000..97854d58177 --- /dev/null +++ b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/BaseReceiveSettings.cs @@ -0,0 +1,77 @@ +namespace MassTransit.RabbitMqTransport +{ + using System; + using System.Collections.Generic; + using Configuration; + + + public abstract class BaseReceiveSettings : BaseEntitySettings, ReceiveSettings + { + readonly ExchangeQueueBindingConfigurator _bindingConfigurator; + + protected BaseReceiveSettings(ExchangeQueueBindingConfigurator bindingConfigurator) + : base(bindingConfigurator.ExchangeConfigurator) + { + _bindingConfigurator = bindingConfigurator; + } + + RabbitMqQueueConfigurator QueueConfigurator => _bindingConfigurator.QueueConfigurator; + RabbitMqExchangeConfigurator ExchangeConfigurator => _bindingConfigurator.ExchangeConfigurator; + + public string QueueName + { + get => QueueConfigurator.QueueName; + set => QueueConfigurator.QueueName = value; + } + + public bool Exclusive + { + get => QueueConfigurator.Exclusive; + set => QueueConfigurator.Exclusive = value; + } + + public IDictionary QueueArguments => QueueConfigurator.QueueArguments; + + public TimeSpan? QueueExpiration + { + get => QueueConfigurator.QueueExpiration; + set => QueueConfigurator.QueueExpiration = value; + } + + public string RoutingKey + { + get => _bindingConfigurator.RoutingKey; + set => _bindingConfigurator.RoutingKey = value; + } + + public IDictionary BindingArguments => _bindingConfigurator.BindingArguments; + + public override bool Durable + { + get => ExchangeConfigurator.Durable && QueueConfigurator.Durable; + set => ExchangeConfigurator.Durable = QueueConfigurator.Durable = value; + } + + public override bool AutoDelete + { + get => ExchangeConfigurator.AutoDelete && QueueConfigurator.AutoDelete; + set => ExchangeConfigurator.AutoDelete = QueueConfigurator.AutoDelete = value; + } + + public abstract ushort PrefetchCount { get; set; } + + public abstract bool PurgeOnStartup { get; set; } + + public abstract IDictionary ConsumeArguments { get; } + + public abstract bool ExclusiveConsumer { get; set; } + public abstract bool BindQueue { get; set; } + public abstract bool NoAck { get; set; } + public abstract string ConsumerTag { get; set; } + + public virtual Uri GetInputAddress(Uri hostAddress) + { + return _bindingConfigurator.ExchangeConfigurator.GetEndpointAddress(hostAddress); + } + } +} diff --git a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/ExchangeQueueBindingConfigurator.cs b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/ExchangeQueueBindingConfigurator.cs new file mode 100644 index 00000000000..dab81a52c91 --- /dev/null +++ b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/ExchangeQueueBindingConfigurator.cs @@ -0,0 +1,75 @@ +namespace MassTransit.RabbitMqTransport.Configuration +{ + using System; + using System.Collections.Generic; + using Topology; + + + public class ExchangeQueueBindingConfigurator + { + public RabbitMqQueueConfigurator QueueConfigurator { get; } + public RabbitMqExchangeConfigurator ExchangeConfigurator { get; } + + public ExchangeQueueBindingConfigurator(string name, string exchangeType, bool durable, bool autoDelete) + { + QueueConfigurator = new MyQueueConfigurator(name, durable, autoDelete); + ExchangeConfigurator = new MyExchangeConfigurator(name, exchangeType, durable, autoDelete); + + BindingArguments = new Dictionary(); + RoutingKey = ""; + } + public ExchangeQueueBindingConfigurator(string exchangeName, string queueName, string exchangeType, bool durable, bool autoDelete) + { + QueueConfigurator = new MyQueueConfigurator(queueName, durable, autoDelete); + ExchangeConfigurator = new MyExchangeConfigurator(exchangeName, exchangeType, durable, autoDelete); + + BindingArguments = new Dictionary(); + RoutingKey = ""; + } + + public IDictionary BindingArguments { get; } + + public void SetBindingArgument(string key, object value) + { + if (key == null) + throw new ArgumentNullException(nameof(key)); + + if (value == null) + BindingArguments.Remove(key); + else + BindingArguments[key] = value; + } + + public string RoutingKey { get; set; } + + public void AutoDeleteAfter(TimeSpan duration) + { + QueueConfigurator.AutoDelete = true; + ExchangeConfigurator.AutoDelete = true; + + QueueConfigurator.QueueExpiration = duration; + } + + private class MyQueueConfigurator : RabbitMqQueueConfigurator + { + public MyQueueConfigurator(string queueName, bool durable, bool autoDelete) + : base(queueName, durable, autoDelete) + { + } + } + + + private class MyExchangeConfigurator : RabbitMqExchangeConfigurator + { + public MyExchangeConfigurator(string exchangeName, string exchangeType, bool durable = true, bool autoDelete = false) + : base(exchangeName, exchangeType, durable, autoDelete) + { + } + + public MyExchangeConfigurator(Exchange source) + : base(source) + { + } + } + } +} diff --git a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/QueueBindingConfigurator.cs b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/QueueBindingConfigurator.cs deleted file mode 100644 index b6a3404681c..00000000000 --- a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/QueueBindingConfigurator.cs +++ /dev/null @@ -1,39 +0,0 @@ -namespace MassTransit.RabbitMqTransport.Configuration -{ - using System; - using System.Collections.Generic; - - - public class QueueBindingConfigurator : - RabbitMqQueueConfigurator, - IRabbitMqQueueBindingConfigurator - { - protected QueueBindingConfigurator(string queueName, string exchangeType, bool durable, bool autoDelete) - : base(queueName, exchangeType, durable, autoDelete) - { - BindingArguments = new Dictionary(); - RoutingKey = ""; - } - - public IDictionary BindingArguments { get; } - - public void SetBindingArgument(string key, object value) - { - if (key == null) - throw new ArgumentNullException(nameof(key)); - - if (value == null) - BindingArguments.Remove(key); - else - BindingArguments[key] = value; - } - - public string RoutingKey { get; set; } - - public void AutoDeleteAfter(TimeSpan duration) - { - AutoDelete = true; - QueueExpiration = duration; - } - } -} diff --git a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqBusFactoryConfigurator.cs b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqBusFactoryConfigurator.cs index f9c0b9e7ecf..a954f92b83c 100644 --- a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqBusFactoryConfigurator.cs +++ b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqBusFactoryConfigurator.cs @@ -25,7 +25,7 @@ public RabbitMqBusFactoryConfigurator(IRabbitMqBusConfiguration busConfiguration var exchangeType = busConfiguration.BusEndpointConfiguration.Topology.Consume.ExchangeTypeSelector.DefaultExchangeType; _settings = new RabbitMqReceiveSettings(busConfiguration.BusEndpointConfiguration, queueName, exchangeType, false, true); - _settings.AutoDeleteAfter(TimeSpan.FromMinutes(1)); + _settings.BindingConfigurator.AutoDeleteAfter(TimeSpan.FromMinutes(1)); } public IReceiveEndpointConfiguration CreateBusEndpointConfiguration(Action configure) @@ -79,7 +79,7 @@ public bool ExclusiveConsumer public bool Lazy { - set => _settings.Lazy = value; + set => _settings.QueueConfigurator.Lazy = value; } public TimeSpan? QueueExpiration @@ -89,37 +89,37 @@ public TimeSpan? QueueExpiration public bool SingleActiveConsumer { - set => _settings.SingleActiveConsumer = value; + set => _settings.QueueConfigurator.SingleActiveConsumer = value; } public void SetQueueArgument(string key, object value) { - _settings.SetQueueArgument(key, value); + _settings.QueueConfigurator.SetQueueArgument(key, value); } public void SetQueueArgument(string key, TimeSpan value) { - _settings.SetQueueArgument(key, value); + _settings.QueueConfigurator.SetQueueArgument(key, value); } public void SetExchangeArgument(string key, object value) { - _settings.SetExchangeArgument(key, value); + _settings.ExchangeConfigurator.SetExchangeArgument(key, value); } public void SetExchangeArgument(string key, TimeSpan value) { - _settings.SetExchangeArgument(key, value); + _settings.ExchangeConfigurator.SetExchangeArgument(key, value); } public void EnablePriority(byte maxPriority) { - _settings.EnablePriority(maxPriority); + _settings.QueueConfigurator.EnablePriority(maxPriority); } public void SetQuorumQueue(int? replicationFactor = default) { - _settings.SetQuorumQueue(replicationFactor); + _settings.QueueConfigurator.SetQuorumQueue(replicationFactor); } public void Host(RabbitMqHostSettings settings) diff --git a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqHostConfiguration.cs b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqHostConfiguration.cs index 1ea834efbc9..671d69637e7 100644 --- a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqHostConfiguration.cs +++ b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqHostConfiguration.cs @@ -22,8 +22,7 @@ public RabbitMqHostConfiguration(IRabbitMqBusConfiguration busConfiguration, IRa : base(busConfiguration) { _busConfiguration = busConfiguration; - _hostSettings = new ConfigurationHostSettings - { + _hostSettings = new ConfigurationHostSettings { Host = "localhost", VirtualHost = "/", Port = 5672, diff --git a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqQueueConfigurator.cs b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqQueueConfigurator.cs index ccdf04e5a34..79e49e17fa8 100644 --- a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqQueueConfigurator.cs +++ b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqQueueConfigurator.cs @@ -7,13 +7,13 @@ namespace MassTransit.RabbitMqTransport.Configuration public class RabbitMqQueueConfigurator : - RabbitMqExchangeConfigurator, IRabbitMqQueueConfigurator, Queue { - protected RabbitMqQueueConfigurator(string queueName, string exchangeType, bool durable, bool autoDelete) - : base(queueName, exchangeType, durable, autoDelete) + protected RabbitMqQueueConfigurator(string queueName, bool durable, bool autoDelete) { + Durable = durable; + AutoDelete = autoDelete; QueueArguments = new Dictionary(); QueueName = queueName; @@ -75,6 +75,8 @@ public void EnablePriority(byte maxPriority) QueueArguments[Headers.XMaxPriority] = (int)maxPriority; } + public bool AutoDelete { get; set; } + public bool Exclusive { get; set; } public TimeSpan? QueueExpiration @@ -96,11 +98,9 @@ public TimeSpan? QueueExpiration } public string QueueName { get; set; } - public IDictionary QueueArguments { get; } - public override RabbitMqEndpointAddress GetEndpointAddress(Uri hostAddress) - { - return new RabbitMqEndpointAddress(hostAddress, ExchangeName ?? QueueName, ExchangeType, Durable, AutoDelete); - } + public bool Durable { get; set; } + + public IDictionary QueueArguments { get; } } } diff --git a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqReceiveEndpointConfiguration.cs b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqReceiveEndpointConfiguration.cs index cbdb660063e..c3aaa72878b 100644 --- a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqReceiveEndpointConfiguration.cs +++ b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqReceiveEndpointConfiguration.cs @@ -163,7 +163,7 @@ public bool ExclusiveConsumer public bool Lazy { - set => _settings.Lazy = value; + set => _settings.QueueConfigurator.Lazy = value; } public bool BindQueue @@ -178,7 +178,7 @@ public TimeSpan? QueueExpiration public bool SingleActiveConsumer { - set => _settings.SingleActiveConsumer = value; + set => _settings.QueueConfigurator.SingleActiveConsumer = value; } public string DeadLetterExchange @@ -188,32 +188,32 @@ public string DeadLetterExchange public void SetQueueArgument(string key, object value) { - _settings.SetQueueArgument(key, value); + _settings.QueueConfigurator.SetQueueArgument(key, value); } public void SetQueueArgument(string key, TimeSpan value) { - _settings.SetQueueArgument(key, value); + _settings.QueueConfigurator.SetQueueArgument(key, value); } public void SetExchangeArgument(string key, object value) { - _settings.SetExchangeArgument(key, value); + _settings.ExchangeConfigurator.SetExchangeArgument(key, value); } public void SetExchangeArgument(string key, TimeSpan value) { - _settings.SetExchangeArgument(key, value); + _settings.ExchangeConfigurator.SetExchangeArgument(key, value); } public void EnablePriority(byte maxPriority) { - _settings.EnablePriority(maxPriority); + _settings.QueueConfigurator.EnablePriority(maxPriority); } public void SetQuorumQueue(int? replicationFactor = default) { - _settings.SetQuorumQueue(replicationFactor); + _settings.QueueConfigurator.SetQuorumQueue(replicationFactor); } public void Bind(string exchangeName, Action callback) diff --git a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqReceiveSettings.cs b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqReceiveSettings.cs index 44cbc03d3f8..42a64842751 100644 --- a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqReceiveSettings.cs +++ b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqReceiveSettings.cs @@ -1,22 +1,31 @@ namespace MassTransit.RabbitMqTransport.Configuration { - using System; using System.Collections.Generic; using RabbitMQ.Client; - public class RabbitMqReceiveSettings : - QueueBindingConfigurator, - ReceiveSettings + public class RabbitMqReceiveSettings : BaseReceiveSettings { + internal ExchangeQueueBindingConfigurator BindingConfigurator { get; } + internal RabbitMqQueueConfigurator QueueConfigurator { get; } + internal RabbitMqExchangeConfigurator ExchangeConfigurator { get; } + readonly IRabbitMqEndpointConfiguration _configuration; public RabbitMqReceiveSettings(IRabbitMqEndpointConfiguration configuration, string name, string type, bool durable, bool autoDelete) - : base(name, type, durable, autoDelete) + : this(configuration, new ExchangeQueueBindingConfigurator(name, name, type, durable, autoDelete)) { - _configuration = configuration; + } + public RabbitMqReceiveSettings(IRabbitMqEndpointConfiguration configuration, ExchangeQueueBindingConfigurator configurator) + : base(configurator) + { + _configuration = configuration; + BindingConfigurator = configurator; ConsumeArguments = new Dictionary(); + + QueueConfigurator = BindingConfigurator.QueueConfigurator; + ExchangeConfigurator = BindingConfigurator.ExchangeConfigurator; } public int ConsumerPriority @@ -24,25 +33,20 @@ public int ConsumerPriority set => ConsumeArguments[Headers.XPriority] = value; } - public ushort PrefetchCount + public override ushort PrefetchCount { get => (ushort)_configuration.Transport.PrefetchCount; set => _configuration.Transport.Configurator.PrefetchCount = value; } - public bool PurgeOnStartup { get; set; } - public bool ExclusiveConsumer { get; set; } - public bool NoAck { get; set; } - - public bool BindQueue { get; set; } = true; + public override bool PurgeOnStartup { get; set; } + public override bool ExclusiveConsumer { get; set; } + public override bool NoAck { get; set; } - public IDictionary ConsumeArguments { get; } + public override bool BindQueue { get; set; } = true; - public string ConsumerTag { get; set; } + public override IDictionary ConsumeArguments { get; } - public Uri GetInputAddress(Uri hostAddress) - { - return GetEndpointAddress(hostAddress); - } + public override string ConsumerTag { get; set; } } } diff --git a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/Topology/BaseExchangeToQueueBindingTopologySpecification.cs b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/Topology/BaseExchangeToQueueBindingTopologySpecification.cs new file mode 100644 index 00000000000..36ea0bebfc4 --- /dev/null +++ b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/Topology/BaseExchangeToQueueBindingTopologySpecification.cs @@ -0,0 +1,47 @@ +namespace MassTransit.RabbitMqTransport.Configuration +{ + using System.Collections.Generic; + using Topology; + + + public abstract class BaseExchangeToQueueBindingTopologySpecification + { + readonly ExchangeQueueBindingConfigurator _bindingConfigurator; + + protected BaseExchangeToQueueBindingTopologySpecification(string exchangeName, string exchangeType, string queueName = null, bool durable = true, + bool autoDelete = false) + { + _bindingConfigurator = new ExchangeQueueBindingConfigurator(exchangeName, queueName ?? exchangeName, exchangeType, durable, autoDelete); + } + + public IRabbitMqQueueBindingConfigurator AsOldInterface() => _bindingConfigurator.Adapt(); + + public virtual IEnumerable Validate() + { + yield break; + } + + protected void ApplyTopologyToBuilder(IBrokerTopologyBuilder builder) + { + RabbitMqExchangeConfigurator exchangeConfigurator = _bindingConfigurator.ExchangeConfigurator; + + var exchangeHandle = builder.ExchangeDeclare( + exchangeConfigurator.ExchangeName, + exchangeConfigurator.ExchangeType, + exchangeConfigurator.Durable, + exchangeConfigurator.AutoDelete, + exchangeConfigurator.ExchangeArguments); + + RabbitMqQueueConfigurator queueConfigurator = _bindingConfigurator.QueueConfigurator; + + var queueHandle = builder.QueueDeclare( + queueConfigurator.QueueName, + queueConfigurator.Durable, + queueConfigurator.AutoDelete, + queueConfigurator.Exclusive, + queueConfigurator.QueueArguments); + + var bindingHandle = builder.QueueBind(exchangeHandle, queueHandle, _bindingConfigurator.RoutingKey, _bindingConfigurator.BindingArguments); + } + } +} diff --git a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/Topology/ExchangeToQueueBindingConsumeTopologySpecification.cs b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/Topology/ExchangeToQueueBindingConsumeTopologySpecification.cs index 379663881c0..fa7ff859914 100644 --- a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/Topology/ExchangeToQueueBindingConsumeTopologySpecification.cs +++ b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/Topology/ExchangeToQueueBindingConsumeTopologySpecification.cs @@ -1,35 +1,20 @@ namespace MassTransit.RabbitMqTransport.Configuration { - using System.Collections.Generic; using Topology; /// /// Used to bind an exchange to the consuming queue's exchange /// - public class ExchangeToQueueBindingConsumeTopologySpecification : - QueueBindingConfigurator, + public class ExchangeToQueueBindingConsumeTopologySpecification : BaseExchangeToQueueBindingTopologySpecification, IRabbitMqConsumeTopologySpecification { public ExchangeToQueueBindingConsumeTopologySpecification(string exchangeName, string exchangeType, string queueName = null, bool durable = true, bool autoDelete = false) - : base(queueName ?? exchangeName, exchangeType, durable, autoDelete) + : base(exchangeName, exchangeType, queueName, durable, autoDelete) { - ExchangeName = exchangeName; } - public IEnumerable Validate() - { - yield break; - } - - public void Apply(IReceiveEndpointBrokerTopologyBuilder builder) - { - var exchangeHandle = builder.ExchangeDeclare(ExchangeName, ExchangeType, Durable, AutoDelete, ExchangeArguments); - - var queueHandle = builder.QueueDeclare(QueueName, Durable, AutoDelete, Exclusive, QueueArguments); - - var bindingHandle = builder.QueueBind(exchangeHandle, queueHandle, RoutingKey, BindingArguments); - } + public void Apply(IReceiveEndpointBrokerTopologyBuilder builder) => ApplyTopologyToBuilder(builder); } } diff --git a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/Topology/ExchangeToQueueBindingPublishTopologySpecification.cs b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/Topology/ExchangeToQueueBindingPublishTopologySpecification.cs index 7fb453ec985..12ec7100fa4 100644 --- a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/Topology/ExchangeToQueueBindingPublishTopologySpecification.cs +++ b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/Topology/ExchangeToQueueBindingPublishTopologySpecification.cs @@ -7,29 +7,14 @@ namespace MassTransit.RabbitMqTransport.Configuration /// /// Used to declare an exchange and queue, and bind them together. /// - public class ExchangeToQueueBindingPublishTopologySpecification : - QueueBindingConfigurator, + public class ExchangeToQueueBindingPublishTopologySpecification : BaseExchangeToQueueBindingTopologySpecification, IRabbitMqPublishTopologySpecification { - public ExchangeToQueueBindingPublishTopologySpecification(string exchangeName, string exchangeType, string queueName = null, bool durable = true, - bool autoDelete = false) - : base(queueName ?? exchangeName, exchangeType, durable, autoDelete) + public ExchangeToQueueBindingPublishTopologySpecification(string exchangeName, string exchangeType, string queueName = null, bool durable = true, bool autoDelete = false) + : base(exchangeName, exchangeType, queueName, durable, autoDelete) { - ExchangeName = exchangeName; } - public IEnumerable Validate() - { - yield break; - } - - public void Apply(IPublishEndpointBrokerTopologyBuilder builder) - { - var exchangeHandle = builder.ExchangeDeclare(ExchangeName, ExchangeType, Durable, AutoDelete, ExchangeArguments); - - var queueHandle = builder.QueueDeclare(QueueName, Durable, AutoDelete, Exclusive, QueueArguments); - - var bindingHandle = builder.QueueBind(exchangeHandle, queueHandle, RoutingKey, BindingArguments); - } + public void Apply(IPublishEndpointBrokerTopologyBuilder builder) => ApplyTopologyToBuilder(builder); } } diff --git a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqConsumeTopology.cs b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqConsumeTopology.cs index 7251a583bce..835a804e86e 100644 --- a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqConsumeTopology.cs +++ b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqConsumeTopology.cs @@ -75,7 +75,7 @@ public void BindQueue(string exchangeName, string queueName, Action argument in source.ExchangeArguments) - SetExchangeArgument(argument.Key, argument.Value); + _bindingConfigurator.ExchangeConfigurator.SetExchangeArgument(argument.Key, argument.Value); foreach (KeyValuePair argument in source.QueueArguments) - SetQueueArgument(argument.Key, argument.Value); + _bindingConfigurator.QueueConfigurator.SetQueueArgument(argument.Key, argument.Value); } + public RabbitMqDeadLetterSettings(ReceiveSettings source, string name) + : this(source, new ExchangeQueueBindingConfigurator(name, source.ExchangeType, source.Durable, source.AutoDelete)) + { + } + public IRabbitMqQueueBindingConfigurator AsOldInterface() => _bindingConfigurator.Adapt(); + public BrokerTopology GetBrokerTopology() { var builder = new PublishEndpointBrokerTopologyBuilder(); - builder.Exchange = builder.ExchangeDeclare(ExchangeName, ExchangeType, Durable, AutoDelete, ExchangeArguments); + RabbitMqExchangeConfigurator exchangeConfigurator = _bindingConfigurator.ExchangeConfigurator; + + builder.Exchange = builder.ExchangeDeclare( + exchangeConfigurator.ExchangeName, + exchangeConfigurator.ExchangeType, + exchangeConfigurator.Durable, + exchangeConfigurator.AutoDelete, + exchangeConfigurator.ExchangeArguments); + + RabbitMqQueueConfigurator queueConfigurator = _bindingConfigurator.QueueConfigurator; - var queue = builder.QueueDeclare(QueueName, Durable, !QueueExpiration.HasValue && AutoDelete, false, QueueArguments); + var queue = builder.QueueDeclare( + queueConfigurator.QueueName, + queueConfigurator.Durable, + !queueConfigurator.QueueExpiration.HasValue && queueConfigurator.AutoDelete, + false, + queueConfigurator.QueueArguments); - builder.QueueBind(builder.Exchange, queue, RoutingKey, BindingArguments); + builder.QueueBind(builder.Exchange, queue, _bindingConfigurator.RoutingKey, _bindingConfigurator.BindingArguments); return builder.BuildBrokerTopology(); } diff --git a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqErrorSettings.cs b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqErrorSettings.cs index 4f87590ae28..a1eea4edb9f 100644 --- a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqErrorSettings.cs +++ b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqErrorSettings.cs @@ -4,31 +4,54 @@ using Configuration; - public class RabbitMqErrorSettings : - QueueBindingConfigurator, + public class RabbitMqErrorSettings : BaseEntitySettings, ErrorSettings { + readonly ExchangeQueueBindingConfigurator _bindingConfigurator; + public RabbitMqErrorSettings(ReceiveSettings source, string name) - : base(name, source.ExchangeType, source.Durable, source.AutoDelete) + : this(source, new ExchangeQueueBindingConfigurator(name, source.ExchangeType, source.Durable, source.AutoDelete)) + { + } + + public RabbitMqErrorSettings(ReceiveSettings source, ExchangeQueueBindingConfigurator configurator) + : base(configurator.ExchangeConfigurator) { - QueueName = name; + _bindingConfigurator = configurator; + foreach (KeyValuePair argument in source.ExchangeArguments) - SetExchangeArgument(argument.Key, argument.Value); + _bindingConfigurator.ExchangeConfigurator.SetExchangeArgument(argument.Key, argument.Value); foreach (KeyValuePair argument in source.QueueArguments) - SetQueueArgument(argument.Key, argument.Value); + _bindingConfigurator.QueueConfigurator.SetQueueArgument(argument.Key, argument.Value); } + public IRabbitMqQueueBindingConfigurator AsOldInterface() => _bindingConfigurator.Adapt(); + public BrokerTopology GetBrokerTopology() { var builder = new PublishEndpointBrokerTopologyBuilder(); - builder.Exchange = builder.ExchangeDeclare(ExchangeName, ExchangeType, Durable, AutoDelete, ExchangeArguments); + RabbitMqExchangeConfigurator exchangeConfigurator = _bindingConfigurator.ExchangeConfigurator; + + builder.Exchange = builder.ExchangeDeclare( + exchangeConfigurator.ExchangeName, + exchangeConfigurator.ExchangeType, + exchangeConfigurator.Durable, + exchangeConfigurator.AutoDelete, + exchangeConfigurator.ExchangeArguments); + + RabbitMqQueueConfigurator queueConfigurator = _bindingConfigurator.QueueConfigurator; - var queue = builder.QueueDeclare(QueueName, Durable, !QueueExpiration.HasValue && AutoDelete, false, QueueArguments); + var queue = builder.QueueDeclare( + queueConfigurator.QueueName, + queueConfigurator.Durable, + !queueConfigurator.QueueExpiration.HasValue && queueConfigurator.AutoDelete, + false, + queueConfigurator.QueueArguments); - builder.QueueBind(builder.Exchange, queue, RoutingKey, BindingArguments); + builder.QueueBind(builder.Exchange, queue, _bindingConfigurator.RoutingKey, _bindingConfigurator.BindingArguments); return builder.BuildBrokerTopology(); } diff --git a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqMessagePublishTopology.cs b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqMessagePublishTopology.cs index d7a4ff0fe0f..e91ce7784c2 100644 --- a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqMessagePublishTopology.cs +++ b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqMessagePublishTopology.cs @@ -134,7 +134,7 @@ public void BindQueue(string exchangeName, string? queueName, Action _bindingConfigurator.QueueConfigurator.QueueArguments.TryGetValue(Headers.XQueueMode, out string value) && value == "lazy"; + set => _bindingConfigurator.QueueConfigurator.Lazy = value; + } + + public TimeSpan? QueueExpiration + { + get => _bindingConfigurator.QueueConfigurator.QueueExpiration; + set => _bindingConfigurator.QueueConfigurator.QueueExpiration = value; + } + + public bool SingleActiveConsumer + { + get => _bindingConfigurator.QueueConfigurator.QueueArguments.ContainsKey(Headers.XSingleActiveConsumer); + set => _bindingConfigurator.QueueConfigurator.SingleActiveConsumer = value; + } + + public string RoutingKey + { + get => _bindingConfigurator.RoutingKey; + set => _bindingConfigurator.RoutingKey = value; + } + + public virtual bool Durable + { + get => _bindingConfigurator.ExchangeConfigurator.Durable; + set => _bindingConfigurator.ExchangeConfigurator.Durable = value; + } + + public virtual bool AutoDelete + { + get => _bindingConfigurator.ExchangeConfigurator.AutoDelete; + set => _bindingConfigurator.ExchangeConfigurator.AutoDelete = value; + } + + public string ExchangeType + { + get => _bindingConfigurator.ExchangeConfigurator.ExchangeType; + set => _bindingConfigurator.ExchangeConfigurator.ExchangeType = value; + } + + public void SetQueueArgument(string key, object value) + { + _bindingConfigurator.QueueConfigurator.SetQueueArgument(key, value); + } + + public void SetQueueArgument(string key, TimeSpan value) + { + _bindingConfigurator.QueueConfigurator.SetQueueArgument(key, value); + } + + public void SetExchangeArgument(string key, object value) + { + _bindingConfigurator.ExchangeConfigurator.SetExchangeArgument(key, value); + } + + public void SetExchangeArgument(string key, TimeSpan value) + { + _bindingConfigurator.ExchangeConfigurator.SetExchangeArgument(key, value); + } + + public void EnablePriority(byte maxPriority) + { + _bindingConfigurator.QueueConfigurator.EnablePriority(maxPriority); + } + + public void SetQuorumQueue(int? replicationFactor = default) + { + _bindingConfigurator.QueueConfigurator.SetQuorumQueue(replicationFactor); + } + + public void SetBindingArgument(string key, object value) + { + _bindingConfigurator.SetBindingArgument(key, value); + } + } + + + public static class RabbitMqQueueBindingConfiguratorAdapterExtensions + { + public static IRabbitMqQueueBindingConfigurator Adapt(this ExchangeQueueBindingConfigurator configurator) + { + return new RabbitMqQueueBindingConfiguratorAdapter(configurator); + } + } +} diff --git a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqSendTopology.cs b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqSendTopology.cs index 2b8153d1738..dd37fa3b86a 100644 --- a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqSendTopology.cs +++ b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqSendTopology.cs @@ -37,7 +37,7 @@ public ErrorSettings GetErrorSettings(ReceiveSettings settings) { var errorSettings = new RabbitMqErrorSettings(settings, ErrorQueueNameFormatter.FormatErrorQueueName(settings.ExchangeName)); - ConfigureErrorSettings?.Invoke(errorSettings); + ConfigureErrorSettings?.Invoke(errorSettings.AsOldInterface()); return errorSettings; } @@ -46,7 +46,7 @@ public DeadLetterSettings GetDeadLetterSettings(ReceiveSettings settings) { var deadLetterSetting = new RabbitMqDeadLetterSettings(settings, DeadLetterQueueNameFormatter.FormatDeadLetterQueueName(settings.ExchangeName)); - ConfigureDeadLetterSettings?.Invoke(deadLetterSetting); + ConfigureDeadLetterSettings?.Invoke(deadLetterSetting.AsOldInterface()); return deadLetterSetting; }