Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
YarinOmesi committed Sep 2, 2023
1 parent ecead15 commit ad8c9fd
Show file tree
Hide file tree
Showing 21 changed files with 168 additions and 360 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace MassTransit
{
/// <summary>
/// Used to configure the binding
/// </summary>
public interface IRabbitMqBindingConfigurator
{
/// <summary>
/// A routing key for the exchange binding
/// </summary>
string RoutingKey { set; }

/// <summary>
/// Sets the binding argument, or removes it if value is null
/// </summary>
/// <param name="key"></param>
/// <param name="value"></param>
void SetBindingArgument(string key, object value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,8 @@
/// Used to configure the binding of an exchange (to either a queue or another exchange)
/// </summary>
public interface IRabbitMqExchangeBindingConfigurator :
IRabbitMqExchangeConfigurator
IRabbitMqExchangeConfigurator,
IRabbitMqBindingConfigurator
{
/// <summary>
/// A routing key for the exchange binding
/// </summary>
string RoutingKey { set; }

/// <summary>
/// Sets the binding argument, or removes it if value is null
/// </summary>
/// <param name="key"></param>
/// <param name="value"></param>
void SetBindingArgument(string key, object value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace MassTransit
{
public interface IRabbitMqExchangeQueueBindingConfigurator:
IRabbitMqQueueConfigurator,
IRabbitMqExchangeConfigurator,
IRabbitMqBindingConfigurator
{
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void Bind<T>(Action<IRabbitMqExchangeBindingConfigurator> callback = null)
/// <param name="exchangeName"></param>
/// <param name="queueName"></param>
/// <param name="configure"></param>
void BindDeadLetterQueue(string exchangeName, string queueName = null, Action<IRabbitMqQueueBindingConfigurator> configure = null);
void BindDeadLetterQueue(string exchangeName, string queueName = null, Action<IRabbitMqExchangeQueueBindingConfigurator> configure = null);

/// <summary>
/// Add middleware to the model pipe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ public interface IRabbitMqConsumeTopologyConfigurator :
/// <param name="exchangeName">The exchange name to bind</param>
/// <param name="queueName">The queue name to declare/bind to the exchange</param>
/// <param name="configure">The configuration callback</param>
void BindQueue(string exchangeName, string queueName, Action<IRabbitMqQueueBindingConfigurator> configure = null);
void BindQueue(string exchangeName, string queueName, Action<IRabbitMqExchangeQueueBindingConfigurator> configure = null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ public interface IRabbitMqMessagePublishTopologyConfigurator :
/// <param name="exchangeName"></param>
/// <param name="queueName"></param>
/// <param name="configure"></param>
void BindQueue(string exchangeName, string? queueName, Action<IRabbitMqQueueBindingConfigurator>? configure = null);
void BindQueue(string exchangeName, string? queueName, Action<IRabbitMqExchangeQueueBindingConfigurator>? configure = null);

/// <summary>
/// Bind an alternate exchange/queue for the published message type
/// </summary>
/// <param name="exchangeName"></param>
/// <param name="queueName"></param>
/// <param name="configure"></param>
void BindAlternateExchangeQueue(string exchangeName, string? queueName = null, Action<IRabbitMqQueueBindingConfigurator>? configure = null);
void BindAlternateExchangeQueue(string exchangeName, string? queueName = null, Action<IRabbitMqExchangeQueueBindingConfigurator>? configure = null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public interface IRabbitMqSendTopologyConfigurator :
ISendTopologyConfigurator,
IRabbitMqSendTopology
{
Action<IRabbitMqQueueBindingConfigurator> ConfigureErrorSettings { set; }
Action<IRabbitMqQueueBindingConfigurator> ConfigureDeadLetterSettings { set; }
Action<IRabbitMqExchangeQueueBindingConfigurator> ConfigureErrorSettings { set; }
Action<IRabbitMqExchangeQueueBindingConfigurator> ConfigureDeadLetterSettings { set; }
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using Topology;


public class ExchangeQueueBindingConfigurator
public class ExchangeQueueBindingConfigurator : IRabbitMqExchangeQueueBindingConfigurator, EntitySettings
{
public RabbitMqQueueConfigurator QueueConfigurator { get; }
public RabbitMqExchangeConfigurator ExchangeConfigurator { get; }
Expand Down Expand Up @@ -42,6 +42,63 @@ public void AutoDeleteAfter(TimeSpan duration)
QueueConfigurator.QueueExpiration = duration;
}

public bool Exclusive
{
get => QueueConfigurator.Exclusive;
set => QueueConfigurator.Exclusive = value;
}

public bool Lazy
{
set => QueueConfigurator.Lazy = value;
}

public TimeSpan? QueueExpiration
{
get => QueueConfigurator.QueueExpiration;
set => QueueConfigurator.QueueExpiration = value;
}

public bool SingleActiveConsumer
{
set => QueueConfigurator.SingleActiveConsumer = value;
}

public void SetQueueArgument(string key, object value) => QueueConfigurator.SetQueueArgument(key, value);
public void SetQueueArgument(string key, TimeSpan value) => QueueConfigurator.SetQueueArgument(key, value);
public void EnablePriority(byte maxPriority) => QueueConfigurator.EnablePriority(maxPriority);
public void SetQuorumQueue(int? replicationFactor = default) => QueueConfigurator.SetQuorumQueue(replicationFactor);

public bool Durable
{
get => QueueConfigurator.Durable && ExchangeConfigurator.Durable;
set => QueueConfigurator.Durable = ExchangeConfigurator.Durable = value;
}

public bool AutoDelete
{
get => QueueConfigurator.AutoDelete && ExchangeConfigurator.AutoDelete;
set => QueueConfigurator.AutoDelete = ExchangeConfigurator.AutoDelete = value;
}

public string ExchangeType
{
get => ExchangeConfigurator.ExchangeType;
set => ExchangeConfigurator.ExchangeType = value;
}

public void SetExchangeArgument(string key, object value) => ExchangeConfigurator.SetExchangeArgument(key, value);

public void SetExchangeArgument(string key, TimeSpan value) => ExchangeConfigurator.SetExchangeArgument(key, value);

public IDictionary<string, object> ExchangeArguments => ExchangeConfigurator.ExchangeArguments;

public string ExchangeName
{
get => ExchangeConfigurator.ExchangeName;
set => ExchangeConfigurator.ExchangeName = value;
}

private class MyQueueConfigurator : RabbitMqQueueConfigurator
{
public MyQueueConfigurator(string queueName, bool durable, bool autoDelete)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public RabbitMqBusFactoryConfigurator(IRabbitMqBusConfiguration busConfiguration
var exchangeType = busConfiguration.BusEndpointConfiguration.Topology.Consume.ExchangeTypeSelector.DefaultExchangeType;
_settings = new RabbitMqReceiveSettings(busConfiguration.BusEndpointConfiguration, queueName, exchangeType, false, true);

_settings.BindingConfigurator.AutoDeleteAfter(TimeSpan.FromMinutes(1));
_settings.AutoDeleteAfter(TimeSpan.FromMinutes(1));
}

public IReceiveEndpointConfiguration CreateBusEndpointConfiguration(Action<IReceiveEndpointConfigurator> configure)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void Bind<T>(Action<IRabbitMqExchangeBindingConfigurator> callback)
_endpointConfiguration.Topology.Consume.GetMessageTopology<T>().Bind(callback);
}

public void BindDeadLetterQueue(string exchangeName, string queueName, Action<IRabbitMqQueueBindingConfigurator> configure)
public void BindDeadLetterQueue(string exchangeName, string queueName, Action<IRabbitMqExchangeQueueBindingConfigurator> configure)
{
_endpointConfiguration.Topology.Consume.BindQueue(exchangeName, queueName, configure);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,50 +1,52 @@
namespace MassTransit.RabbitMqTransport.Configuration
{
using System;
using System.Collections.Generic;
using RabbitMQ.Client;


public class RabbitMqReceiveSettings : BaseReceiveSettings
public class RabbitMqReceiveSettings : ExchangeQueueBindingConfigurator,
ReceiveSettings
{
internal ExchangeQueueBindingConfigurator BindingConfigurator { get; }

readonly IRabbitMqEndpointConfiguration _configuration;

public RabbitMqReceiveSettings(IRabbitMqEndpointConfiguration configuration, string name, string type, bool durable, bool autoDelete)
: this(configuration, new ExchangeQueueBindingConfigurator(name, name, type, durable, autoDelete))
: base(name, name, type, durable, autoDelete)
{
_configuration = configuration;
ConsumeArguments = new Dictionary<string, object>();
}

public RabbitMqReceiveSettings(IRabbitMqEndpointConfiguration configuration, ExchangeQueueBindingConfigurator configurator)
: base(configurator)
public string QueueName
{
_configuration = configuration;
BindingConfigurator = configurator;
ConsumeArguments = new Dictionary<string, object>();
get => QueueConfigurator.QueueName;
set => QueueConfigurator.QueueName = value;
}

internal RabbitMqQueueConfigurator QueueConfigurator => BindingConfigurator.QueueConfigurator;
internal RabbitMqExchangeConfigurator ExchangeConfigurator => BindingConfigurator.ExchangeConfigurator;
public IDictionary<string, object> QueueArguments => QueueConfigurator.QueueArguments;


public int ConsumerPriority
{
set => ConsumeArguments[Headers.XPriority] = value;
}

public override ushort PrefetchCount
public ushort PrefetchCount
{
get => (ushort)_configuration.Transport.PrefetchCount;
set => _configuration.Transport.Configurator.PrefetchCount = value;
}

public override bool PurgeOnStartup { get; set; }
public override bool ExclusiveConsumer { get; set; }
public override bool NoAck { get; set; }
public bool PurgeOnStartup { get; set; }
public bool ExclusiveConsumer { get; set; }
public bool NoAck { get; set; }

public bool BindQueue { get; set; } = true;

public override bool BindQueue { get; set; } = true;
public IDictionary<string, object> ConsumeArguments { get; }

public override IDictionary<string, object> ConsumeArguments { get; }
public string ConsumerTag { get; set; }

public override string ConsumerTag { get; set; }
public virtual Uri GetInputAddress(Uri hostAddress) => ExchangeConfigurator.GetEndpointAddress(hostAddress);
}
}
Loading

0 comments on commit ad8c9fd

Please sign in to comment.