Skip to content

Commit

Permalink
Merge pull request #47 from NerosoftDev/refactoring/service-bus
Browse files Browse the repository at this point in the history
Refactoring/service bus
  • Loading branch information
Codespilot authored Dec 9, 2023
2 parents 6a69c89 + b5c93be commit 95cc4af
Show file tree
Hide file tree
Showing 35 changed files with 293 additions and 310 deletions.
34 changes: 34 additions & 0 deletions Source/Euonia.Bus.Abstract/Exceptions/MessageDeliverException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
namespace Nerosoft.Euonia.Bus;

/// <summary>
/// Represents errors that occur during message deliver.
/// </summary>
public class MessageDeliverException : Exception
{
/// <summary>
/// Initializes a new instance of the <see cref="MessageDeliverException"/> class.
/// </summary>
public MessageDeliverException()
: base("Error occurred during message deliver.")
{
}

/// <summary>
/// Initializes a new instance of the <see cref="MessageDeliverException"/> class.
/// </summary>
/// <param name="message"></param>
public MessageDeliverException(string message)
: base(message)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="MessageDeliverException"/> class.
/// </summary>
/// <param name="message"></param>
/// <param name="innerException"></param>
public MessageDeliverException(string message, Exception innerException)
: base(message, innerException)
{
}
}
34 changes: 34 additions & 0 deletions Source/Euonia.Bus.Abstract/Exceptions/MessageTypeException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
namespace Nerosoft.Euonia.Bus;

/// <summary>
/// Represents errors that occur when the message type is invalid.
/// </summary>
public class MessageTypeException : Exception
{
/// <summary>
/// Initializes a new instance of the <see cref="MessageTypeException"/> class.
/// </summary>
public MessageTypeException()
: base("The message type is invalid.")
{
}

/// <summary>
/// Initializes a new instance of the <see cref="MessageTypeException"/> class.
/// </summary>
/// <param name="message"></param>
public MessageTypeException(string message)
: base(message)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="MessageTypeException"/> class.
/// </summary>
/// <param name="message"></param>
/// <param name="innerException"></param>
public MessageTypeException(string message, Exception innerException)
: base(message, innerException)
{
}
}
2 changes: 1 addition & 1 deletion Source/Euonia.Bus.InMemory/InMemoryRecipientRegistrar.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public async Task RegisterAsync(IEnumerable<MessageRegistration> registrations,
}
else
{
throw new InvalidOperationException();
throw new MessageTypeException("The message type is neither a queue nor a topic.");
}
}

Expand Down
10 changes: 5 additions & 5 deletions Source/Euonia.Bus.InMemory/Messenger/StrongReferenceMessenger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ public TMessage Send<TMessage, TToken>(TMessage message, TToken token)
/// <param name="token">The token indicating what channel to use.</param>
/// <returns>The message that was sent (ie. <paramref name="message"/>).</returns>
/// <exception cref="System.ArgumentNullException">Thrown if <paramref name="message"/> or <paramref name="token"/> are <see langword="null"/>.</exception>
public TMessage UnsafeSend<TMessage, TToken>(TMessage message, TToken token)
internal TMessage UnsafeSend<TMessage, TToken>(TMessage message, TToken token)
where TMessage : class
where TToken : IEquatable<TToken>
{
Expand All @@ -624,15 +624,15 @@ public TMessage UnsafeSend<TMessage, TToken>(TMessage message, TToken token)
// Check whether there are any registered recipients
if (!TryGetMapping<TMessage>(out var mapping))
{
throw new InvalidOperationException("No recipients registered for the input message type.");
throw new MessageDeliverException("No recipients registered for the input message type.");
}

// Check the number of remaining handlers, see below
var totalHandlersCount = mapping.Count;

if (totalHandlersCount == 0)
{
throw new InvalidOperationException("No recipients registered for the input message type.");
throw new MessageDeliverException("No recipients registered for the input message type.");
}

pairs = rentedArray = ArrayPool<object>.Shared.Rent(2 * totalHandlersCount);
Expand Down Expand Up @@ -670,7 +670,7 @@ public TMessage UnsafeSend<TMessage, TToken>(TMessage message, TToken token)

if (totalHandlersCount == 0)
{
throw new InvalidOperationException("No recipients registered for the input message type.");
throw new MessageDeliverException("No recipients registered for the input message type.");
}

// Rent the array and also assign it to a span, which will be used to access values.
Expand Down Expand Up @@ -704,7 +704,7 @@ public TMessage UnsafeSend<TMessage, TToken>(TMessage message, TToken token)

if (i == 0)
{
throw new InvalidOperationException("No recipients registered for the input message type and token.");
throw new MessageDeliverException("No recipients registered for the input message type and token.");
}
}
}
Expand Down
62 changes: 28 additions & 34 deletions Source/Euonia.Bus.RabbitMq/RabbitMqDispatcher.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using Microsoft.Extensions.Logging;
using System.Net.Sockets;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Polly;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;

