diff --git a/Directory.Build.props b/Directory.Build.props index 0baa56bdd..571a89bfe 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,7 +1,7 @@ - 3.5.0$(BaseVersionSuffix) + 3.6.0$(BaseVersionSuffix) 1 $(BaseVersionSuffix) diff --git a/docs/concepts/broker/headers.md b/docs/concepts/broker/headers.md index 6d3205021..1f0804d12 100644 --- a/docs/concepts/broker/headers.md +++ b/docs/concepts/broker/headers.md @@ -119,8 +119,17 @@ Header Key | Description `tracestate` | Used for distributed tracing. It corresponds to the [Activity.TraceStateString](https://docs.microsoft.com/en-us/dotnet/api/system.diagnostics.activity). The header is implemented according to the [W3C Trace Context proposal](https://www.w3.org/TR/trace-context-1/#tracestate-header). `tracebaggage` | Used for distributed tracing. It corresponds to the string representation of the [Activity.Baggage](https://docs.microsoft.com/en-us/dotnet/api/system.diagnostics.activity) dictionary. This is not part of the w3c standard. `content-type` | The content type of the [binary file](xref:binary-files), used when producing or consuming an . +`x-failure-reason` | The header that will be set by the and will contain the reason why the message failed to be processed. + +### Kafka specific + `x-kafka-message-key` | The header that will be filled with the [key](xref:kafka-partitioning) of the message consumed from Kafka. `x-kafka-message-timestamp` | The header that will be filled with the timestamp of the message consumed from Kafka. +`x-source-consumer-group-id` | The header that will be set by the and will contain the GroupId the consumer that consumed the message that failed to be processed. +`x-source-topic` | The header that will be set by the and will contain the source topic of the message that failed to be processed. +`x-source-partition` | The header that will be set by the and will contain the source partition of the message that failed to be processed. +`x-source-offset` | The header that will be set by the and will contain the offset of the message that failed to be processed. +`x-source-timestamp` | The header that will be set by the and will contain the timestamp of the message that failed to be processed. The static classes and contain all default header names constants. diff --git a/docs/concepts/broker/kafka/multiple-consumer-groups.md b/docs/concepts/broker/kafka/multiple-consumer-groups.md index 08985fa8c..f89b109ce 100644 --- a/docs/concepts/broker/kafka/multiple-consumer-groups.md +++ b/docs/concepts/broker/kafka/multiple-consumer-groups.md @@ -72,6 +72,21 @@ public class MySubscriber ``` *** +> [!Note] +> The filters can be added dynamically using the overloads of `AddSubscriber` accepting a or and this allows you to use a variable for the group id. +> +> ```csharp +> .AddSingletonSubscriber( +> new TypeSubscriptionOptions +> { +> Filters = new[] +> { +> new KafkaGroupIdFilterAttribute("consumer1") +> } +> }) +> ``` + + Using the is the cleanest and easiest approach but alternatively you can always subscribe to the and perform different tasks according to the `GroupId` value. ```csharp diff --git a/docs/concepts/broker/mqtt/multiple-clients.md b/docs/concepts/broker/mqtt/multiple-clients.md index 5fc88e303..70b78041b 100644 --- a/docs/concepts/broker/mqtt/multiple-clients.md +++ b/docs/concepts/broker/mqtt/multiple-clients.md @@ -71,6 +71,20 @@ public class MySubscriber ``` *** +> [!Note] +> The filters can be added dynamically using the overloads of `AddSubscriber` accepting a or and this allows you to use a variable for the client id. +> +> ```csharp +> .AddSingletonSubscriber( +> new TypeSubscriptionOptions +> { +> Filters = new[] +> { +> new MqttClientIdFilterAttribute("client1") +> } +> }) +> ``` + Using the is the cleanest and easiest approach but alternatively you can always subscribe to the and perform different tasks according to the `ClientId` value. ```csharp diff --git a/docs/releases.md b/docs/releases.md index 9e5709ff6..16adc5981 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -4,6 +4,20 @@ uid: releases # Releases +## [3.6.0-beta.1](https://github.com/BEagle1984/silverback/releases/tag/v3.6.0-beta.1) + +### What's new + +* Handle `IAsyncEnumerable` returned by the subscriber and republished the contained messages +* Enrich Kafka messages moved by the adding some extra headers containing some information about the source topic, partition, offset, etc. (see ) +* Allow filters such as the or to be added to the subscribers at runtime via the configuration API (see and ) + +### Fixes + +* Fix reconnection issues +* Handle edge cases related to MQTT acknowledgment timeout in +* Allow max retries specification and error policies chains with MQTT V3 + ## [3.5.0](https://github.com/BEagle1984/silverback/releases/tag/v3.5.0) ### What's new diff --git a/src/Silverback.Integration.Kafka/Messaging/Messages/KafkaMessageHeaders.cs b/src/Silverback.Integration.Kafka/Messaging/Messages/KafkaMessageHeaders.cs index be8d30074..af4ec7ed2 100644 --- a/src/Silverback.Integration.Kafka/Messaging/Messages/KafkaMessageHeaders.cs +++ b/src/Silverback.Integration.Kafka/Messaging/Messages/KafkaMessageHeaders.cs @@ -27,31 +27,31 @@ public static class KafkaMessageHeaders public const string TimestampKey = "x-kafka-message-timestamp"; /// - /// This will be set by and will contain the group ID of - /// the message that failed to be processed. + /// This will be set by the and will contain the GroupId of + /// the consumer that consumed the message that failed to be processed. /// public const string SourceConsumerGroupId = "x-source-consumer-group-id"; /// - /// This will be set by and will contain the topic of - /// the message that failed to be processed. + /// This will be set by the and will contain the source + /// topic of the message that failed to be processed. /// public const string SourceTopic = "x-source-topic"; /// - /// This will be set by and will contain the partition of - /// the message that failed to be processed. + /// This will be set by the and will contain the source + /// partition of the message that failed to be processed. /// public const string SourcePartition = "x-source-partition"; /// - /// This will be set by and will contain the offset of + /// This will be set by the and will contain the offset of /// the message that failed to be processed. /// public const string SourceOffset = "x-source-offset"; /// - /// This will be set by and will contain the timestamp of + /// This will be set by the and will contain the timestamp of /// the message that failed to be processed. /// public const string SourceTimestamp = "x-source-timestamp"; diff --git a/src/Silverback.Integration/Messaging/Messages/DefaultMessageHeaders.cs b/src/Silverback.Integration/Messaging/Messages/DefaultMessageHeaders.cs index 18341ec7e..fe0056662 100644 --- a/src/Silverback.Integration/Messaging/Messages/DefaultMessageHeaders.cs +++ b/src/Silverback.Integration/Messaging/Messages/DefaultMessageHeaders.cs @@ -29,12 +29,6 @@ public static class DefaultMessageHeaders /// public const string FailedAttempts = "x-failed-attempts"; - /// - /// This will be set by and will contain the name of the - /// endpoint the failed message is being moved from. - /// - public const string SourceEndpoint = "x-source-endpoint"; - /// /// The message chunk index, used when chunking is enabled. /// @@ -102,7 +96,7 @@ public static class DefaultMessageHeaders public const string EncryptionKeyId = "x-encryption-key-id"; /// - /// This will be set by and will contain the reason why + /// This will be set by the and will contain the reason why /// the message failed to be processed. /// public const string FailureReason = "x-failure-reason";