Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove base message constraint #1150

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using JustSaying.Messaging.Middleware;
using JustSaying.Models;
using JustSaying.Sample.Middleware.Messages;
using Serilog;

Expand All @@ -17,7 +18,8 @@ protected override async Task<bool> RunInnerAsync(HandleMessageContext context,
}
else if (context.Message is UnreliableMessage unreliableMessage)
{
Log.Information("[{MiddlewareName}] Hello UnreliableMessage! Hope you work this time....your Name is {Name} and Id is {Id}", nameof(InterrogateMiddleware),unreliableMessage.Name, context.Message.Id);
var messageId = (context.Message as Message)?.Id.ToString() ?? "<unknown>";
Log.Information("[{MiddlewareName}] Hello UnreliableMessage! Hope you work this time....your Name is {Name} and Id is {Id}", nameof(InterrogateMiddleware),unreliableMessage.Name, messageId);
}

return await func(stoppingToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using JustSaying.Messaging.Middleware.Logging;
using JustSaying.Messaging.Middleware.PostProcessing;
using JustSaying.Messaging.Monitoring;
using JustSaying.Models;
using JustSaying.Naming;
using Microsoft.Extensions.DependencyInjection.Extensions;

Expand Down Expand Up @@ -116,7 +115,7 @@ public static IServiceCollection AddJustSaying(this IServiceCollection services,
{
throw new ArgumentNullException(nameof(configure));
}

// Register as self so the same singleton instance implements two different interfaces
services.TryAddSingleton((p) => new ServiceProviderResolver(p));
services.TryAddSingleton<IHandlerResolver>((p) => p.GetRequiredService<ServiceProviderResolver>());
Expand Down Expand Up @@ -197,7 +196,7 @@ public static IServiceCollection AddJustSaying(this IServiceCollection services,
/// <paramref name="services"/> is <see langword="null"/>.
/// </exception>
public static IServiceCollection AddJustSayingHandler<TMessage, THandler>(this IServiceCollection services)
where TMessage : Message
where TMessage : class
where THandler : class, IHandlerAsync<TMessage>
{
if (services == null)
Expand Down Expand Up @@ -227,7 +226,7 @@ public static IServiceCollection AddJustSayingHandler<TMessage, THandler>(this I
public static IServiceCollection AddJustSayingHandlers<TMessage>(
this IServiceCollection services,
IEnumerable<IHandlerAsync<TMessage>> handlers)
where TMessage : Message
where TMessage : class
{
if (services == null)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System.ComponentModel;
using JustSaying.Messaging.MessageHandling;
using JustSaying.Models;

namespace StructureMap;

Expand All @@ -20,7 +19,7 @@ public static class RegistryExtensions
/// <paramref name="registry"/> is <see langword="null"/>.
/// </exception>
public static void AddJustSayingHandler<TMessage, THandler>(this Registry registry)
where TMessage : Message
where TMessage : class
where THandler : class, IHandlerAsync<TMessage>
{
if (registry == null)
Expand All @@ -30,4 +29,4 @@ public static void AddJustSayingHandler<TMessage, THandler>(this Registry regist

registry.For<IHandlerAsync<TMessage>>().Use<THandler>();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using JustSaying.Messaging.MessageSerialization;
using JustSaying.Messaging.Middleware;
using JustSaying.Messaging.Monitoring;
using JustSaying.Models;
using Microsoft.Extensions.Logging;

namespace JustSaying.AwsTools.MessageHandling.Dispatch;
Expand Down Expand Up @@ -37,29 +36,29 @@ public async Task DispatchMessageAsync(
return;
}

(bool success, Message typedMessage, MessageAttributes attributes) =
(bool success, object messageInstance, MessageAttributes attributes) =
await DeserializeMessage(messageContext, cancellationToken).ConfigureAwait(false);

if (!success)
{
return;
}

var messageType = typedMessage.GetType();
var messageType = messageInstance.GetType();
var middleware = _middlewareMap.Get(messageContext.QueueName, messageType);

if (middleware == null)
{
_logger.LogError(
"Failed to dispatch. Middleware for message of type '{MessageTypeName}' not found in middleware map.",
typedMessage.GetType().FullName);
messageInstance.GetType().FullName);
return;
}

var handleContext = new HandleMessageContext(
messageContext.QueueName,
messageContext.Message,
typedMessage,
messageInstance,
messageType,
messageContext,
messageContext,
Expand All @@ -71,8 +70,7 @@ await middleware.RunAsync(handleContext, null, cancellationToken)

}

private async Task<(bool success, Message typedMessage, MessageAttributes attributes)>
DeserializeMessage(IQueueMessageContext messageContext, CancellationToken cancellationToken)
private async Task<(bool success, object messageInstance, MessageAttributes attributes)> DeserializeMessage(IQueueMessageContext messageContext, CancellationToken cancellationToken)
{
try
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using JustSaying.Messaging.Interrogation;
using JustSaying.Models;
using HandleMessageMiddleware = JustSaying.Messaging.Middleware.MiddlewareBase<JustSaying.Messaging.Middleware.HandleMessageContext, bool>;

namespace JustSaying.AwsTools.MessageHandling.Dispatch;
Expand Down Expand Up @@ -49,15 +48,15 @@ public IEnumerable<Type> Types
/// Adds a middleware chain to be executed when a message arrives in a queue.
/// If the middleware is already registered for a queue, it will not be added again.
/// </summary>
/// <typeparam name="T">The type of the message to handle on this queue.</typeparam>
/// <typeparam name="TMessage">The type of the message to handle on this queue.</typeparam>
/// <param name="queueName">The queue to register the middleware for.</param>
/// <param name="middleware">The factory function to create middleware with.</param>
public MiddlewareMap Add<T>(string queueName, HandleMessageMiddleware middleware) where T : Message
public MiddlewareMap Add<TMessage>(string queueName, HandleMessageMiddleware middleware) where TMessage : class
{
if (queueName is null) throw new ArgumentNullException(nameof(queueName));
if (middleware is null) throw new ArgumentNullException(nameof(middleware));

_middlewares[(queueName, typeof(T))] = middleware;
_middlewares[(queueName, typeof(TMessage))] = middleware;

return this;
}
Expand Down Expand Up @@ -91,4 +90,4 @@ public InterrogationResult Interrogate()
Middlewares = middlewares
});
}
}
}
20 changes: 10 additions & 10 deletions src/JustSaying/AwsTools/MessageHandling/SnsMessagePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@

namespace JustSaying.AwsTools.MessageHandling;

public class SnsMessagePublisher : IMessagePublisher, IInterrogable
public class SnsMessagePublisher<TMessage> : IMessagePublisher<TMessage>, IInterrogable where TMessage : class
{
private readonly IMessageSerializationRegister _serializationRegister;
private readonly IMessageSubjectProvider _messageSubjectProvider;
private readonly Func<Exception, Message, bool> _handleException;
public Action<MessageResponse, Message> MessageResponseLogger { get; set; }
private readonly Func<Exception, TMessage, bool> _handleException;
public Action<MessageResponse, TMessage> MessageResponseLogger { get; set; }
public string Arn { get; internal set; }
protected IAmazonSimpleNotificationService Client { get; }
private readonly ILogger _logger;
Expand All @@ -26,7 +26,7 @@ public SnsMessagePublisher(
IMessageSerializationRegister serializationRegister,
ILoggerFactory loggerFactory,
IMessageSubjectProvider messageSubjectProvider,
Func<Exception, Message, bool> handleException = null)
Func<Exception, TMessage, bool> handleException = null)
: this(client, serializationRegister, loggerFactory, messageSubjectProvider, handleException)
{
Arn = topicArn;
Expand All @@ -37,7 +37,7 @@ public SnsMessagePublisher(
IMessageSerializationRegister serializationRegister,
ILoggerFactory loggerFactory,
IMessageSubjectProvider messageSubjectProvider,
Func<Exception, Message, bool> handleException = null)
Func<Exception, TMessage, bool> handleException = null)
{
Client = client;
_serializationRegister = serializationRegister;
Expand All @@ -51,10 +51,10 @@ public Task StartAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}

public Task PublishAsync(Message message, CancellationToken cancellationToken)
public Task PublishAsync(TMessage message, CancellationToken cancellationToken)
=> PublishAsync(message, null, cancellationToken);

public async Task PublishAsync(Message message, PublishMetadata metadata, CancellationToken cancellationToken)
public async Task PublishAsync(TMessage message, PublishMetadata metadata, CancellationToken cancellationToken)
{
var request = BuildPublishRequest(message, metadata);
PublishResponse response = null;
Expand All @@ -79,7 +79,7 @@ public async Task PublishAsync(Message message, PublishMetadata metadata, Cancel
{
_logger.LogInformation(
"Published message {MessageId} of type {MessageType} to {DestinationType} '{MessageDestination}'.",
message.Id,
(message as Message)?.Id.ToString() ?? "<unknown>",
message.GetType().FullName,
"Topic",
request.TopicArn);
Expand All @@ -97,9 +97,9 @@ public async Task PublishAsync(Message message, PublishMetadata metadata, Cancel
}
}

private bool ClientExceptionHandler(Exception ex, Message message) => _handleException?.Invoke(ex, message) ?? false;
private bool ClientExceptionHandler(Exception ex, TMessage message) => _handleException?.Invoke(ex, message) ?? false;

private PublishRequest BuildPublishRequest(Message message, PublishMetadata metadata)
private PublishRequest BuildPublishRequest(TMessage message, PublishMetadata metadata)
{
var messageToSend = _serializationRegister.Serialize(message, serializeForSnsPublishing: true);
var messageType = _messageSubjectProvider.GetSubjectForType(message.GetType());
Expand Down
14 changes: 7 additions & 7 deletions src/JustSaying/AwsTools/MessageHandling/SqsMessagePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@

namespace JustSaying.AwsTools.MessageHandling;

public class SqsMessagePublisher : IMessagePublisher
public class SqsMessagePublisher<TMessage> : IMessagePublisher<TMessage> where TMessage : class
{
private readonly IAmazonSQS _client;
private readonly IMessageSerializationRegister _serializationRegister;
private readonly ILogger _logger;
public Action<MessageResponse, Message> MessageResponseLogger { get; set; }
public Action<MessageResponse, TMessage> MessageResponseLogger { get; set; }

public Uri QueueUrl { get; internal set; }

Expand Down Expand Up @@ -42,10 +42,10 @@ public Task StartAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}

public async Task PublishAsync(Message message, CancellationToken cancellationToken)
public async Task PublishAsync(TMessage message, CancellationToken cancellationToken)
=> await PublishAsync(message, null, cancellationToken).ConfigureAwait(false);

public async Task PublishAsync(Message message, PublishMetadata metadata, CancellationToken cancellationToken)
public async Task PublishAsync(TMessage message, PublishMetadata metadata, CancellationToken cancellationToken)
{
if (QueueUrl is null) throw new PublishException("Queue URL was null, perhaps you need to call `StartAsync` on the `IMessagePublisher` before publishing.");

Expand All @@ -69,7 +69,7 @@ public async Task PublishAsync(Message message, PublishMetadata metadata, Cancel
{
_logger.LogInformation(
"Published message {MessageId} of type {MessageType} to {DestinationType} '{MessageDestination}'.",
message.Id,
(message as Message)?.Id.ToString() ?? "<unknown>",
message.GetType().FullName,
"Queue",
request.QueueUrl);
Expand All @@ -87,7 +87,7 @@ public async Task PublishAsync(Message message, PublishMetadata metadata, Cancel
}
}

private SendMessageRequest BuildSendMessageRequest(Message message, PublishMetadata metadata)
private SendMessageRequest BuildSendMessageRequest(TMessage message, PublishMetadata metadata)
{
var request = new SendMessageRequest
{
Expand All @@ -103,7 +103,7 @@ private SendMessageRequest BuildSendMessageRequest(Message message, PublishMetad
return request;
}

public string GetMessageInContext(Message message) => _serializationRegister.Serialize(message, serializeForSnsPublishing: false);
public string GetMessageInContext(TMessage message) => _serializationRegister.Serialize(message, serializeForSnsPublishing: false);

public InterrogationResult Interrogate()
{
Expand Down
8 changes: 2 additions & 6 deletions src/JustSaying/Fluent/IPublicationBuilder`1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ namespace JustSaying.Fluent;
/// <summary>
/// Defines a builder for a publication.
/// </summary>
/// <typeparam name="T">
/// The type of the messages to publish.
/// </typeparam>
internal interface IPublicationBuilder<out T>
where T : Message
internal interface IPublicationBuilder
{
/// <summary>
/// Configures the publication for the <see cref="JustSayingBus"/>.
Expand All @@ -20,4 +16,4 @@ internal interface IPublicationBuilder<out T>
/// <param name="proxy">The <see cref="IAwsClientFactoryProxy"/> to use to create SQS/SNS clients with.</param>
/// <param name="loggerFactory">The <see cref="ILoggerFactory"/> logger factory to use.</param>
void Configure(JustSayingBus bus, IAwsClientFactoryProxy proxy, ILoggerFactory loggerFactory);
}
}
8 changes: 4 additions & 4 deletions src/JustSaying/Fluent/ISubscriptionBuilder`1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ namespace JustSaying.Fluent;
/// <summary>
/// Defines a builder for a subscription.
/// </summary>
/// <typeparam name="T">
/// <typeparam name="TMessage">
/// The type of the messages to subscribe to.
/// </typeparam>
public interface ISubscriptionBuilder<out T>
where T : Message
public interface ISubscriptionBuilder<out TMessage>
where TMessage : class
{
/// <summary>
/// Configures the middleware pipeline for this subscription.
Expand Down Expand Up @@ -53,7 +53,7 @@ public interface ISubscriptionBuilder<out T>
/// </ul>
/// </example>
/// <returns>The current <see cref="SqsReadConfigurationBuilder"/>.</returns>
public ISubscriptionBuilder<T> WithMiddlewareConfiguration(Action<HandlerMiddlewareBuilder> middlewareConfiguration);
public ISubscriptionBuilder<TMessage> WithMiddlewareConfiguration(Action<HandlerMiddlewareBuilder> middlewareConfiguration);


/// <summary>
Expand Down
6 changes: 3 additions & 3 deletions src/JustSaying/Fluent/MessagingConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ internal MessagingConfigurationBuilder(MessagingBusBuilder busBuilder)
/// <summary>
/// Gets or sets the optional value to use for <see cref="IPublishConfiguration.MessageResponseLogger"/>
/// </summary>
private Action<MessageResponse, Message> MessageResponseLogger { get; set; }
private Action<MessageResponse, object> MessageResponseLogger { get; set; }

/// <summary>
/// Gets or sets the optional value to use for <see cref="IPublishConfiguration.PublishFailureBackoff"/>
Expand Down Expand Up @@ -133,7 +133,7 @@ public MessagingConfigurationBuilder WithAdditionalSubscriberAccount(string acco
/// <exception cref="ArgumentNullException">
/// <paramref name="logger"/> is <see langword="null"/>.
/// </exception>
public MessagingConfigurationBuilder WithMessageResponseLogger(Action<MessageResponse, Message> logger)
public MessagingConfigurationBuilder WithMessageResponseLogger(Action<MessageResponse, object> logger)
{
MessageResponseLogger = logger ?? throw new ArgumentNullException(nameof(logger));
return this;
Expand Down Expand Up @@ -334,4 +334,4 @@ public IMessagingConfig Build()

return config;
}
}
}
Loading