Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
YarinOmesi committed Sep 2, 2023
1 parent 50d3e34 commit d38e5b8
Show file tree
Hide file tree
Showing 20 changed files with 472 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
/// <summary>
/// Configures a queue/exchange pair in RabbitMQ
/// </summary>
public interface IRabbitMqQueueConfigurator :
IRabbitMqExchangeConfigurator
public interface IRabbitMqQueueConfigurator
{
/// <summary>
/// Specify that the queue is exclusive to this process and cannot be accessed by other processes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
/// </summary>
public interface IRabbitMqReceiveEndpointConfigurator :
IReceiveEndpointConfigurator,
IRabbitMqQueueEndpointConfigurator
IRabbitMqQueueEndpointConfigurator,
IRabbitMqExchangeConfigurator
{
/// <summary>
/// If false, deploys only exchange, without queue
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
namespace MassTransit.RabbitMqTransport
{
using System.Collections.Generic;
using Configuration;


public abstract class BaseEntitySettings : EntitySettings
{
readonly RabbitMqExchangeConfigurator _exchangeConfigurator;

protected BaseEntitySettings(RabbitMqExchangeConfigurator exchangeConfigurator)
{
_exchangeConfigurator = exchangeConfigurator;
}

public virtual bool Durable
{
get => _exchangeConfigurator.Durable;
set => _exchangeConfigurator.Durable = value;
}

public virtual bool AutoDelete
{
get => _exchangeConfigurator.AutoDelete;
set => _exchangeConfigurator.AutoDelete = value;
}

public IDictionary<string, object> ExchangeArguments => _exchangeConfigurator.ExchangeArguments;

public string ExchangeName
{
get => _exchangeConfigurator.ExchangeName;
set => _exchangeConfigurator.ExchangeName = value;
}

public string ExchangeType
{
get => _exchangeConfigurator.ExchangeType;
set => _exchangeConfigurator.ExchangeType = value;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
namespace MassTransit.RabbitMqTransport
{
using System;
using System.Collections.Generic;
using Configuration;


public abstract class BaseReceiveSettings : BaseEntitySettings, ReceiveSettings
{
readonly ExchangeQueueBindingConfigurator _bindingConfigurator;

protected BaseReceiveSettings(ExchangeQueueBindingConfigurator bindingConfigurator)
: base(bindingConfigurator.ExchangeConfigurator)
{
_bindingConfigurator = bindingConfigurator;
}

RabbitMqQueueConfigurator QueueConfigurator => _bindingConfigurator.QueueConfigurator;
RabbitMqExchangeConfigurator ExchangeConfigurator => _bindingConfigurator.ExchangeConfigurator;

public string QueueName
{
get => QueueConfigurator.QueueName;
set => QueueConfigurator.QueueName = value;
}

public bool Exclusive
{
get => QueueConfigurator.Exclusive;
set => QueueConfigurator.Exclusive = value;
}

public IDictionary<string, object> QueueArguments => QueueConfigurator.QueueArguments;

public TimeSpan? QueueExpiration
{
get => QueueConfigurator.QueueExpiration;
set => QueueConfigurator.QueueExpiration = value;
}

public string RoutingKey
{
get => _bindingConfigurator.RoutingKey;
set => _bindingConfigurator.RoutingKey = value;
}

public IDictionary<string, object> BindingArguments => _bindingConfigurator.BindingArguments;

public override bool Durable
{
get => ExchangeConfigurator.Durable && QueueConfigurator.Durable;
set => ExchangeConfigurator.Durable = QueueConfigurator.Durable = value;
}

public override bool AutoDelete
{
get => ExchangeConfigurator.AutoDelete && QueueConfigurator.AutoDelete;
set => ExchangeConfigurator.AutoDelete = QueueConfigurator.AutoDelete = value;
}

public abstract ushort PrefetchCount { get; set; }

public abstract bool PurgeOnStartup { get; set; }

public abstract IDictionary<string, object> ConsumeArguments { get; }

public abstract bool ExclusiveConsumer { get; set; }
public abstract bool BindQueue { get; set; }
public abstract bool NoAck { get; set; }
public abstract string ConsumerTag { get; set; }

public virtual Uri GetInputAddress(Uri hostAddress)
{
return _bindingConfigurator.ExchangeConfigurator.GetEndpointAddress(hostAddress);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
namespace MassTransit.RabbitMqTransport.Configuration
{
using System;
using System.Collections.Generic;
using Topology;


public class ExchangeQueueBindingConfigurator
{
public RabbitMqQueueConfigurator QueueConfigurator { get; }
public RabbitMqExchangeConfigurator ExchangeConfigurator { get; }

public ExchangeQueueBindingConfigurator(string name, string exchangeType, bool durable, bool autoDelete)
{
QueueConfigurator = new MyQueueConfigurator(name, durable, autoDelete);
ExchangeConfigurator = new MyExchangeConfigurator(name, exchangeType, durable, autoDelete);

BindingArguments = new Dictionary<string, object>();
RoutingKey = "";
}
public ExchangeQueueBindingConfigurator(string exchangeName, string queueName, string exchangeType, bool durable, bool autoDelete)
{
QueueConfigurator = new MyQueueConfigurator(queueName, durable, autoDelete);
ExchangeConfigurator = new MyExchangeConfigurator(exchangeName, exchangeType, durable, autoDelete);

BindingArguments = new Dictionary<string, object>();
RoutingKey = "";
}

public IDictionary<string, object> BindingArguments { get; }

public void SetBindingArgument(string key, object value)
{
if (key == null)
throw new ArgumentNullException(nameof(key));

if (value == null)
BindingArguments.Remove(key);
else
BindingArguments[key] = value;
}

public string RoutingKey { get; set; }

public void AutoDeleteAfter(TimeSpan duration)
{
QueueConfigurator.AutoDelete = true;
ExchangeConfigurator.AutoDelete = true;

QueueConfigurator.QueueExpiration = duration;
}

private class MyQueueConfigurator : RabbitMqQueueConfigurator
{
public MyQueueConfigurator(string queueName, bool durable, bool autoDelete)
: base(queueName, durable, autoDelete)
{
}
}


private class MyExchangeConfigurator : RabbitMqExchangeConfigurator
{
public MyExchangeConfigurator(string exchangeName, string exchangeType, bool durable = true, bool autoDelete = false)
: base(exchangeName, exchangeType, durable, autoDelete)
{
}

public MyExchangeConfigurator(Exchange source)
: base(source)
{
}
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public RabbitMqBusFactoryConfigurator(IRabbitMqBusConfiguration busConfiguration
var exchangeType = busConfiguration.BusEndpointConfiguration.Topology.Consume.ExchangeTypeSelector.DefaultExchangeType;
_settings = new RabbitMqReceiveSettings(busConfiguration.BusEndpointConfiguration, queueName, exchangeType, false, true);

_settings.AutoDeleteAfter(TimeSpan.FromMinutes(1));
_settings.BindingConfigurator.AutoDeleteAfter(TimeSpan.FromMinutes(1));
}

public IReceiveEndpointConfiguration CreateBusEndpointConfiguration(Action<IReceiveEndpointConfigurator> configure)
Expand Down Expand Up @@ -79,7 +79,7 @@ public bool ExclusiveConsumer

public bool Lazy
{
set => _settings.Lazy = value;
set => _settings.QueueConfigurator.Lazy = value;
}

public TimeSpan? QueueExpiration
Expand All @@ -89,37 +89,37 @@ public TimeSpan? QueueExpiration

public bool SingleActiveConsumer
{
set => _settings.SingleActiveConsumer = value;
set => _settings.QueueConfigurator.SingleActiveConsumer = value;
}

public void SetQueueArgument(string key, object value)
{
_settings.SetQueueArgument(key, value);
_settings.QueueConfigurator.SetQueueArgument(key, value);
}

public void SetQueueArgument(string key, TimeSpan value)
{
_settings.SetQueueArgument(key, value);
_settings.QueueConfigurator.SetQueueArgument(key, value);
}

public void SetExchangeArgument(string key, object value)
{
_settings.SetExchangeArgument(key, value);
_settings.ExchangeConfigurator.SetExchangeArgument(key, value);
}

public void SetExchangeArgument(string key, TimeSpan value)
{
_settings.SetExchangeArgument(key, value);
_settings.ExchangeConfigurator.SetExchangeArgument(key, value);
}

public void EnablePriority(byte maxPriority)
{
_settings.EnablePriority(maxPriority);
_settings.QueueConfigurator.EnablePriority(maxPriority);
}

public void SetQuorumQueue(int? replicationFactor = default)
{
_settings.SetQuorumQueue(replicationFactor);
_settings.QueueConfigurator.SetQuorumQueue(replicationFactor);
}

public void Host(RabbitMqHostSettings settings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ public RabbitMqHostConfiguration(IRabbitMqBusConfiguration busConfiguration, IRa
: base(busConfiguration)
{
_busConfiguration = busConfiguration;
_hostSettings = new ConfigurationHostSettings
{
_hostSettings = new ConfigurationHostSettings {
Host = "localhost",
VirtualHost = "/",
Port = 5672,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ namespace MassTransit.RabbitMqTransport.Configuration


public class RabbitMqQueueConfigurator :
RabbitMqExchangeConfigurator,
IRabbitMqQueueConfigurator,
Queue
{
protected RabbitMqQueueConfigurator(string queueName, string exchangeType, bool durable, bool autoDelete)
: base(queueName, exchangeType, durable, autoDelete)
protected RabbitMqQueueConfigurator(string queueName, bool durable, bool autoDelete)
{
Durable = durable;
AutoDelete = autoDelete;
QueueArguments = new Dictionary<string, object>();

QueueName = queueName;
Expand Down Expand Up @@ -75,6 +75,8 @@ public void EnablePriority(byte maxPriority)
QueueArguments[Headers.XMaxPriority] = (int)maxPriority;
}

public bool AutoDelete { get; set; }

public bool Exclusive { get; set; }

public TimeSpan? QueueExpiration
Expand All @@ -96,11 +98,9 @@ public TimeSpan? QueueExpiration
}

public string QueueName { get; set; }
public IDictionary<string, object> QueueArguments { get; }

public override RabbitMqEndpointAddress GetEndpointAddress(Uri hostAddress)
{
return new RabbitMqEndpointAddress(hostAddress, ExchangeName ?? QueueName, ExchangeType, Durable, AutoDelete);
}
public bool Durable { get; set; }

public IDictionary<string, object> QueueArguments { get; }
}
}
Loading

0 comments on commit d38e5b8

Please sign in to comment.