Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Develop #48

Merged
merged 5 commits into from
Dec 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions Source/Euonia.Bus.Abstract/Exceptions/MessageDeliverException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
namespace Nerosoft.Euonia.Bus;

/// <summary>
/// Represents errors that occur during message deliver.
/// </summary>
public class MessageDeliverException : Exception
{
/// <summary>
/// Initializes a new instance of the <see cref="MessageDeliverException"/> class.
/// </summary>
public MessageDeliverException()
: base("Error occurred during message deliver.")
{
}

/// <summary>
/// Initializes a new instance of the <see cref="MessageDeliverException"/> class.
/// </summary>
/// <param name="message"></param>
public MessageDeliverException(string message)
: base(message)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="MessageDeliverException"/> class.
/// </summary>
/// <param name="message"></param>
/// <param name="innerException"></param>
public MessageDeliverException(string message, Exception innerException)
: base(message, innerException)
{
}
}
34 changes: 34 additions & 0 deletions Source/Euonia.Bus.Abstract/Exceptions/MessageTypeException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
namespace Nerosoft.Euonia.Bus;

/// <summary>
/// Represents errors that occur when the message type is invalid.
/// </summary>
public class MessageTypeException : Exception
{
/// <summary>
/// Initializes a new instance of the <see cref="MessageTypeException"/> class.
/// </summary>
public MessageTypeException()
: base("The message type is invalid.")
{
}

/// <summary>
/// Initializes a new instance of the <see cref="MessageTypeException"/> class.
/// </summary>
/// <param name="message"></param>
public MessageTypeException(string message)
: base(message)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="MessageTypeException"/> class.
/// </summary>
/// <param name="message"></param>
/// <param name="innerException"></param>
public MessageTypeException(string message, Exception innerException)
: base(message, innerException)
{
}
}
2 changes: 1 addition & 1 deletion Source/Euonia.Bus.InMemory/InMemoryRecipientRegistrar.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public async Task RegisterAsync(IEnumerable<MessageRegistration> registrations,
}
else
{
throw new InvalidOperationException();
throw new MessageTypeException("The message type is neither a queue nor a topic.");
}
}

Expand Down
10 changes: 5 additions & 5 deletions Source/Euonia.Bus.InMemory/Messenger/StrongReferenceMessenger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ public TMessage Send<TMessage, TToken>(TMessage message, TToken token)
/// <param name="token">The token indicating what channel to use.</param>
/// <returns>The message that was sent (ie. <paramref name="message"/>).</returns>
/// <exception cref="System.ArgumentNullException">Thrown if <paramref name="message"/> or <paramref name="token"/> are <see langword="null"/>.</exception>
public TMessage UnsafeSend<TMessage, TToken>(TMessage message, TToken token)
internal TMessage UnsafeSend<TMessage, TToken>(TMessage message, TToken token)
where TMessage : class
where TToken : IEquatable<TToken>
{
Expand All @@ -624,15 +624,15 @@ public TMessage UnsafeSend<TMessage, TToken>(TMessage message, TToken token)
// Check whether there are any registered recipients
if (!TryGetMapping<TMessage>(out var mapping))
{
throw new InvalidOperationException("No recipients registered for the input message type.");
throw new MessageDeliverException("No recipients registered for the input message type.");
}

// Check the number of remaining handlers, see below
var totalHandlersCount = mapping.Count;

if (totalHandlersCount == 0)
{
throw new InvalidOperationException("No recipients registered for the input message type.");
throw new MessageDeliverException("No recipients registered for the input message type.");
}

pairs = rentedArray = ArrayPool<object>.Shared.Rent(2 * totalHandlersCount);
Expand Down Expand Up @@ -670,7 +670,7 @@ public TMessage UnsafeSend<TMessage, TToken>(TMessage message, TToken token)

if (totalHandlersCount == 0)
{
throw new InvalidOperationException("No recipients registered for the input message type.");
throw new MessageDeliverException("No recipients registered for the input message type.");
}

// Rent the array and also assign it to a span, which will be used to access values.
Expand Down Expand Up @@ -704,7 +704,7 @@ public TMessage UnsafeSend<TMessage, TToken>(TMessage message, TToken token)

if (i == 0)
{
throw new InvalidOperationException("No recipients registered for the input message type and token.");
throw new MessageDeliverException("No recipients registered for the input message type and token.");
}
}
}
Expand Down
62 changes: 28 additions & 34 deletions Source/Euonia.Bus.RabbitMq/RabbitMqDispatcher.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using Microsoft.Extensions.Logging;
using System.Net.Sockets;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Polly;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;

