Skip to content

Commit

Permalink
#358 Modify ASB properties
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Pringle <[email protected]>
  • Loading branch information
EtherZa authored and zarusz committed Jan 30, 2025
1 parent 637197e commit 677cc30
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 16 deletions.
2 changes: 1 addition & 1 deletion docs/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -1263,5 +1263,5 @@ This allows to recreate missing elements in the infrastructure without restartin

## Versions

- The v3 release [migration guide](https://github.com/zarusz/SlimMessageBus/tree/release/v3).
- The v3 release [migration guide](https://github.com/zarusz/SlimMessageBus/releases/tag/3.0.0).
- The v2 release [migration guide](https://github.com/zarusz/SlimMessageBus/releases/tag/Host.Transport-2.0.0).
53 changes: 48 additions & 5 deletions docs/provider_azure_servicebus.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Default Subscription Name](#default-subscription-name)
- [Consumer context](#consumer-context)
- [Exception Handling for Consumers](#exception-handling-for-consumers)
- [DeadLetter: Application-Level Dead-Lettering](#deadletter-application-level-dead-lettering)
- [Failure: Modify Application Properties on Failure](#failure-modify-application-properties-on-failure)
- [Transport Specific Settings](#transport-specific-settings)
- [Request-Response Configuration](#request-response-configuration)
- [Produce Request Messages](#produce-request-messages)
Expand All @@ -19,6 +21,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Validation of Topology](#validation-of-topology)
- [Trigger Topology Provisioning](#trigger-topology-provisioning)


## Configuration

Azure Service Bus provider requires a connection string:
Expand Down Expand Up @@ -193,12 +196,52 @@ This could be useful to extract the message's `CorrelationId` or `ApplicationPro
### Exception Handling for Consumers

In case the consumer was to throw an exception while processing a message, SMB marks the message as abandoned.
This results in a message delivery retry performed by Azure SB (potentially event in another running instance of your service). By default, Azure SB retries 10 times. After last attempt the message Azure SB moves the message to a dead letter queue (DLQ). More information [here](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues).
In the case where the consumer throws an exception while processing a message, SMB marks the message as abandoned.
This results in a message delivery retry performed by Azure SB (potentially as an event in another running instance of your service). By default, Azure SB retries 10 times. After last attempt, Azure SB will move the message to the [dead letter queue (DLQ)](https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues).

SMB will also add a user property, `SMB.Exception`, on the message with the exception details (just the message, no stack trace). This should be helpful when reviewing messages on the DLQ.

For finer control, a custom error handler can be added by registering an instance of `IConsumerErrorHandler<T>` with the DI. The offending message and the raised exception can then be inspected to determine if the message should be retried (in proceess), failed, or considered as having executed successfully.

In addition to the standard `IConsumerErrorHandler<T>` return types, `ServiceBusConsumerErrorHandler<T>` provides additional, specialized responses for use with the Azure Service Bus transport.

#### DeadLetter: Application-Level Dead-Lettering

[Application-level dead-lettering](https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues#application-level-dead-lettering) is supported via `DeadLetter(string reason, string description)`. If neither a `reason` nor `description` are supplied, the raised exception type and message will be used as the `reason` and `description`.

If you need to send only selected messages to DLQ, wrap the body of your consumer method in a `try-catch` block and rethrow the exception for only the messages you want to be moved to DLQ (after the retry limit is reached).
```cs
public sealed class SampleConsumerErrorHandler<T> : ServiceBusConsumerErrorHandler<T>
{
public override Task<ProcessResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts)
{
return Task.FromResult(DeadLetter("reason", "description"));
}
}
```

#### Failure: Modify Application Properties on Failure

An overload to `Failure(IReadOnlyDictionary<string, object)` is included to facilitate the modification of application properites on a failed message. This includes the `SMB.Exception` property should alternative detail be required.

```cs
public sealed class SampleConsumerErrorHandler<T> : ServiceBusConsumerErrorHandler<T>
{
public override Task<ProcessResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts)
{
var properties = new Dictionary<string, object>
{
{ "Key", "value" },
{ "Attempts", attempts },
{ "SMB.Exception", exception.ToString().Substring(0, 1000) }
};

return Task.FromResult(Failure(properties));
}
}
```

> By using `IConsumerContext.Properties` (`IConsumerWithContext`) to pass state to the `IConsumerErrorHandler<T>` instance, consumer state can be persisted with the message. This can then be retrieved from `IConsumerContext.Headers` in a subsequent execution to resume processing from a checkpoint, supporting idempotency, especially when distributed transactions are not possible.
SMB will also set a user property `SMB.Exception` on the message with the exception details (just the message, no stack trace). This should be helpful when reviewing messages on the DLQ.

### Transport Specific Settings

Expand Down Expand Up @@ -480,4 +523,4 @@ However, in situations when the underlying ASB topology changes (queue / topic i
ITopologyControl ctrl = // injected
await ctrl.ProvisionTopology();
```
```
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ protected async Task ProcessMessageAsyncInternal(
Func<ServiceBusReceivedMessage, string, string, CancellationToken, Task> deadLetterMessage,
CancellationToken token)
{
const string smbException = "SMB.Exception";

// Process the message.
Logger.LogDebug("Received message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);

Expand All @@ -188,16 +190,33 @@ protected async Task ProcessMessageAsyncInternal(
await completeMessage(message, token).ConfigureAwait(false);
return;

case ServiceBusProcessResult.DeadLetterState:
case ServiceBusProcessResult.DeadLetterState deadLetterState:
Logger.LogError(r.Exception, "Dead letter message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);
await deadLetterMessage(message, r.Exception?.GetType().Name ?? string.Empty, r.Exception?.Message ?? string.Empty, token).ConfigureAwait(false);

var reason = deadLetterState.Reason ?? r.Exception?.GetType().Name ?? string.Empty;
var descripiton = deadLetterState.Description ?? r.Exception?.GetType().Name ?? string.Empty;
await deadLetterMessage(message, reason, descripiton, token).ConfigureAwait(false);
return;

case ServiceBusProcessResult.FailureStateWithProperties withProperties:
var dict = new Dictionary<string, object>(withProperties.Properties.Count + 1);
foreach (var properties in withProperties.Properties)
{
dict.Add(properties.Key, properties.Value);
}

// Set the exception message if it has not been provided
dict.TryAdd(smbException, r.Exception.Message);

Logger.LogError(r.Exception, "Abandon message (exception occurred while processing) - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);
await abandonMessage(message, dict, token).ConfigureAwait(false);
return;

case ProcessResult.FailureState:
var messageProperties = new Dictionary<string, object>();
{
// Set the exception message
messageProperties.Add("SMB.Exception", r.Exception.Message);
messageProperties.Add(smbException, r.Exception.Message);
}

Logger.LogError(r.Exception, "Abandon message (exception occurred while processing) - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,19 @@ public interface IServiceBusConsumerErrorHandler<in T> : IConsumerErrorHandler<T

public abstract class ServiceBusConsumerErrorHandler<T> : ConsumerErrorHandler<T>, IServiceBusConsumerErrorHandler<T>
{
public ProcessResult DeadLetter() => ServiceBusProcessResult.DeadLetter;
public ProcessResult DeadLetter(string reason = null, string description = null)
{
return reason == null && description == null
? ServiceBusProcessResult.DeadLetter
: new ServiceBusProcessResult.DeadLetterState(reason, description);
}

public ProcessResult Failure(IReadOnlyDictionary<string, object> properties)
{
return properties != null && properties.Count > 0
? new ServiceBusProcessResult.FailureStateWithProperties(properties)
: Failure();
}
}

public record ServiceBusProcessResult : ProcessResult
Expand All @@ -14,5 +26,25 @@ public record ServiceBusProcessResult : ProcessResult
/// </summary>
public static readonly ProcessResult DeadLetter = new DeadLetterState();

public record DeadLetterState() : ProcessResult();
public record DeadLetterState : ProcessResult
{
public DeadLetterState(string reason = null, string description = null)
{
Reason = reason;
Description = description;
}

public string Reason { get; }
public string Description { get; }
}

public record FailureStateWithProperties : FailureState
{
public FailureStateWithProperties(IReadOnlyDictionary<string, object> properties)
{
Properties = properties;
}

public IReadOnlyDictionary<string, object> Properties { get; }
}
}
2 changes: 1 addition & 1 deletion src/SlimMessageBus/IConsumerWithContext.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
namespace SlimMessageBus;

/// <summary>
/// An extension point for <see cref="IConsumer{TMessage}"/> to recieve provider specific (for current message subject to processing).
/// An extension point for <see cref="IConsumer{TMessage}"/> to receive provider specific (for current message subject to processing).
/// </summary>
public interface IConsumerWithContext
{
Expand Down
Loading

0 comments on commit 677cc30

Please sign in to comment.