diff --git a/samples/KafkaFlow.Sample.OpenTelemetry/Program.cs b/samples/KafkaFlow.Sample.OpenTelemetry/Program.cs
index 8eb56fb99..83e0a5696 100644
--- a/samples/KafkaFlow.Sample.OpenTelemetry/Program.cs
+++ b/samples/KafkaFlow.Sample.OpenTelemetry/Program.cs
@@ -32,7 +32,18 @@ public static async Task Main()
services.AddKafka(
kafka => kafka
.UseConsoleLog()
- .AddOpenTelemetryInstrumentation()
+ .AddOpenTelemetryInstrumentation(options =>
+ {
+ options.EnrichProducer = (activity, messageContext) =>
+ {
+ activity.SetTag("messaging.destination.producername", "KafkaFlowOtel");
+ };
+
+ options.EnrichConsumer = (activity, messageContext) =>
+ {
+ activity.SetTag("messaging.destination.groupid", messageContext.ConsumerContext.GroupId);
+ };
+ })
.AddCluster(
cluster => cluster
.WithBrokers(new[]
diff --git a/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs b/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs
index d2035794a..ca0d68241 100644
--- a/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs
+++ b/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs
@@ -1,4 +1,5 @@
-using KafkaFlow.OpenTelemetry;
+using System;
+using KafkaFlow.OpenTelemetry;
namespace KafkaFlow.Configuration;
@@ -11,18 +12,23 @@ public static class ExtensionMethods
/// Adds OpenTelemetry instrumentation
///
/// The Kafka configuration builder
+ /// KafkaFlowInstrumentationOptions action
///
- public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IKafkaConfigurationBuilder builder)
+ public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IKafkaConfigurationBuilder builder, Action options)
{
+ var kafkaFlowInstrumentationOptions = new KafkaFlowInstrumentationOptions();
+
+ options?.Invoke(kafkaFlowInstrumentationOptions);
+
builder.SubscribeGlobalEvents(hub =>
{
- hub.MessageConsumeStarted.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeStarted(eventContext.MessageContext));
+ hub.MessageConsumeStarted.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeStarted(eventContext.MessageContext, kafkaFlowInstrumentationOptions));
hub.MessageConsumeError.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeError(eventContext.MessageContext, eventContext.Exception));
hub.MessageConsumeCompleted.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeCompleted(eventContext.MessageContext));
- hub.MessageProduceStarted.Subscribe(eventContext => OpenTelemetryProducerEventsHandler.OnProducerStarted(eventContext.MessageContext));
+ hub.MessageProduceStarted.Subscribe(eventContext => OpenTelemetryProducerEventsHandler.OnProducerStarted(eventContext.MessageContext, kafkaFlowInstrumentationOptions));
hub.MessageProduceError.Subscribe(eventContext => OpenTelemetryProducerEventsHandler.OnProducerError(eventContext.MessageContext, eventContext.Exception));
@@ -31,4 +37,14 @@ public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IK
return builder;
}
+
+ ///
+ /// Adds OpenTelemetry instrumentation
+ ///
+ /// The Kafka configuration builder
+ ///
+ public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IKafkaConfigurationBuilder builder)
+ {
+ return AddOpenTelemetryInstrumentation(builder, null);
+ }
}
diff --git a/src/KafkaFlow.OpenTelemetry/KafkaFlowInstrumentationOptions.cs b/src/KafkaFlow.OpenTelemetry/KafkaFlowInstrumentationOptions.cs
new file mode 100644
index 000000000..47f9d6854
--- /dev/null
+++ b/src/KafkaFlow.OpenTelemetry/KafkaFlowInstrumentationOptions.cs
@@ -0,0 +1,20 @@
+using System;
+using System.Diagnostics;
+
+namespace KafkaFlow.OpenTelemetry;
+
+///
+/// The options to be included in KafkaFlow instrumentation
+///
+public class KafkaFlowInstrumentationOptions
+{
+ ///
+ /// Gets or sets the Producer enricher
+ ///
+ public Action EnrichProducer { get; set; }
+
+ ///
+ /// Gets or sets the Consumer enricher
+ ///
+ public Action EnrichConsumer { get; set; }
+}
diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs
index 0f99409c4..8024edfb4 100644
--- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs
+++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs
@@ -3,9 +3,9 @@
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
-using global::OpenTelemetry;
-using global::OpenTelemetry.Context.Propagation;
-using global::OpenTelemetry.Trace;
+using OpenTelemetry;
+using OpenTelemetry.Context.Propagation;
+using OpenTelemetry.Trace;
namespace KafkaFlow.OpenTelemetry;
@@ -17,7 +17,7 @@ internal static class OpenTelemetryConsumerEventsHandler
private const string AttributeMessagingKafkaSourcePartition = "messaging.kafka.source.partition";
private static readonly TextMapPropagator s_propagator = Propagators.DefaultTextMapPropagator;
- public static Task OnConsumeStarted(IMessageContext context)
+ public static Task OnConsumeStarted(IMessageContext context, KafkaFlowInstrumentationOptions options)
{
try
{
@@ -45,6 +45,8 @@ public static Task OnConsumeStarted(IMessageContext context)
{
SetConsumerTags(context, activity);
}
+
+ options?.EnrichConsumer?.Invoke(activity, context);
}
catch
{
diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs
index 2d6e12ef2..a02631161 100644
--- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs
+++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs
@@ -3,9 +3,9 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
-using global::OpenTelemetry;
-using global::OpenTelemetry.Context.Propagation;
-using global::OpenTelemetry.Trace;
+using OpenTelemetry;
+using OpenTelemetry.Context.Propagation;
+using OpenTelemetry.Trace;
namespace KafkaFlow.OpenTelemetry;
@@ -16,7 +16,7 @@ internal static class OpenTelemetryProducerEventsHandler
private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition";
private static readonly TextMapPropagator s_propagator = Propagators.DefaultTextMapPropagator;
- public static Task OnProducerStarted(IMessageContext context)
+ public static Task OnProducerStarted(IMessageContext context, KafkaFlowInstrumentationOptions options)
{
try
{
@@ -55,6 +55,8 @@ public static Task OnProducerStarted(IMessageContext context)
{
SetProducerTags(context, activity);
}
+
+ options?.EnrichProducer?.Invoke(activity, context);
}
catch
{
diff --git a/tests/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs b/tests/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
index 036a9924e..5a4a0ecdb 100644
--- a/tests/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
+++ b/tests/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
@@ -3,10 +3,11 @@
using System.Diagnostics;
using System.IO;
using System.Linq;
+using System.Text;
using System.Threading.Tasks;
using AutoFixture;
-using global::OpenTelemetry;
-using global::OpenTelemetry.Trace;
+using OpenTelemetry;
+using OpenTelemetry.Trace;
using KafkaFlow.Compressor.Gzip;
using KafkaFlow.Configuration;
using KafkaFlow.IntegrationTests.Core;
@@ -31,10 +32,13 @@ public class OpenTelemetryTests
private bool _isPartitionAssigned;
+ private string _topicName;
+
[TestInitialize]
public void Setup()
{
_exportedItems = new List();
+ _topicName = $"OpenTelemetryTestTopic_{Guid.NewGuid()}";
}
[TestMethod]
@@ -64,6 +68,92 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans
Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
}
+ [TestMethod]
+ public async Task AddOpenTelemetry_WithEnrichProducer_DefinedHeaderIsIncludedInActivity()
+ {
+ // Arrange
+ string tagName = "otel.header.tag.name";
+ string headerKey = "otel-header-key";
+ string headerValue = "otel-header-value";
+ var provider = await this.GetServiceProvider(options => options.EnrichProducer = (activity, messageContext) =>
+ {
+ var header = messageContext.Headers.FirstOrDefault(x => x.Key == headerKey);
+ var valueString = Encoding.UTF8.GetString(header.Value);
+ activity.SetTag(tagName, valueString);
+ });
+
+ MessageStorage.Clear();
+
+ using var tracerProvider = Sdk.CreateTracerProviderBuilder()
+ .AddSource(KafkaFlowInstrumentation.ActivitySourceName)
+ .AddInMemoryExporter(_exportedItems)
+ .Build();
+
+ var producer = provider.GetRequiredService>();
+ var message = _fixture.Create();
+ IMessageHeaders headers = new MessageHeaders
+ {
+ { headerKey, Encoding.UTF8.GetBytes(headerValue) }
+ };
+
+ // Act
+ await producer.ProduceAsync(_topicName, "key", message, headers);
+
+ // Assert
+ var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();
+
+ var otelTagValueProducer = producerSpan.Tags.FirstOrDefault(x => x.Key == tagName).Value;
+ var otelTagValueConsumer = consumerSpan.Tags.FirstOrDefault(x => x.Key == tagName).Value;
+
+ Assert.IsNotNull(_exportedItems);
+ Assert.IsNull(otelTagValueConsumer);
+ Assert.IsNotNull(otelTagValueProducer);
+ Assert.AreEqual(otelTagValueProducer, headerValue);
+ }
+
+ [TestMethod]
+ public async Task AddOpenTelemetry_WithEnrichConsumer_DefinedHeaderIsIncludedInActivity()
+ {
+ // Arrange
+ string tagName = "otel-header-tag-name";
+ string headerKey = "otel-header-key";
+ string headerValue = "otel-header-value";
+ var provider = await this.GetServiceProvider(options => options.EnrichConsumer = (activity, messageContext) =>
+ {
+ var header = messageContext.Headers.FirstOrDefault(x => x.Key == headerKey);
+ var valueString = Encoding.UTF8.GetString(header.Value);
+ activity.SetTag(tagName, valueString);
+ });
+
+ MessageStorage.Clear();
+
+ using var tracerProvider = Sdk.CreateTracerProviderBuilder()
+ .AddSource(KafkaFlowInstrumentation.ActivitySourceName)
+ .AddInMemoryExporter(_exportedItems)
+ .Build();
+
+ var producer = provider.GetRequiredService>();
+ var message = _fixture.Create();
+ IMessageHeaders headers = new MessageHeaders
+ {
+ { headerKey, Encoding.UTF8.GetBytes(headerValue) }
+ };
+
+ // Act
+ await producer.ProduceAsync(_topicName, "key", message, headers);
+
+ // Assert
+ var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();
+
+ var otelTagValueProducer = producerSpan.Tags.FirstOrDefault(x => x.Key == tagName).Value;
+ var otelTagValueConsumer = consumerSpan.Tags.FirstOrDefault(x => x.Key == tagName).Value;
+
+ Assert.IsNotNull(_exportedItems);
+ Assert.IsNull(otelTagValueProducer);
+ Assert.IsNotNull(otelTagValueConsumer);
+ Assert.AreEqual(otelTagValueConsumer, headerValue);
+ }
+
[TestMethod]
public async Task AddOpenTelemetry_CreateActivityOnConsumingMessage_TraceIsPropagatedToCreatedActivity()
{
@@ -137,9 +227,9 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp
Assert.AreEqual(consumerSpan.GetBaggageItem(baggageName2), baggageValue2);
}
- private async Task GetServiceProvider()
+ private async Task GetServiceProvider(Action options = null)
{
- var topicName = $"OpenTelemetryTestTopic_{Guid.NewGuid()}";
+
_isPartitionAssigned = false;
@@ -163,17 +253,17 @@ private async Task GetServiceProvider()
.AddCluster(
cluster => cluster
.WithBrokers(context.Configuration.GetValue("Kafka:Brokers").Split(';'))
- .CreateTopicIfNotExists(topicName, 1, 1)
+ .CreateTopicIfNotExists(_topicName, 1, 1)
.AddProducer(
producer => producer
- .DefaultTopic(topicName)
+ .DefaultTopic(_topicName)
.AddMiddlewares(
middlewares => middlewares
.AddCompressor()))
.AddConsumer(
consumer => consumer
- .Topic(topicName)
- .WithGroupId(topicName)
+ .Topic(_topicName)
+ .WithGroupId(_topicName)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
@@ -185,7 +275,7 @@ private async Task GetServiceProvider()
{
_isPartitionAssigned = true;
})))
- .AddOpenTelemetryInstrumentation()))
+ .AddOpenTelemetryInstrumentation(options)))
.UseDefaultServiceProvider(
(_, options) =>
{
diff --git a/website/docs/guides/open-telemetry.md b/website/docs/guides/open-telemetry.md
index 114d1948d..0a77b6845 100644
--- a/website/docs/guides/open-telemetry.md
+++ b/website/docs/guides/open-telemetry.md
@@ -25,7 +25,40 @@ services.AddKafka(
);
```
-Once you have your .NET application instrumentation configured ([see here](https://opentelemetry.io/docs/instrumentation/net/getting-started/)), you just need to subscribe to the source `KafkaFlow.OpenTelemetry` that is accessible through a constant at `KafkaFlowInstrumentation.ActivitySourceName`.
+Once you have your .NET application instrumentation configured ([see here](https://opentelemetry.io/docs/instrumentation/net/getting-started/)), the KafkaFlow activity can be captured by adding the source `KafkaFlowInstrumentation.ActivitySourceName` in the tracer provider builder, e.g.:
+
+```csharp
+ using var tracerProvider = Sdk.CreateTracerProviderBuilder()
+ .AddSource(KafkaFlowInstrumentation.ActivitySourceName)
+ ...
+```
+
+## Advanced Configuration
+
+The instrumentation can be configured to change the default behavior by using KafkaFlowInstrumentationOptions.
+
+### Enrich
+
+This option can be used to enrich the `Activity` with additional information from `IMessageContext` object. It defines separate methods for producer and consumer enrich:
+
+```csharp
+services.AddKafka(
+ kafka => kafka
+ .AddCluster(...)
+ .AddOpenTelemetryInstrumentation(options =>
+ {
+ options.EnrichProducer = (activity, messageContext) =>
+ {
+ activity.SetTag("messaging.destination.producername", "KafkaFlowOtel");
+ };
+
+ options.EnrichConsumer = (activity, messageContext) =>
+ {
+ activity.SetTag("messaging.destination.group.id", messageContext.ConsumerContext.GroupId);
+ };
+ })
+);
+```
## Using .NET Automatic Instrumentation