Skip to content

Commit

Permalink
refactoring message processing out of function endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
jpalac committed Sep 6, 2023
1 parent 7ff5e43 commit fc77d19
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using Azure.Messaging.ServiceBus;
using AzureFunctions.InProcess.ServiceBus;
using AzureFunctions.InProcess.ServiceBus.Serverless;
using Extensibility;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.Extensions.Logging;
using Transport;
using Transport.AzureServiceBus;
using ExecutionContext = Microsoft.Azure.WebJobs.ExecutionContext;

class InProcessFunctionEndpoint : IFunctionEndpoint
Expand All @@ -25,6 +21,44 @@ public InProcessFunctionEndpoint(
endpointFactory = () => externallyManagedContainerEndpoint.Start(serviceProvider);
}

public async Task ProcessAtomic(
ServiceBusReceivedMessage message,
ExecutionContext executionContext,
ServiceBusClient serviceBusClient,
ServiceBusMessageActions messageActions,
ILogger functionsLogger = null,
CancellationToken cancellationToken = default)
{
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);

try
{
await InitializeEndpointIfNecessary(cancellationToken).ConfigureAwait(false);
}
catch (Exception)
{
await messageActions.AbandonMessageAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false);
throw;
}

await messageProcessor.ProcessAtomic(message, serviceBusClient, messageActions, cancellationToken)
.ConfigureAwait(false);
}

public async Task ProcessNonAtomic(
ServiceBusReceivedMessage message,
ExecutionContext executionContext,
ILogger functionsLogger = null,
CancellationToken cancellationToken = default)
{
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);

await InitializeEndpointIfNecessary(cancellationToken).ConfigureAwait(false);

await messageProcessor.ProcessNonAtomic(message, cancellationToken)
.ConfigureAwait(false);
}

public async Task Send(object message, SendOptions options, ExecutionContext executionContext, ILogger functionsLogger = null, CancellationToken cancellationToken = default)
{
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);
Expand Down Expand Up @@ -103,137 +137,6 @@ public Task Unsubscribe(Type eventType, ExecutionContext executionContext, ILogg
return Unsubscribe(eventType, new UnsubscribeOptions(), executionContext, functionsLogger, cancellationToken);
}

public async Task ProcessNonAtomic(
ServiceBusReceivedMessage message,
ExecutionContext executionContext,
ILogger functionsLogger = null,
CancellationToken cancellationToken = default)
{
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);

await InitializeEndpointIfNecessary(cancellationToken)
.ConfigureAwait(false);

try
{
var messageContext = CreateMessageContext(message, new TransportTransaction(), false);

await pipeline.PushMessage(messageContext, cancellationToken).ConfigureAwait(false);

}
catch (Exception exception)
{
var errorContext = CreateErrorContext(message, new TransportTransaction(), exception);

var errorHandleResult = await pipeline.PushFailedMessage(errorContext, cancellationToken).ConfigureAwait(false);

if (errorHandleResult == ErrorHandleResult.Handled)
{
return;
}
throw;
}
}

public async Task ProcessAtomic(
ServiceBusReceivedMessage message,
ExecutionContext executionContext,
ServiceBusClient serviceBusClient,
ServiceBusMessageActions messageActions,
ILogger functionsLogger = null,
CancellationToken cancellationToken = default)
{
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);

try
{
await InitializeEndpointIfNecessary(cancellationToken).ConfigureAwait(false);
}
catch (Exception)
{
await messageActions.AbandonMessageAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false);
throw;
}

