Skip to content

Commit

Permalink
Wire up 'autoclose' in ChannelFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
par.dahlman committed Dec 11, 2015
1 parent 759e81e commit b007417
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public LostConnectionTests()
public async Task Should_Succeed_To_Send_Response_Even_If_Channel_Is_Closed()
{
/* Setup */
var channelFactory = new ChannelFactory(new SingleNodeBroker(BrokerConfiguration.Local));
var channelFactory = new ChannelFactory(new SingleNodeBroker(BrokerConfiguration.Local), new RawRabbitConfiguration());
var expectedResponse = new FirstResponse {Infered = Guid.NewGuid()};
var reqeuster = BusClientFactory.CreateDefault(TimeSpan.FromHours(1));
var responder = BusClientFactory.CreateDefault(s => s
Expand Down
35 changes: 21 additions & 14 deletions src/RawRabbit/Common/ChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading;
using RabbitMQ.Client;
using RawRabbit.Configuration;
using RawRabbit.Logging;

namespace RawRabbit.Common
{
Expand All @@ -18,20 +19,21 @@ public interface IChannelFactory : IDisposable
/// in closing and disposing.
/// </summary>
/// <returns>A new instance of an IModel</returns>
IModel CreateChannel();
IModel CreateChannel(IConnection connection = null);
}

public class ChannelFactory : IChannelFactory
{
private readonly IConnectionBroker _connectionBroker;
private readonly ConcurrentDictionary<IConnection, ThreadLocal<IModel>> _connectionToChannel;
private readonly bool _autoDelete;
private readonly bool _autoClose;
private readonly ILogger _logger = LogManager.GetLogger<ChannelFactory>();

public ChannelFactory(IConnectionBroker connectionBroker, RawRabbitConfiguration config)
{
_connectionBroker = connectionBroker;
_connectionToChannel = new ConcurrentDictionary<IConnection, ThreadLocal<IModel>>();
_autoDelete = config.AutoDeleteConnection;
_autoClose = config.AutoCloseConnection;
}

public void Dispose()
Expand All @@ -41,6 +43,7 @@ public void Dispose()

public void CloseAll()
{
_logger.LogDebug("Trying to close all connections.");
foreach (var connection in _connectionToChannel.Keys)
{
connection?.Close();
Expand All @@ -50,37 +53,41 @@ public void CloseAll()
public IModel GetChannel()
{
var currentConnection = _connectionBroker.GetConnection();

if (!_connectionToChannel.ContainsKey(currentConnection))
{
_connectionToChannel.TryAdd(currentConnection, new ThreadLocal<IModel>(currentConnection.CreateModel));
_connectionToChannel.TryAdd(currentConnection, new ThreadLocal<IModel>());
var newChannel = CreateChannel(currentConnection);
_connectionToChannel[currentConnection].Value = newChannel;
}

var threadChannel = _connectionToChannel[currentConnection];
if (threadChannel.Value.IsOpen)
{
return threadChannel.Value;
}

var channel = currentConnection.CreateModel();
if (_autoDelete && !currentConnection.AutoClose)
{
currentConnection.AutoClose = true;
}
var channel = CreateChannel(currentConnection);

threadChannel.Value?.Dispose();
threadChannel.Value = channel;

return threadChannel.Value;
}

public IModel CreateChannel()
public IModel CreateChannel(IConnection connection = null)
{
var connection = _connectionBroker.GetConnection();
connection = connection ?? _connectionBroker.GetConnection();
var channel = connection.CreateModel();
if (_autoDelete && !connection.AutoClose)
if (_autoClose && !connection.AutoClose)
{
_logger.LogInformation("Setting AutoClose to true for connection while calling 'CreateChannel'.");
connection.AutoClose = true;
}
else
{
_logger.LogDebug($"AutoClose in settings object is set to: '{_autoClose}' and on connection '{connection.AutoClose}'");
}
return channel;
}
}
Expand Down

0 comments on commit b007417

Please sign in to comment.