namespace Nerosoft.Euonia.Bus.RabbitMq;

Expand Down Expand Up @@ -45,7 +47,8 @@ public async Task PublishAsync<TMessage>(RoutedMessage<TMessage> message, Cancel
props.Headers[Constants.MessageHeaders.MessageType] = typeName;
props.Type = typeName;

await Policy.Handle<Exception>()
await Policy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetryAsync(_options.MaxFailureRetries, _ => TimeSpan.FromSeconds(3), (exception, _, retryCount, _) =>
{
_logger.LogError(exception, "Retry:{RetryCount}, {Message}", retryCount, exception.Message);
Expand All @@ -66,21 +69,9 @@ public async Task SendAsync<TMessage>(RoutedMessage<TMessage> message, Cancellat
{
using var channel = _connection.CreateChannel();

{
var queueDeclare = channel.DeclareQueuePassively($"{_options.QueueName}${message.Channel}$");
var requestQueueName = $"{_options.QueueName}${message.Channel}$";

if (queueDeclare == null)
{
throw new InvalidOperationException("Channel not found in vhost '/'.");
//channel.QueueDeclare($"{_options.QueueName}${message.Channel}$", true, false, false, null);
//channel.BasicQos(0, 1, false);
}

if (queueDeclare.ConsumerCount < 1)
{
throw new InvalidOperationException("No consumer found for the channel.");
}
}
CheckQueue(channel, requestQueueName);

var typeName = message.GetTypeName();

Expand All @@ -89,7 +80,8 @@ public async Task SendAsync<TMessage>(RoutedMessage<TMessage> message, Cancellat
props.Headers[Constants.MessageHeaders.MessageType] = typeName;
props.Type = typeName;

await Policy.Handle<Exception>()
await Policy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetryAsync(_options.MaxFailureRetries, _ => TimeSpan.FromSeconds(3), (exception, _, retryCount, _) =>
{
_logger.LogError(exception, "Retry:{RetryCount}, {Message}", retryCount, exception.Message);
Expand All @@ -98,7 +90,7 @@ await Policy.Handle<Exception>()
{
var messageBody = await SerializeAsync(message, cancellationToken);

channel.BasicPublish("", $"{_options.QueueName}${message.Channel}$", props, messageBody);
channel.BasicPublish("", requestQueueName, props, messageBody);

Delivered?.Invoke(this, new MessageDispatchedEventArgs(message.Data, null));
});
Expand All @@ -113,21 +105,7 @@ public async Task<TResponse> SendAsync<TMessage, TResponse>(RoutedMessage<TMessa

using var channel = _connection.CreateChannel();

{
var queueDeclare = channel.DeclareQueuePassively(requestQueueName);

if (queueDeclare == null)
{
throw new InvalidOperationException("Channel not found in vhost '/'.");
//channel.QueueDeclare($"{_options.QueueName}${message.Channel}$", true, false, false, null);
//channel.BasicQos(0, 1, false);
}

if(queueDeclare.ConsumerCount < 1)
{
throw new InvalidOperationException("No consumer found for the channel.");
}
}
CheckQueue(channel, requestQueueName);

var responseQueueName = channel.QueueDeclare().QueueName;
var consumer = new EventingBasicConsumer(channel);
Expand All @@ -143,7 +121,8 @@ public async Task<TResponse> SendAsync<TMessage, TResponse>(RoutedMessage<TMessa
props.CorrelationId = message.CorrelationId;
props.ReplyTo = responseQueueName;

await Policy.Handle<Exception>()
await Policy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetryAsync(_options.MaxFailureRetries, _ => TimeSpan.FromSeconds(1), (exception, _, retryCount, _) =>
{
_logger.LogError(exception, "Retry:{RetryCount}, {Message}", retryCount, exception.Message);
Expand Down Expand Up @@ -175,6 +154,21 @@ void OnReceived(object sender, BasicDeliverEventArgs args)
}
}

private static void CheckQueue(IModel channel, string requestQueueName)
{
var queueDeclare = channel.DeclareQueuePassively(requestQueueName);

if (queueDeclare == null)
{
throw new MessageDeliverException("Channel not found in vhost '/'.");
}

if (queueDeclare.ConsumerCount < 1)
{
throw new MessageDeliverException("No consumer found for the channel.");
}
}

/// <summary>
/// Serializes the message to bytes.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Bus.RabbitMq/RabbitMqRecipientRegistrar.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public async Task RegisterAsync(IEnumerable<MessageRegistration> registrations,
}
else
{
throw new InvalidOperationException();
throw new InvalidOperationException("The message type is neither a queue nor a topic.");
}

recipient.Start(registration.Channel);
Expand Down
3 changes: 1 addition & 2 deletions Source/Euonia.Bus/BusConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class BusConfigurator : IBusConfigurator
{
private readonly List<MessageRegistration> _registrations = new();

private MessageConventionBuilder ConventionBuilder { get; } = new();
internal MessageConventionBuilder ConventionBuilder { get; } = new();

/// <summary>
/// Gets the message handle registrations.
Expand Down Expand Up @@ -166,7 +166,6 @@ public BusConfigurator RegisterHandlers(IEnumerable<Type> types)
public BusConfigurator SetConventions(Action<MessageConventionBuilder> configure)
{
configure?.Invoke(ConventionBuilder);
Service.TryAddSingleton<IMessageConvention>(ConventionBuilder.Convention);
return this;
}
}
6 changes: 3 additions & 3 deletions Source/Euonia.Bus/Core/ServiceBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public Task PublishAsync<TMessage>(TMessage message, PublishOptions options, Act

if (!_convention.IsTopicType(message.GetType()))
{
throw new InvalidOperationException("The message type is not an event type.");
throw new MessageTypeException("The message type is not an event type.");
}

var context = GetRequestContext();
Expand All @@ -66,7 +66,7 @@ public Task SendAsync<TMessage>(TMessage message, SendOptions options, Action<Me

if (!_convention.IsQueueType(message.GetType()))
{
throw new InvalidOperationException("The message type is not a queue type.");
throw new MessageTypeException("The message type is not a queue type.");
}

var context = GetRequestContext();
Expand All @@ -93,7 +93,7 @@ public Task<TResult> SendAsync<TMessage, TResult>(TMessage message, SendOptions

if (!_convention.IsQueueType(message.GetType()))
{
throw new InvalidOperationException("The message type is not a queue type.");
throw new MessageTypeException("The message type is not a queue type.");
}

var context = GetRequestContext();
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Bus/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public static void AddServiceBus(this IServiceCollection services, Action<BusCon

return context;
});
services.TryAddSingleton<MessageConvention>();
services.TryAddSingleton<IMessageConvention>(configurator.ConventionBuilder.Convention);
services.AddSingleton<IBus, ServiceBus>();
services.AddHostedService<RecipientActivator>();
}
Expand Down
2 changes: 2 additions & 0 deletions Source/Euonia.Business/Rules/DataAnnotationRule.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.ComponentModel.DataAnnotations;

using ValidationResult = System.ComponentModel.DataAnnotations.ValidationResult;

namespace Nerosoft.Euonia.Business;

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/BadGatewayException.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Net;

namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// Represents errors that occur when a server acting as a gateway or proxy received an invalid response
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/BadRequestException.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Net;

namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// Represents errors that occur if request is bad.
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/BusinessException.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// Represents errors that occur during business logic execution.
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/ConfigurationException.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// Represents errors that occur during application configuration.
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/ConflictException.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Net;

namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// Represents errors that occur if conflict.
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/ExceptionPrompt.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// Responsible for storing and returning exception prompts.
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/ForbiddenException.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Net;

namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// Represents errors that occur if request is denied.
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/GatewayTimeoutException.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Net;

namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// Represents errors that occur if gateway timeout.
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/HttpStatusCodeAttribute.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Net;

namespace Nerosoft.Euonia.Core;
namespace System;

/// <inheritdoc />
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/HttpStatusException.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Net;

namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// Represents errors that occur when HTTP status code is not 200.
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/IExceptionPrompt.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// To be added.
Expand Down
Loading

0 comments on commit 95cc4af

Please sign in to comment.