Skip to content

Commit

Permalink
Re-implement 'AutoClose' functionality
Browse files Browse the repository at this point in the history
It was removed when removing the IConnectionBroker implementations.
  • Loading branch information
par.dahlman committed Jan 16, 2016
1 parent d1e64d0 commit 6e36f85
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions src/RawRabbit/Common/ChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class ChannelFactory : IChannelFactory
private ThreadLocal<IModel> _threadChannels;
private IConnection _connection;
private readonly ILogger _logger = LogManager.GetLogger<ChannelFactory>();
private readonly List<string> _hosts;
private RawRabbitConfiguration _config;

public ChannelFactory(RawRabbitConfiguration config, IClientPropertyProvider propsProvider)
{
Expand All @@ -31,13 +31,13 @@ public ChannelFactory(RawRabbitConfiguration config, IClientPropertyProvider pro
NetworkRecoveryInterval = config.RecoveryInterval,
ClientProperties = propsProvider.GetClientProperties(config)
};
_hosts = config.Hostnames;
_config = config;
_threadChannels = new ThreadLocal<IModel>(true);

try
{
_logger.LogDebug("Connecting to primary host.");
_connection = _connectionFactory.CreateConnection(_hosts);
_connection = _connectionFactory.CreateConnection(_config.Hostnames);
_logger.LogInformation($"Successfully established connection.");
}
catch (BrokerUnreachableException e)
Expand Down Expand Up @@ -82,6 +82,11 @@ private Task<IModel> GetOrCreateChannelAsync(IConnection connection)
{
_logger.LogInformation($"Creating a new channel for thread with id '{Thread.CurrentThread.ManagedThreadId}'");
_threadChannels.Value = connection.CreateModel();
if (_config.AutoCloseConnection && !connection.AutoClose)
{
_logger.LogInformation($"Setting AutoClose to true for current connection");
connection.AutoClose = _config.AutoCloseConnection;
}
return Task.FromResult(_threadChannels.Value);
}
if (_threadChannels.Value.IsOpen)
Expand Down Expand Up @@ -113,8 +118,8 @@ private Task<IConnection> GetConnectionAsync()
{
if (_connection == null)
{
_logger.LogDebug($"Creating a new connection for {_hosts.Count} hosts.");
_connection = _connectionFactory.CreateConnection(_hosts);
_logger.LogDebug($"Creating a new connection for {_config.Hostnames.Count} hosts.");
_connection = _connectionFactory.CreateConnection(_config.Hostnames);
}
if (_connection.IsOpen)
{
Expand All @@ -130,15 +135,15 @@ private Task<IConnection> GetConnectionAsync()
_connection.Dispose();
try
{
_connection = _connectionFactory.CreateConnection(_hosts);
_connection = _connectionFactory.CreateConnection(_config.Hostnames);
return Task.FromResult(_connection);
}
catch (BrokerUnreachableException)
{
_logger.LogInformation("None of the hosts are reachable. Waiting five seconds and try again.");
return Task
.Delay(TimeSpan.FromSeconds(5))
.ContinueWith(t => _connectionFactory.CreateConnection(_hosts));
.ContinueWith(t => _connectionFactory.CreateConnection(_config.Hostnames));
}
}

Expand Down

0 comments on commit 6e36f85

Please sign in to comment.