namespace Nerosoft.Euonia.Bus.RabbitMq;

Expand Down Expand Up @@ -45,7 +47,8 @@ public async Task PublishAsync<TMessage>(RoutedMessage<TMessage> message, Cancel
props.Headers[Constants.MessageHeaders.MessageType] = typeName;
props.Type = typeName;

await Policy.Handle<Exception>()
await Policy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetryAsync(_options.MaxFailureRetries, _ => TimeSpan.FromSeconds(3), (exception, _, retryCount, _) =>
{
_logger.LogError(exception, "Retry:{RetryCount}, {Message}", retryCount, exception.Message);
Expand All @@ -66,21 +69,9 @@ public async Task SendAsync<TMessage>(RoutedMessage<TMessage> message, Cancellat
{
using var channel = _connection.CreateChannel();

{
var queueDeclare = channel.DeclareQueuePassively($"{_options.QueueName}${message.Channel}$");
var requestQueueName = $"{_options.QueueName}${message.Channel}$";

if (queueDeclare == null)
{
throw new InvalidOperationException("Channel not found in vhost '/'.");
//channel.QueueDeclare($"{_options.QueueName}${message.Channel}$", true, false, false, null);
//channel.BasicQos(0, 1, false);
}

if (queueDeclare.ConsumerCount < 1)
{
throw new InvalidOperationException("No consumer found for the channel.");
}
}
CheckQueue(channel, requestQueueName);

var typeName = message.GetTypeName();

Expand All @@ -89,7 +80,8 @@ public async Task SendAsync<TMessage>(RoutedMessage<TMessage> message, Cancellat
props.Headers[Constants.MessageHeaders.MessageType] = typeName;
props.Type = typeName;

await Policy.Handle<Exception>()
await Policy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetryAsync(_options.MaxFailureRetries, _ => TimeSpan.FromSeconds(3), (exception, _, retryCount, _) =>
{
_logger.LogError(exception, "Retry:{RetryCount}, {Message}", retryCount, exception.Message);
Expand All @@ -98,7 +90,7 @@ await Policy.Handle<Exception>()
{
var messageBody = await SerializeAsync(message, cancellationToken);

channel.BasicPublish("", $"{_options.QueueName}${message.Channel}$", props, messageBody);
channel.BasicPublish("", requestQueueName, props, messageBody);

Delivered?.Invoke(this, new MessageDispatchedEventArgs(message.Data, null));
});
Expand All @@ -113,21 +105,7 @@ public async Task<TResponse> SendAsync<TMessage, TResponse>(RoutedMessage<TMessa

using var channel = _connection.CreateChannel();

{
var queueDeclare = channel.DeclareQueuePassively(requestQueueName);

if (queueDeclare == null)
{
throw new InvalidOperationException("Channel not found in vhost '/'.");
//channel.QueueDeclare($"{_options.QueueName}${message.Channel}$", true, false, false, null);
//channel.BasicQos(0, 1, false);
}

if(queueDeclare.ConsumerCount < 1)
{
throw new InvalidOperationException("No consumer found for the channel.");
}
}
CheckQueue(channel, requestQueueName);

var responseQueueName = channel.QueueDeclare().QueueName;
var consumer = new EventingBasicConsumer(channel);
Expand All @@ -143,7 +121,8 @@ public async Task<TResponse> SendAsync<TMessage, TResponse>(RoutedMessage<TMessa
props.CorrelationId = message.CorrelationId;
props.ReplyTo = responseQueueName;

await Policy.Handle<Exception>()
await Policy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetryAsync(_options.MaxFailureRetries, _ => TimeSpan.FromSeconds(1), (exception, _, retryCount, _) =>
{
_logger.LogError(exception, "Retry:{RetryCount}, {Message}", retryCount, exception.Message);
Expand Down Expand Up @@ -175,6 +154,21 @@ void OnReceived(object sender, BasicDeliverEventArgs args)
}
}

private static void CheckQueue(IModel channel, string requestQueueName)
{
var queueDeclare = channel.DeclareQueuePassively(requestQueueName);

if (queueDeclare == null)
{
throw new MessageDeliverException("Channel not found in vhost '/'.");
}

if (queueDeclare.ConsumerCount < 1)
{
throw new MessageDeliverException("No consumer found for the channel.");
}
}

/// <summary>
/// Serializes the message to bytes.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Bus.RabbitMq/RabbitMqRecipientRegistrar.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public async Task RegisterAsync(IEnumerable<MessageRegistration> registrations,
}
else
{
throw new InvalidOperationException();
throw new InvalidOperationException("The message type is neither a queue nor a topic.");
}

recipient.Start(registration.Channel);
Expand Down
3 changes: 1 addition & 2 deletions Source/Euonia.Bus/BusConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class BusConfigurator : IBusConfigurator
{
private readonly List<MessageRegistration> _registrations = new();

private MessageConventionBuilder ConventionBuilder { get; } = new();
internal MessageConventionBuilder ConventionBuilder { get; } = new();

/// <summary>
/// Gets the message handle registrations.
Expand Down Expand Up @@ -166,7 +166,6 @@ public BusConfigurator RegisterHandlers(IEnumerable<Type> types)
public BusConfigurator SetConventions(Action<MessageConventionBuilder> configure)
{
configure?.Invoke(ConventionBuilder);
Service.TryAddSingleton<IMessageConvention>(ConventionBuilder.Convention);
return this;
}
}
6 changes: 3 additions & 3 deletions Source/Euonia.Bus/Core/ServiceBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public Task PublishAsync<TMessage>(TMessage message, PublishOptions options, Act

if (!_convention.IsTopicType(message.GetType()))
{
throw new InvalidOperationException("The message type is not an event type.");
throw new MessageTypeException("The message type is not an event type.");
}

var context = GetRequestContext();
Expand All @@ -66,7 +66,7 @@ public Task SendAsync<TMessage>(TMessage message, SendOptions options, Action<Me

if (!_convention.IsQueueType(message.GetType()))
{
throw new InvalidOperationException("The message type is not a queue type.");
throw new MessageTypeException("The message type is not a queue type.");
}

var context = GetRequestContext();
Expand All @@ -93,7 +93,7 @@ public Task<TResult> SendAsync<TMessage, TResult>(TMessage message, SendOptions

if (!_convention.IsQueueType(message.GetType()))
{
throw new InvalidOperationException("The message type is not a queue type.");
throw new MessageTypeException("The message type is not a queue type.");
}

var context = GetRequestContext();
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Bus/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public static void AddServiceBus(this IServiceCollection services, Action<BusCon

return context;
});
services.TryAddSingleton<MessageConvention>();
services.TryAddSingleton<IMessageConvention>(configurator.ConventionBuilder.Convention);
services.AddSingleton<IBus, ServiceBus>();
services.AddHostedService<RecipientActivator>();
}
Expand Down
2 changes: 2 additions & 0 deletions Source/Euonia.Business/Rules/DataAnnotationRule.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.ComponentModel.DataAnnotations;

using ValidationResult = System.ComponentModel.DataAnnotations.ValidationResult;

namespace Nerosoft.Euonia.Business;

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/BadGatewayException.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Net;

namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// Represents errors that occur when a server acting as a gateway or proxy received an invalid response
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/BadRequestException.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Net;

namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// Represents errors that occur if request is bad.
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/BusinessException.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// Represents errors that occur during business logic execution.
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/ConfigurationException.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// Represents errors that occur during application configuration.
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/ConflictException.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Net;

namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// Represents errors that occur if conflict.
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/ExceptionPrompt.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// Responsible for storing and returning exception prompts.
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/ForbiddenException.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Net;

namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// Represents errors that occur if request is denied.
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/GatewayTimeoutException.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Net;

namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// Represents errors that occur if gateway timeout.
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/HttpStatusCodeAttribute.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Net;

namespace Nerosoft.Euonia.Core;
namespace System;

/// <inheritdoc />
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/HttpStatusException.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Net;

namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// Represents errors that occur when HTTP status code is not 200.
Expand Down
2 changes: 1 addition & 1 deletion Source/Euonia.Core/Exceptions/IExceptionPrompt.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Nerosoft.Euonia.Core;
namespace System;

/// <summary>
/// To be added.
Expand Down
Loading
Loading