try
{
using (var azureServiceBusTransaction = CreateTransaction(message.PartitionKey, serviceBusClient))
{
var messageContext = CreateMessageContext(message, azureServiceBusTransaction.TransportTransaction, true);

await pipeline.PushMessage(messageContext, cancellationToken).ConfigureAwait(false);

await SafeCompleteMessageAsync(messageActions, message, azureServiceBusTransaction, cancellationToken).ConfigureAwait(false);
azureServiceBusTransaction.Commit();
}
}
catch (Exception exception)
{
ErrorHandleResult result;
using (var azureServiceBusTransaction = CreateTransaction(message.PartitionKey, serviceBusClient))
{
var errorContext = CreateErrorContext(message, azureServiceBusTransaction.TransportTransaction, exception);

result = await pipeline.PushFailedMessage(errorContext, cancellationToken).ConfigureAwait(false);

if (result == ErrorHandleResult.Handled)
{
await SafeCompleteMessageAsync(messageActions, message, azureServiceBusTransaction, cancellationToken).ConfigureAwait(false);
}

azureServiceBusTransaction.Commit();
}

if (result != ErrorHandleResult.Handled)
{
await messageActions.AbandonMessageAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
}

ErrorContext CreateErrorContext(ServiceBusReceivedMessage message, TransportTransaction transportTransaction, Exception exception)
{
var errorContext = new ErrorContext(
exception,
message.GetHeaders(),
message.MessageId,
message.Body,
transportTransaction,
message.DeliveryCount,
pipeline.ReceiveAddress,
new ContextBag());
return errorContext;
}

MessageContext CreateMessageContext(ServiceBusReceivedMessage message, TransportTransaction transportTransaction, bool atomic)
{
var contextBag = new ContextBag();
var invocationMode = new FunctionInvocationMode(atomic);
contextBag.Set(invocationMode);
var messageContext = new MessageContext(
message.MessageId,
message.GetHeaders(),
message.Body,
transportTransaction,
pipeline.ReceiveAddress,
contextBag);
return messageContext;
}

static async Task SafeCompleteMessageAsync(ServiceBusMessageActions messageActions, ServiceBusReceivedMessage message, AzureServiceBusTransportTransaction azureServiceBusTransaction, CancellationToken cancellationToken = default)
{
using var scope = azureServiceBusTransaction.ToTransactionScope();
await messageActions.CompleteMessageAsync(message, cancellationToken).ConfigureAwait(false);
scope.Complete();
}

static AzureServiceBusTransportTransaction CreateTransaction(string messagePartitionKey, ServiceBusClient serviceBusClient) =>
new(serviceBusClient, messagePartitionKey, new TransactionOptions
{
IsolationLevel = IsolationLevel.Serializable,
Timeout = TransactionManager.MaximumTimeout
});

internal static readonly string[] AssembliesToExcludeFromScanning = {
"NCrontab.Signed.dll",
"Azure.Core.dll",
Expand All @@ -253,16 +156,16 @@ static AzureServiceBusTransportTransaction CreateTransaction(string messageParti

internal async Task InitializeEndpointIfNecessary(CancellationToken cancellationToken)
{
if (pipeline == null)
if (messageProcessor == null)
{
await semaphoreLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (pipeline == null)
if (messageProcessor == null)
{
endpoint = await endpointFactory().ConfigureAwait(false);

pipeline = serverlessInterceptor.PipelineInvoker;
messageProcessor = serverlessInterceptor.MessageProcessor;
}
}
finally
Expand All @@ -272,7 +175,7 @@ internal async Task InitializeEndpointIfNecessary(CancellationToken cancellation
}
}

PipelineInvoker pipeline;
IMessageProcessor messageProcessor;
IEndpointInstance endpoint;

readonly Func<Task<IEndpointInstance>> endpointFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ class ServerlessInterceptor

public ServerlessInterceptor(ServerlessTransport transport) => this.transport = transport;

public PipelineInvoker PipelineInvoker => transport.PipelineInvoker;
public IMessageProcessor MessageProcessor => transport.MessageProcessor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace NServiceBus.AzureFunctions.InProcess.ServiceBus
{
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.WebJobs.ServiceBus;

interface IMessageProcessor
{
Task ProcessNonAtomic(
ServiceBusReceivedMessage message,
CancellationToken cancellationToken = default);

Task ProcessAtomic(
ServiceBusReceivedMessage message,
ServiceBusClient serviceBusClient,
ServiceBusMessageActions messageActions,
CancellationToken cancellationToken = default);
}
}

This file was deleted.

Loading

0 comments on commit fc77d19

Please sign in to comment.