diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/InProcessFunctionEndpoint.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/InProcessFunctionEndpoint.cs index 3b403e22..c3a032ba 100644 --- a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/InProcessFunctionEndpoint.cs +++ b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/InProcessFunctionEndpoint.cs @@ -3,15 +3,11 @@ using System; using System.Threading; using System.Threading.Tasks; - using System.Transactions; using Azure.Messaging.ServiceBus; using AzureFunctions.InProcess.ServiceBus; using AzureFunctions.InProcess.ServiceBus.Serverless; - using Extensibility; using Microsoft.Azure.WebJobs.ServiceBus; using Microsoft.Extensions.Logging; - using Transport; - using Transport.AzureServiceBus; using ExecutionContext = Microsoft.Azure.WebJobs.ExecutionContext; class InProcessFunctionEndpoint : IFunctionEndpoint @@ -25,6 +21,44 @@ public InProcessFunctionEndpoint( endpointFactory = () => externallyManagedContainerEndpoint.Start(serviceProvider); } + public async Task ProcessAtomic( + ServiceBusReceivedMessage message, + ExecutionContext executionContext, + ServiceBusClient serviceBusClient, + ServiceBusMessageActions messageActions, + ILogger functionsLogger = null, + CancellationToken cancellationToken = default) + { + FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger); + + try + { + await InitializeEndpointIfNecessary(cancellationToken).ConfigureAwait(false); + } + catch (Exception) + { + await messageActions.AbandonMessageAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false); + throw; + } + + await messageProcessor.ProcessAtomic(message, serviceBusClient, messageActions, cancellationToken) + .ConfigureAwait(false); + } + + public async Task ProcessNonAtomic( + ServiceBusReceivedMessage message, + ExecutionContext executionContext, + ILogger functionsLogger = null, + CancellationToken cancellationToken = default) + { + FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger); + + await InitializeEndpointIfNecessary(cancellationToken).ConfigureAwait(false); + + await messageProcessor.ProcessNonAtomic(message, cancellationToken) + .ConfigureAwait(false); + } + public async Task Send(object message, SendOptions options, ExecutionContext executionContext, ILogger functionsLogger = null, CancellationToken cancellationToken = default) { FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger); @@ -103,137 +137,6 @@ public Task Unsubscribe(Type eventType, ExecutionContext executionContext, ILogg return Unsubscribe(eventType, new UnsubscribeOptions(), executionContext, functionsLogger, cancellationToken); } - public async Task ProcessNonAtomic( - ServiceBusReceivedMessage message, - ExecutionContext executionContext, - ILogger functionsLogger = null, - CancellationToken cancellationToken = default) - { - FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger); - - await InitializeEndpointIfNecessary(cancellationToken) - .ConfigureAwait(false); - - try - { - var messageContext = CreateMessageContext(message, new TransportTransaction(), false); - - await pipeline.PushMessage(messageContext, cancellationToken).ConfigureAwait(false); - - } - catch (Exception exception) - { - var errorContext = CreateErrorContext(message, new TransportTransaction(), exception); - - var errorHandleResult = await pipeline.PushFailedMessage(errorContext, cancellationToken).ConfigureAwait(false); - - if (errorHandleResult == ErrorHandleResult.Handled) - { - return; - } - throw; - } - } - - public async Task ProcessAtomic( - ServiceBusReceivedMessage message, - ExecutionContext executionContext, - ServiceBusClient serviceBusClient, - ServiceBusMessageActions messageActions, - ILogger functionsLogger = null, - CancellationToken cancellationToken = default) - { - FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger); - - try - { - await InitializeEndpointIfNecessary(cancellationToken).ConfigureAwait(false); - } - catch (Exception) - { - await messageActions.AbandonMessageAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false); - throw; - } - - try - { - using (var azureServiceBusTransaction = CreateTransaction(message.PartitionKey, serviceBusClient)) - { - var messageContext = CreateMessageContext(message, azureServiceBusTransaction.TransportTransaction, true); - - await pipeline.PushMessage(messageContext, cancellationToken).ConfigureAwait(false); - - await SafeCompleteMessageAsync(messageActions, message, azureServiceBusTransaction, cancellationToken).ConfigureAwait(false); - azureServiceBusTransaction.Commit(); - } - } - catch (Exception exception) - { - ErrorHandleResult result; - using (var azureServiceBusTransaction = CreateTransaction(message.PartitionKey, serviceBusClient)) - { - var errorContext = CreateErrorContext(message, azureServiceBusTransaction.TransportTransaction, exception); - - result = await pipeline.PushFailedMessage(errorContext, cancellationToken).ConfigureAwait(false); - - if (result == ErrorHandleResult.Handled) - { - await SafeCompleteMessageAsync(messageActions, message, azureServiceBusTransaction, cancellationToken).ConfigureAwait(false); - } - - azureServiceBusTransaction.Commit(); - } - - if (result != ErrorHandleResult.Handled) - { - await messageActions.AbandonMessageAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false); - } - } - } - - ErrorContext CreateErrorContext(ServiceBusReceivedMessage message, TransportTransaction transportTransaction, Exception exception) - { - var errorContext = new ErrorContext( - exception, - message.GetHeaders(), - message.MessageId, - message.Body, - transportTransaction, - message.DeliveryCount, - pipeline.ReceiveAddress, - new ContextBag()); - return errorContext; - } - - MessageContext CreateMessageContext(ServiceBusReceivedMessage message, TransportTransaction transportTransaction, bool atomic) - { - var contextBag = new ContextBag(); - var invocationMode = new FunctionInvocationMode(atomic); - contextBag.Set(invocationMode); - var messageContext = new MessageContext( - message.MessageId, - message.GetHeaders(), - message.Body, - transportTransaction, - pipeline.ReceiveAddress, - contextBag); - return messageContext; - } - - static async Task SafeCompleteMessageAsync(ServiceBusMessageActions messageActions, ServiceBusReceivedMessage message, AzureServiceBusTransportTransaction azureServiceBusTransaction, CancellationToken cancellationToken = default) - { - using var scope = azureServiceBusTransaction.ToTransactionScope(); - await messageActions.CompleteMessageAsync(message, cancellationToken).ConfigureAwait(false); - scope.Complete(); - } - - static AzureServiceBusTransportTransaction CreateTransaction(string messagePartitionKey, ServiceBusClient serviceBusClient) => - new(serviceBusClient, messagePartitionKey, new TransactionOptions - { - IsolationLevel = IsolationLevel.Serializable, - Timeout = TransactionManager.MaximumTimeout - }); - internal static readonly string[] AssembliesToExcludeFromScanning = { "NCrontab.Signed.dll", "Azure.Core.dll", @@ -253,16 +156,16 @@ static AzureServiceBusTransportTransaction CreateTransaction(string messageParti internal async Task InitializeEndpointIfNecessary(CancellationToken cancellationToken) { - if (pipeline == null) + if (messageProcessor == null) { await semaphoreLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { - if (pipeline == null) + if (messageProcessor == null) { endpoint = await endpointFactory().ConfigureAwait(false); - pipeline = serverlessInterceptor.PipelineInvoker; + messageProcessor = serverlessInterceptor.MessageProcessor; } } finally @@ -272,7 +175,7 @@ internal async Task InitializeEndpointIfNecessary(CancellationToken cancellation } } - PipelineInvoker pipeline; + IMessageProcessor messageProcessor; IEndpointInstance endpoint; readonly Func> endpointFactory; diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/ServerlessInterceptor.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/ServerlessInterceptor.cs index 4ed034ea..66c84a94 100644 --- a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/ServerlessInterceptor.cs +++ b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/ServerlessInterceptor.cs @@ -6,6 +6,6 @@ class ServerlessInterceptor public ServerlessInterceptor(ServerlessTransport transport) => this.transport = transport; - public PipelineInvoker PipelineInvoker => transport.PipelineInvoker; + public IMessageProcessor MessageProcessor => transport.MessageProcessor; } } diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/IMessageProcessor.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/IMessageProcessor.cs new file mode 100644 index 00000000..d2396645 --- /dev/null +++ b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/IMessageProcessor.cs @@ -0,0 +1,20 @@ +namespace NServiceBus.AzureFunctions.InProcess.ServiceBus +{ + using System.Threading; + using System.Threading.Tasks; + using Azure.Messaging.ServiceBus; + using Microsoft.Azure.WebJobs.ServiceBus; + + interface IMessageProcessor + { + Task ProcessNonAtomic( + ServiceBusReceivedMessage message, + CancellationToken cancellationToken = default); + + Task ProcessAtomic( + ServiceBusReceivedMessage message, + ServiceBusClient serviceBusClient, + ServiceBusMessageActions messageActions, + CancellationToken cancellationToken = default); + } +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/PipelineInvoker.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/PipelineInvoker.cs deleted file mode 100644 index 95e5b688..00000000 --- a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/PipelineInvoker.cs +++ /dev/null @@ -1,44 +0,0 @@ -namespace NServiceBus.AzureFunctions.InProcess.ServiceBus -{ - using System.Threading; - using System.Threading.Tasks; - using Transport; - - class PipelineInvoker : IMessageReceiver - { - public PipelineInvoker(IMessageReceiver baseTransportReceiver) - { - this.baseTransportReceiver = baseTransportReceiver; - } - - public Task PushFailedMessage(ErrorContext errorContext, CancellationToken cancellationToken) => onError(errorContext, cancellationToken); - - public Task PushMessage(MessageContext messageContext, CancellationToken cancellationToken) => onMessage.Invoke(messageContext, cancellationToken); - - public Task Initialize(PushRuntimeSettings limitations, OnMessage onMessage, OnError onError, - CancellationToken cancellationToken) - { - this.onMessage = onMessage; - this.onError = onError; - return baseTransportReceiver?.Initialize(limitations, - (_, __) => Task.CompletedTask, - (_, __) => Task.FromResult(ErrorHandleResult.Handled), - cancellationToken) ?? Task.CompletedTask; - } - - public Task StartReceive(CancellationToken cancellationToken) => Task.CompletedTask; - - // No-op because the rate at which Azure Functions pushes messages to the pipeline can't be controlled. - public Task ChangeConcurrency(PushRuntimeSettings limitations, CancellationToken cancellationToken = new CancellationToken()) => Task.CompletedTask; - - public Task StopReceive(CancellationToken cancellationToken) => Task.CompletedTask; - public ISubscriptionManager Subscriptions => baseTransportReceiver.Subscriptions; - public string Id => baseTransportReceiver.Id; - - public string ReceiveAddress => baseTransportReceiver.ReceiveAddress; - - readonly IMessageReceiver baseTransportReceiver; - OnMessage onMessage; - OnError onError; - } -} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/PipelineInvokingMessageProcessor.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/PipelineInvokingMessageProcessor.cs new file mode 100644 index 00000000..a34138f7 --- /dev/null +++ b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/PipelineInvokingMessageProcessor.cs @@ -0,0 +1,157 @@ +namespace NServiceBus.AzureFunctions.InProcess.ServiceBus +{ + using System; + using System.Threading; + using System.Threading.Tasks; + using System.Transactions; + using Azure.Messaging.ServiceBus; + using Microsoft.Azure.WebJobs.ServiceBus; + using NServiceBus.AzureFunctions.InProcess.ServiceBus.Serverless; + using NServiceBus.Extensibility; + using NServiceBus.Transport.AzureServiceBus; + using Transport; + + class PipelineInvokingMessageProcessor : IMessageReceiver, IMessageProcessor + { + public PipelineInvokingMessageProcessor(IMessageReceiver baseTransportReceiver) + { + this.baseTransportReceiver = baseTransportReceiver; + } + + public Task Initialize(PushRuntimeSettings limitations, OnMessage onMessage, OnError onError, + CancellationToken cancellationToken) + { + this.onMessage = onMessage; + this.onError = onError; + return baseTransportReceiver?.Initialize(limitations, + (_, __) => Task.CompletedTask, + (_, __) => Task.FromResult(ErrorHandleResult.Handled), + cancellationToken) ?? Task.CompletedTask; + } + + public async Task ProcessNonAtomic( + ServiceBusReceivedMessage message, + CancellationToken cancellationToken = default) + { + try + { + var messageContext = CreateMessageContext(message, new TransportTransaction(), false); + + await onMessage(messageContext, cancellationToken).ConfigureAwait(false); + + } + catch (Exception exception) + { + var errorContext = CreateErrorContext(message, new TransportTransaction(), exception); + + var errorHandleResult = await onError(errorContext, cancellationToken).ConfigureAwait(false); + + if (errorHandleResult == ErrorHandleResult.Handled) + { + return; + } + throw; + } + } + + public async Task ProcessAtomic( + ServiceBusReceivedMessage message, + ServiceBusClient serviceBusClient, + ServiceBusMessageActions messageActions, + CancellationToken cancellationToken = default) + { + try + { + using (var azureServiceBusTransaction = CreateTransaction(message.PartitionKey, serviceBusClient)) + { + var messageContext = CreateMessageContext(message, azureServiceBusTransaction.TransportTransaction, true); + + await onMessage(messageContext, cancellationToken).ConfigureAwait(false); + + await SafeCompleteMessageAsync(messageActions, message, azureServiceBusTransaction, cancellationToken).ConfigureAwait(false); + azureServiceBusTransaction.Commit(); + } + } + catch (Exception exception) + { + ErrorHandleResult result; + using (var azureServiceBusTransaction = CreateTransaction(message.PartitionKey, serviceBusClient)) + { + var errorContext = CreateErrorContext(message, azureServiceBusTransaction.TransportTransaction, exception); + + result = await onError(errorContext, cancellationToken).ConfigureAwait(false); + + if (result == ErrorHandleResult.Handled) + { + await SafeCompleteMessageAsync(messageActions, message, azureServiceBusTransaction, cancellationToken).ConfigureAwait(false); + } + + azureServiceBusTransaction.Commit(); + } + + if (result != ErrorHandleResult.Handled) + { + await messageActions.AbandonMessageAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false); + } + } + } + + ErrorContext CreateErrorContext(ServiceBusReceivedMessage message, TransportTransaction transportTransaction, Exception exception) + { + var errorContext = new ErrorContext( + exception, + message.GetHeaders(), + message.MessageId, + message.Body, + transportTransaction, + message.DeliveryCount, + ReceiveAddress, + new ContextBag()); + return errorContext; + } + + MessageContext CreateMessageContext(ServiceBusReceivedMessage message, TransportTransaction transportTransaction, bool atomic) + { + var contextBag = new ContextBag(); + var invocationMode = new FunctionInvocationMode(atomic); + contextBag.Set(invocationMode); + var messageContext = new MessageContext( + message.MessageId, + message.GetHeaders(), + message.Body, + transportTransaction, + ReceiveAddress, + contextBag); + return messageContext; + } + + static async Task SafeCompleteMessageAsync(ServiceBusMessageActions messageActions, ServiceBusReceivedMessage message, AzureServiceBusTransportTransaction azureServiceBusTransaction, CancellationToken cancellationToken = default) + { + using var scope = azureServiceBusTransaction.ToTransactionScope(); + await messageActions.CompleteMessageAsync(message, cancellationToken).ConfigureAwait(false); + scope.Complete(); + } + + static AzureServiceBusTransportTransaction CreateTransaction(string messagePartitionKey, ServiceBusClient serviceBusClient) => + new(serviceBusClient, messagePartitionKey, new TransactionOptions + { + IsolationLevel = IsolationLevel.Serializable, + Timeout = TransactionManager.MaximumTimeout + }); + + public Task StartReceive(CancellationToken cancellationToken) => Task.CompletedTask; + + // No-op because the rate at which Azure Functions pushes messages to the pipeline can't be controlled. + public Task ChangeConcurrency(PushRuntimeSettings limitations, CancellationToken cancellationToken = new CancellationToken()) => Task.CompletedTask; + + public Task StopReceive(CancellationToken cancellationToken) => Task.CompletedTask; + public ISubscriptionManager Subscriptions => baseTransportReceiver.Subscriptions; + public string Id => baseTransportReceiver.Id; + + public string ReceiveAddress => baseTransportReceiver.ReceiveAddress; + + readonly IMessageReceiver baseTransportReceiver; + OnMessage onMessage; + OnError onError; + } +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/SendOnlyMessageProcessor.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/SendOnlyMessageProcessor.cs new file mode 100644 index 00000000..654b4e25 --- /dev/null +++ b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/SendOnlyMessageProcessor.cs @@ -0,0 +1,27 @@ +namespace NServiceBus.AzureFunctions.InProcess.ServiceBus +{ + using System; + using System.Threading; + using System.Threading.Tasks; + using Azure.Messaging.ServiceBus; + using Microsoft.Azure.WebJobs.ServiceBus; + + class SendOnlyMessageProcessor : IMessageProcessor + { + public Task ProcessNonAtomic( + ServiceBusReceivedMessage message, + CancellationToken cancellationToken = default) + => throw new InvalidOperationException( + $"This endpoint cannot process messages because it is configured in send-only mode. Remove the '{nameof(EndpointConfiguration)}.{nameof(EndpointConfiguration.SendOnly)}' configuration.'" + ); + + public Task ProcessAtomic( + ServiceBusReceivedMessage message, + ServiceBusClient serviceBusClient, + ServiceBusMessageActions messageActions, + CancellationToken cancellationToken = default) + => throw new InvalidOperationException( + $"This endpoint cannot process messages because it is configured in send-only mode. Remove the '{nameof(EndpointConfiguration)}.{nameof(EndpointConfiguration.SendOnly)}' configuration.'" + ); + } +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/SendOnlyReceiver.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/SendOnlyReceiver.cs deleted file mode 100644 index 443b0c8b..00000000 --- a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/SendOnlyReceiver.cs +++ /dev/null @@ -1,26 +0,0 @@ -namespace NServiceBus.AzureFunctions.InProcess.ServiceBus -{ - using System; - using System.Threading; - using System.Threading.Tasks; - using Transport; - - class SendOnlyReceiver : IMessageReceiver - { - static readonly InvalidOperationException SendOnlyEndpointException = new($"This endpoint cannot process messages because it is configured in send-only mode. Remove the '{nameof(EndpointConfiguration)}.{nameof(EndpointConfiguration.SendOnly)}' configuration or do not call '{nameof(IFunctionEndpoint)}.{nameof(IFunctionEndpoint.ProcessAtomic)}/{nameof(IFunctionEndpoint.ProcessNonAtomic)}'"); - - public Task Initialize(PushRuntimeSettings limitations, OnMessage onMessage, OnError onError, - CancellationToken cancellationToken = new CancellationToken()) => - throw SendOnlyEndpointException; - - public Task StartReceive(CancellationToken cancellationToken = new CancellationToken()) => throw SendOnlyEndpointException; - - public Task ChangeConcurrency(PushRuntimeSettings limitations, CancellationToken cancellationToken = new CancellationToken()) => throw SendOnlyEndpointException; - - public Task StopReceive(CancellationToken cancellationToken = new CancellationToken()) => throw SendOnlyEndpointException; - - public ISubscriptionManager Subscriptions => throw SendOnlyEndpointException; - public string Id => throw SendOnlyEndpointException; - public string ReceiveAddress => throw SendOnlyEndpointException; - } -} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs index 6b339015..32391da4 100644 --- a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs +++ b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs @@ -11,6 +11,8 @@ class ServerlessTransport : TransportDefinition const string MainReceiverId = "Main"; const string SendOnlyConfigKey = "Endpoint.SendOnly"; + public IMessageProcessor MessageProcessor { get; private set; } + public ServerlessTransport(AzureServiceBusTransport baseTransport) : base( baseTransport.TransportTransactionMode, baseTransport.SupportsDelayedDelivery, @@ -35,15 +37,13 @@ public override async Task Initialize(HostSettings host var isSendOnly = hostSettings.CoreSettings.GetOrDefault(SendOnlyConfigKey); - PipelineInvoker = isSendOnly - ? new PipelineInvoker(new SendOnlyReceiver()) // send-only endpoint - : (PipelineInvoker)serverlessTransportInfrastructure.Receivers[MainReceiverId]; + MessageProcessor = isSendOnly + ? new SendOnlyMessageProcessor() // send-only endpoint + : (IMessageProcessor)serverlessTransportInfrastructure.Receivers[MainReceiverId]; return serverlessTransportInfrastructure; } - public PipelineInvoker PipelineInvoker { get; private set; } - #pragma warning disable CS0672 // Member overrides obsolete member #pragma warning disable CS0618 // Type or member is obsolete public override string ToTransportAddress(QueueAddress address) => baseTransport.ToTransportAddress(address); diff --git a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs index 9bf548b9..d2c6fdb4 100644 --- a/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs +++ b/src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/ServerlessTransportInfrastructure.cs @@ -15,7 +15,7 @@ public ServerlessTransportInfrastructure(TransportInfrastructure baseTransportIn Dispatcher = baseTransportInfrastructure.Dispatcher; Receivers = baseTransportInfrastructure.Receivers.ToDictionary( r => r.Key, - r => (IMessageReceiver)new PipelineInvoker(r.Value) + r => (IMessageReceiver)new PipelineInvokingMessageProcessor(r.Value) ); }