diff --git a/Source/Euonia.Bus.Abstract/MessageContext.cs b/Source/Euonia.Bus.Abstract/MessageContext.cs
index 3028caa..4fece29 100644
--- a/Source/Euonia.Bus.Abstract/MessageContext.cs
+++ b/Source/Euonia.Bus.Abstract/MessageContext.cs
@@ -44,7 +44,7 @@ public MessageContext(IRoutedMessage pack)
///
/// Invoked while message was handled and replied to dispatcher.
///
- public event EventHandler OnResponse
+ public event EventHandler Responded
{
add => _events.AddEventHandler(value);
remove => _events.RemoveEventHandler(value);
@@ -59,6 +59,15 @@ public event EventHandler Completed
remove => _events.RemoveEventHandler(value);
}
+ ///
+ /// Invoked while message handling was failed.
+ ///
+ public event EventHandler Failed
+ {
+ add => _events.AddEventHandler(value);
+ remove => _events.RemoveEventHandler(value);
+ }
+
///
public object Message { get; }
@@ -106,7 +115,7 @@ public string Authorization
/// The message to reply.
public void Response(object message)
{
- _events.HandleEvent(this, new MessageRepliedEventArgs(message), nameof(OnResponse));
+ _events.HandleEvent(this, new MessageRepliedEventArgs(message), nameof(Responded));
}
///
@@ -119,6 +128,15 @@ public void Response(TMessage message)
Response((object)message);
}
+ ///
+ /// Called after the message handling was failed.
+ ///
+ ///
+ public void Failure(Exception exception)
+ {
+ _events.HandleEvent(this, exception, nameof(Failed));
+ }
+
///
/// Called after the message has been handled.
/// This operate will raised up the event.
diff --git a/Source/Euonia.Bus.InMemory/InMemoryDispatcher.cs b/Source/Euonia.Bus.InMemory/InMemoryDispatcher.cs
index b58fc64..28123bb 100644
--- a/Source/Euonia.Bus.InMemory/InMemoryDispatcher.cs
+++ b/Source/Euonia.Bus.InMemory/InMemoryDispatcher.cs
@@ -39,6 +39,11 @@ public async Task SendAsync(RoutedMessage message, Cancellat
cancellationToken.Register(() => taskCompletion.SetCanceled(cancellationToken));
}
+ context.Failed += (_, exception) =>
+ {
+ taskCompletion.TrySetException(exception);
+ };
+
context.Completed += (_, _) =>
{
taskCompletion.SetResult();
@@ -68,13 +73,17 @@ public async Task SendAsync(RoutedMessage taskCompletion.TrySetCanceled(), false);
}
- context.OnResponse += (_, args) =>
+ context.Responded += (_, args) =>
{
taskCompletion.SetResult((TResponse)args.Result);
};
+ context.Failed += (_, exception) =>
+ {
+ taskCompletion.TrySetException(exception);
+ };
context.Completed += (_, _) =>
{
- taskCompletion.TrySetResult(default);
+ taskCompletion.TryCompleteFromCompletedTask(Task.FromResult(default(TResponse)));
};
StrongReferenceMessenger.Default.UnsafeSend(pack, message.Channel);
diff --git a/Source/Euonia.Bus.RabbitMq/Constants.cs b/Source/Euonia.Bus.RabbitMq/Constants.cs
index a1d7b2a..71b75ee 100644
--- a/Source/Euonia.Bus.RabbitMq/Constants.cs
+++ b/Source/Euonia.Bus.RabbitMq/Constants.cs
@@ -8,7 +8,8 @@ internal class Constants
{
ReferenceLoopHandling = ReferenceLoopHandling.Ignore,
ConstructorHandling = ConstructorHandling.Default,
- MetadataPropertyHandling = MetadataPropertyHandling.ReadAhead
+ MetadataPropertyHandling = MetadataPropertyHandling.ReadAhead,
+ TypeNameHandling = TypeNameHandling.Auto
};
public class MessageHeaders
diff --git a/Source/Euonia.Bus.RabbitMq/RabbitMqDispatcher.cs b/Source/Euonia.Bus.RabbitMq/RabbitMqDispatcher.cs
index 2f80372..9694410 100644
--- a/Source/Euonia.Bus.RabbitMq/RabbitMqDispatcher.cs
+++ b/Source/Euonia.Bus.RabbitMq/RabbitMqDispatcher.cs
@@ -48,56 +48,88 @@ public async Task PublishAsync(RoutedMessage message, Cancel
props.Type = typeName;
await Policy.Handle()
- .Or()
- .WaitAndRetryAsync(_options.MaxFailureRetries, _ => TimeSpan.FromSeconds(3), (exception, _, retryCount, _) =>
- {
- _logger.LogError(exception, "Retry:{RetryCount}, {Message}", retryCount, exception.Message);
- })
- .ExecuteAsync(async () =>
- {
- var messageBody = await SerializeAsync(message, cancellationToken);
-
- channel.ExchangeDeclare(_options.ExchangeName, _options.ExchangeType);
- channel.BasicPublish(_options.ExchangeName, $"{_options.TopicName}${message.Channel}$", props, messageBody);
-
- Delivered?.Invoke(this, new MessageDispatchedEventArgs(message.Data, null));
- });
+ .Or()
+ .WaitAndRetryAsync(_options.MaxFailureRetries, _ => TimeSpan.FromSeconds(3), (exception, _, retryCount, _) =>
+ {
+ _logger.LogError(exception, "Retry:{RetryCount}, {Message}", retryCount, exception.Message);
+ })
+ .ExecuteAsync(async () =>
+ {
+ var messageBody = await SerializeAsync(message, cancellationToken);
+
+ channel.ExchangeDeclare(_options.ExchangeName, _options.ExchangeType);
+ channel.BasicPublish(_options.ExchangeName, $"{_options.TopicName}${message.Channel}$", props, messageBody);
+
+ Delivered?.Invoke(this, new MessageDispatchedEventArgs(message.Data, null));
+ });
}
///
public async Task SendAsync(RoutedMessage message, CancellationToken cancellationToken = default) where TMessage : class
{
- using var channel = _connection.CreateChannel();
+ var task = new TaskCompletionSource();
var requestQueueName = $"{_options.QueueName}${message.Channel}$";
+ using var channel = _connection.CreateChannel();
+
CheckQueue(channel, requestQueueName);
+ var responseQueueName = channel.QueueDeclare().QueueName;
+ var consumer = new EventingBasicConsumer(channel);
+
+ consumer.Received += OnReceived;
+
var typeName = message.GetTypeName();
var props = channel.CreateBasicProperties();
props.Headers ??= new Dictionary();
props.Headers[Constants.MessageHeaders.MessageType] = typeName;
props.Type = typeName;
+ props.CorrelationId = message.CorrelationId;
+ props.ReplyTo = responseQueueName;
await Policy.Handle()
- .Or()
- .WaitAndRetryAsync(_options.MaxFailureRetries, _ => TimeSpan.FromSeconds(3), (exception, _, retryCount, _) =>
- {
- _logger.LogError(exception, "Retry:{RetryCount}, {Message}", retryCount, exception.Message);
- })
- .ExecuteAsync(async () =>
- {
- var messageBody = await SerializeAsync(message, cancellationToken);
-
- channel.BasicPublish("", requestQueueName, props, messageBody);
-
- Delivered?.Invoke(this, new MessageDispatchedEventArgs(message.Data, null));
- });
+ .Or()
+ .WaitAndRetryAsync(_options.MaxFailureRetries, _ => TimeSpan.FromSeconds(1), (exception, _, retryCount, _) =>
+ {
+ _logger.LogError(exception, "Retry:{RetryCount}, {Message}", retryCount, exception.Message);
+ })
+ .ExecuteAsync(async () =>
+ {
+ var messageBody = await SerializeAsync(message, cancellationToken);
+ channel.BasicPublish("", requestQueueName, props, messageBody);
+ channel.BasicConsume(consumer, responseQueueName, true);
+
+ Delivered?.Invoke(this, new MessageDispatchedEventArgs(message.Data, null));
+ });
+
+ await task.Task;
+ consumer.Received -= OnReceived;
+
+ void OnReceived(object sender, BasicDeliverEventArgs args)
+ {
+ if (args.BasicProperties.CorrelationId != message.CorrelationId)
+ {
+ return;
+ }
+
+ var body = args.Body.ToArray();
+ var response = JsonConvert.DeserializeObject>(Encoding.UTF8.GetString(body), Constants.SerializerSettings);
+ if (response.IsSuccess)
+ {
+ task.SetResult(response.Result);
+ }
+ else
+ {
+ task.SetException(response.Error);
+ }
+ }
}
///
- public async Task SendAsync(RoutedMessage message, CancellationToken cancellationToken = default) where TMessage : class
+ public async Task SendAsync(RoutedMessage message, CancellationToken cancellationToken = default)
+ where TMessage : class
{
var task = new TaskCompletionSource();
@@ -122,19 +154,19 @@ public async Task SendAsync(RoutedMessage()
- .Or()
- .WaitAndRetryAsync(_options.MaxFailureRetries, _ => TimeSpan.FromSeconds(1), (exception, _, retryCount, _) =>
- {
- _logger.LogError(exception, "Retry:{RetryCount}, {Message}", retryCount, exception.Message);
- })
- .ExecuteAsync(async () =>
- {
- var messageBody = await SerializeAsync(message, cancellationToken);
- channel.BasicPublish("", requestQueueName, props, messageBody);
- channel.BasicConsume(consumer, responseQueueName, true);
-
- Delivered?.Invoke(this, new MessageDispatchedEventArgs(message.Data, null));
- });
+ .Or()
+ .WaitAndRetryAsync(_options.MaxFailureRetries, _ => TimeSpan.FromSeconds(1), (exception, _, retryCount, _) =>
+ {
+ _logger.LogError(exception, "Retry:{RetryCount}, {Message}", retryCount, exception.Message);
+ })
+ .ExecuteAsync(async () =>
+ {
+ var messageBody = await SerializeAsync(message, cancellationToken);
+ channel.BasicPublish("", requestQueueName, props, messageBody);
+ channel.BasicConsume(consumer, responseQueueName, true);
+
+ Delivered?.Invoke(this, new MessageDispatchedEventArgs(message.Data, null));
+ });
var result = await task.Task;
consumer.Received -= OnReceived;
@@ -148,9 +180,8 @@ void OnReceived(object sender, BasicDeliverEventArgs args)
}
var body = args.Body.ToArray();
- var response = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(body), Constants.SerializerSettings);
-
- task.SetResult(response);
+ var response = JsonConvert.DeserializeObject>(Encoding.UTF8.GetString(body), Constants.SerializerSettings);
+ task.SetResult(response.Result);
}
}
diff --git a/Source/Euonia.Bus.RabbitMq/RabbitMqQueueConsumer.cs b/Source/Euonia.Bus.RabbitMq/RabbitMqQueueConsumer.cs
index 62da174..a205f69 100644
--- a/Source/Euonia.Bus.RabbitMq/RabbitMqQueueConsumer.cs
+++ b/Source/Euonia.Bus.RabbitMq/RabbitMqQueueConsumer.cs
@@ -40,6 +40,7 @@ internal override void Start(string channel)
{
Connection.TryConnect();
}
+
Channel = Connection.CreateChannel();
Channel.QueueDeclare(queueName, true, false, false, null);
@@ -65,27 +66,40 @@ protected override async void HandleMessageReceived(object sender, BasicDeliverE
OnMessageReceived(new MessageReceivedEventArgs(message.Data, context));
var taskCompletion = new TaskCompletionSource