Skip to content

Commit

Permalink
rabbitmq: attempt fixed AlreadyClosedException
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeybannov committed Dec 27, 2023
1 parent 575e325 commit 5d96277
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 32 deletions.
63 changes: 35 additions & 28 deletions common/ASC.EventBus.RabbitMQ/EventBusRabbitMQ.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,37 +233,42 @@ private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventA
{
var eventName = eventArgs.RoutingKey;

if (!_subsManager.HasSubscriptionsForEvent(eventName))
var @event = GetEvent(eventName, eventArgs.Body.Span.ToArray());
var message = @event.ToString();

try
{
_logger.WarningNoSubscription(eventName);
if (!_subsManager.HasSubscriptionsForEvent(eventName))
{
_logger.WarningNoSubscription(eventName);

var messageId = Guid.Parse(eventArgs.BasicProperties.MessageId);
Guid.TryParse(eventArgs.BasicProperties?.MessageId, out var messageId);

if (_rejectedEvents.ContainsKey(messageId))
{
_rejectedEvents.TryRemove(messageId, out _);
_consumerChannel.BasicReject(eventArgs.DeliveryTag, requeue: false);
if (_rejectedEvents.ContainsKey(messageId) || messageId == default(Guid))
{
_rejectedEvents.TryRemove(messageId, out _);

_logger.DebugRejectEvent(eventName);
}
else
{
_rejectedEvents.TryAdd(messageId, eventArgs.Body.Span.ToArray());

// anti-pattern https://github.com/LeanKit-Labs/wascally/issues/36
_consumerChannel.BasicNack(eventArgs.DeliveryTag, multiple: false, requeue: true);
_logger.DebugBeforeRejectEvent(eventName, message);

_logger.DebugNackEvent(eventName);
}
_consumerChannel.BasicReject(eventArgs.DeliveryTag, requeue: false);

return;
}
_logger.DebugRejectEvent(eventName);
}
else
{
_rejectedEvents.TryAdd(messageId, eventArgs.Body.Span.ToArray());

var @event = GetEvent(eventName, eventArgs.Body.Span.ToArray());
var message = @event.ToString();
_logger.DebugBeforeNackEvent(eventName, message);

// anti-pattern https://github.com/LeanKit-Labs/wascally/issues/36
_consumerChannel.BasicNack(eventArgs.DeliveryTag, multiple: false, requeue: true);

_logger.DebugNackEvent(eventName);
}

return;
}

try
{
if (message.ToLowerInvariant().Contains("throw-fake-exception"))
{
throw new InvalidOperationException($"Fake exception requested: \"{message}\"");
Expand All @@ -273,9 +278,13 @@ private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventA

_consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
}
catch (AlreadyClosedException ex)
{
_logger.ErrorProcessingMessage(message, ex);
}
catch (IntegrationEventRejectExeption ex)
{
_logger.WarningProcessingMessage(message, ex);
_logger.ErrorProcessingMessage(message, ex);

if (_rejectedEvents.ContainsKey(ex.EventId))
{
Expand All @@ -294,9 +303,7 @@ private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventA
}
catch (Exception ex)
{
_logger.WarningProcessingMessage(message, ex);

_consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
_logger.ErrorProcessingMessage(message, ex);
}
}

Expand Down Expand Up @@ -365,7 +372,7 @@ private void PreProcessEvent(IntegrationEvent @event)
return;
}

if (_rejectedEvents.ContainsKey(@event.Id))
if (_rejectedEvents.ContainsKey(@event.Id))
{
@event.Redelivered = true;
}
Expand Down
14 changes: 10 additions & 4 deletions common/ASC.EventBus.RabbitMQ/Log/EventBusRabbitMQLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ internal static partial class EventBusRabbitMQLogger
[LoggerMessage(Level = LogLevel.Error, Message = "StartBasicConsume can't call on _consumerChannel == null")]
public static partial void ErrorStartBasicConsumeCantCall(this ILogger<EventBusRabbitMQ> logger);

[LoggerMessage(Level = LogLevel.Warning, Message = "----- ERROR Processing message \"{message}\"")]
public static partial void WarningProcessingMessage(this ILogger<EventBusRabbitMQ> logger, string message, Exception exception);
[LoggerMessage(Level = LogLevel.Error, Message = "----- ERROR Processing message \"{message}\"")]
public static partial void ErrorProcessingMessage(this ILogger<EventBusRabbitMQ> logger, string message, Exception exception);

[LoggerMessage(Level = LogLevel.Trace, Message = "Creating RabbitMQ consumer channel")]
public static partial void TraceCreatingConsumerChannel(this ILogger<EventBusRabbitMQ> logger);
Expand All @@ -72,9 +72,15 @@ internal static partial class EventBusRabbitMQLogger
[LoggerMessage(Level = LogLevel.Warning, Message = "No subscription for RabbitMQ event: {eventName}")]
public static partial void WarningNoSubscription(this ILogger<EventBusRabbitMQ> logger, string eventName);

[LoggerMessage(Level = LogLevel.Debug, Message = "Reject RabbitMQ event: {eventName}")]
[LoggerMessage(Level = LogLevel.Debug, Message = "RabbitMQ: Before reject event: {eventName}. Message {message}")]
public static partial void DebugBeforeRejectEvent(this ILogger<EventBusRabbitMQ> logger, string eventName, string message);

[LoggerMessage(Level = LogLevel.Debug, Message = "RabbitMQ: Before nack event: {eventName}. Message {message}")]
public static partial void DebugBeforeNackEvent(this ILogger<EventBusRabbitMQ> logger, string eventName, string message);

[LoggerMessage(Level = LogLevel.Debug, Message = "RabbitMQ: reject event: {eventName}")]
public static partial void DebugRejectEvent(this ILogger<EventBusRabbitMQ> logger, string eventName);

[LoggerMessage(Level = LogLevel.Debug, Message = "Nack RabbitMQ event: {eventName}")]
[LoggerMessage(Level = LogLevel.Debug, Message = "RabbitMQ: nack event: {eventName}")]
public static partial void DebugNackEvent(this ILogger<EventBusRabbitMQ> logger, string eventName);
}

0 comments on commit 5d96277

Please sign in to comment.