From c899783ee53df5398423c1f75490e4cf37eab204 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sim=C3=A3o=20Ribeiro?= Date: Wed, 28 Feb 2024 15:46:25 +0000 Subject: [PATCH] feat: add producer and consumer enricher --- .../KafkaFlow.Sample.OpenTelemetry/Program.cs | 13 ++- .../ExtensionMethods.cs | 24 +++- .../KafkaFlowInstrumentationOptions.cs | 20 ++++ .../OpenTelemetryConsumerEventsHandler.cs | 10 +- .../OpenTelemetryProducerEventsHandler.cs | 10 +- .../OpenTelemetryTests.cs | 108 ++++++++++++++++-- website/docs/guides/open-telemetry.md | 35 +++++- 7 files changed, 197 insertions(+), 23 deletions(-) create mode 100644 src/KafkaFlow.OpenTelemetry/KafkaFlowInstrumentationOptions.cs diff --git a/samples/KafkaFlow.Sample.OpenTelemetry/Program.cs b/samples/KafkaFlow.Sample.OpenTelemetry/Program.cs index 8eb56fb99..83e0a5696 100644 --- a/samples/KafkaFlow.Sample.OpenTelemetry/Program.cs +++ b/samples/KafkaFlow.Sample.OpenTelemetry/Program.cs @@ -32,7 +32,18 @@ public static async Task Main() services.AddKafka( kafka => kafka .UseConsoleLog() - .AddOpenTelemetryInstrumentation() + .AddOpenTelemetryInstrumentation(options => + { + options.EnrichProducer = (activity, messageContext) => + { + activity.SetTag("messaging.destination.producername", "KafkaFlowOtel"); + }; + + options.EnrichConsumer = (activity, messageContext) => + { + activity.SetTag("messaging.destination.groupid", messageContext.ConsumerContext.GroupId); + }; + }) .AddCluster( cluster => cluster .WithBrokers(new[] diff --git a/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs b/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs index d2035794a..ca0d68241 100644 --- a/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs +++ b/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs @@ -1,4 +1,5 @@ -using KafkaFlow.OpenTelemetry; +using System; +using KafkaFlow.OpenTelemetry; namespace KafkaFlow.Configuration; @@ -11,18 +12,23 @@ public static class ExtensionMethods /// Adds OpenTelemetry instrumentation /// /// The Kafka configuration builder + /// KafkaFlowInstrumentationOptions action /// - public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IKafkaConfigurationBuilder builder) + public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IKafkaConfigurationBuilder builder, Action options) { + var kafkaFlowInstrumentationOptions = new KafkaFlowInstrumentationOptions(); + + options?.Invoke(kafkaFlowInstrumentationOptions); + builder.SubscribeGlobalEvents(hub => { - hub.MessageConsumeStarted.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeStarted(eventContext.MessageContext)); + hub.MessageConsumeStarted.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeStarted(eventContext.MessageContext, kafkaFlowInstrumentationOptions)); hub.MessageConsumeError.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeError(eventContext.MessageContext, eventContext.Exception)); hub.MessageConsumeCompleted.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeCompleted(eventContext.MessageContext)); - hub.MessageProduceStarted.Subscribe(eventContext => OpenTelemetryProducerEventsHandler.OnProducerStarted(eventContext.MessageContext)); + hub.MessageProduceStarted.Subscribe(eventContext => OpenTelemetryProducerEventsHandler.OnProducerStarted(eventContext.MessageContext, kafkaFlowInstrumentationOptions)); hub.MessageProduceError.Subscribe(eventContext => OpenTelemetryProducerEventsHandler.OnProducerError(eventContext.MessageContext, eventContext.Exception)); @@ -31,4 +37,14 @@ public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IK return builder; } + + /// + /// Adds OpenTelemetry instrumentation + /// + /// The Kafka configuration builder + /// + public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IKafkaConfigurationBuilder builder) + { + return AddOpenTelemetryInstrumentation(builder, null); + } } diff --git a/src/KafkaFlow.OpenTelemetry/KafkaFlowInstrumentationOptions.cs b/src/KafkaFlow.OpenTelemetry/KafkaFlowInstrumentationOptions.cs new file mode 100644 index 000000000..47f9d6854 --- /dev/null +++ b/src/KafkaFlow.OpenTelemetry/KafkaFlowInstrumentationOptions.cs @@ -0,0 +1,20 @@ +using System; +using System.Diagnostics; + +namespace KafkaFlow.OpenTelemetry; + +/// +/// The options to be included in KafkaFlow instrumentation +/// +public class KafkaFlowInstrumentationOptions +{ + /// + /// Gets or sets the Producer enricher + /// + public Action EnrichProducer { get; set; } + + /// + /// Gets or sets the Consumer enricher + /// + public Action EnrichConsumer { get; set; } +} diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs index 0f99409c4..8024edfb4 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs @@ -3,9 +3,9 @@ using System.Diagnostics; using System.Text; using System.Threading.Tasks; -using global::OpenTelemetry; -using global::OpenTelemetry.Context.Propagation; -using global::OpenTelemetry.Trace; +using OpenTelemetry; +using OpenTelemetry.Context.Propagation; +using OpenTelemetry.Trace; namespace KafkaFlow.OpenTelemetry; @@ -17,7 +17,7 @@ internal static class OpenTelemetryConsumerEventsHandler private const string AttributeMessagingKafkaSourcePartition = "messaging.kafka.source.partition"; private static readonly TextMapPropagator s_propagator = Propagators.DefaultTextMapPropagator; - public static Task OnConsumeStarted(IMessageContext context) + public static Task OnConsumeStarted(IMessageContext context, KafkaFlowInstrumentationOptions options) { try { @@ -45,6 +45,8 @@ public static Task OnConsumeStarted(IMessageContext context) { SetConsumerTags(context, activity); } + + options?.EnrichConsumer?.Invoke(activity, context); } catch { diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs index 2d6e12ef2..a02631161 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs @@ -3,9 +3,9 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -using global::OpenTelemetry; -using global::OpenTelemetry.Context.Propagation; -using global::OpenTelemetry.Trace; +using OpenTelemetry; +using OpenTelemetry.Context.Propagation; +using OpenTelemetry.Trace; namespace KafkaFlow.OpenTelemetry; @@ -16,7 +16,7 @@ internal static class OpenTelemetryProducerEventsHandler private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition"; private static readonly TextMapPropagator s_propagator = Propagators.DefaultTextMapPropagator; - public static Task OnProducerStarted(IMessageContext context) + public static Task OnProducerStarted(IMessageContext context, KafkaFlowInstrumentationOptions options) { try { @@ -55,6 +55,8 @@ public static Task OnProducerStarted(IMessageContext context) { SetProducerTags(context, activity); } + + options?.EnrichProducer?.Invoke(activity, context); } catch { diff --git a/tests/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs b/tests/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs index 036a9924e..5a4a0ecdb 100644 --- a/tests/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs +++ b/tests/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs @@ -3,10 +3,11 @@ using System.Diagnostics; using System.IO; using System.Linq; +using System.Text; using System.Threading.Tasks; using AutoFixture; -using global::OpenTelemetry; -using global::OpenTelemetry.Trace; +using OpenTelemetry; +using OpenTelemetry.Trace; using KafkaFlow.Compressor.Gzip; using KafkaFlow.Configuration; using KafkaFlow.IntegrationTests.Core; @@ -31,10 +32,13 @@ public class OpenTelemetryTests private bool _isPartitionAssigned; + private string _topicName; + [TestInitialize] public void Setup() { _exportedItems = new List(); + _topicName = $"OpenTelemetryTestTopic_{Guid.NewGuid()}"; } [TestMethod] @@ -64,6 +68,92 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId); } + [TestMethod] + public async Task AddOpenTelemetry_WithEnrichProducer_DefinedHeaderIsIncludedInActivity() + { + // Arrange + string tagName = "otel.header.tag.name"; + string headerKey = "otel-header-key"; + string headerValue = "otel-header-value"; + var provider = await this.GetServiceProvider(options => options.EnrichProducer = (activity, messageContext) => + { + var header = messageContext.Headers.FirstOrDefault(x => x.Key == headerKey); + var valueString = Encoding.UTF8.GetString(header.Value); + activity.SetTag(tagName, valueString); + }); + + MessageStorage.Clear(); + + using var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource(KafkaFlowInstrumentation.ActivitySourceName) + .AddInMemoryExporter(_exportedItems) + .Build(); + + var producer = provider.GetRequiredService>(); + var message = _fixture.Create(); + IMessageHeaders headers = new MessageHeaders + { + { headerKey, Encoding.UTF8.GetBytes(headerValue) } + }; + + // Act + await producer.ProduceAsync(_topicName, "key", message, headers); + + // Assert + var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync(); + + var otelTagValueProducer = producerSpan.Tags.FirstOrDefault(x => x.Key == tagName).Value; + var otelTagValueConsumer = consumerSpan.Tags.FirstOrDefault(x => x.Key == tagName).Value; + + Assert.IsNotNull(_exportedItems); + Assert.IsNull(otelTagValueConsumer); + Assert.IsNotNull(otelTagValueProducer); + Assert.AreEqual(otelTagValueProducer, headerValue); + } + + [TestMethod] + public async Task AddOpenTelemetry_WithEnrichConsumer_DefinedHeaderIsIncludedInActivity() + { + // Arrange + string tagName = "otel-header-tag-name"; + string headerKey = "otel-header-key"; + string headerValue = "otel-header-value"; + var provider = await this.GetServiceProvider(options => options.EnrichConsumer = (activity, messageContext) => + { + var header = messageContext.Headers.FirstOrDefault(x => x.Key == headerKey); + var valueString = Encoding.UTF8.GetString(header.Value); + activity.SetTag(tagName, valueString); + }); + + MessageStorage.Clear(); + + using var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource(KafkaFlowInstrumentation.ActivitySourceName) + .AddInMemoryExporter(_exportedItems) + .Build(); + + var producer = provider.GetRequiredService>(); + var message = _fixture.Create(); + IMessageHeaders headers = new MessageHeaders + { + { headerKey, Encoding.UTF8.GetBytes(headerValue) } + }; + + // Act + await producer.ProduceAsync(_topicName, "key", message, headers); + + // Assert + var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync(); + + var otelTagValueProducer = producerSpan.Tags.FirstOrDefault(x => x.Key == tagName).Value; + var otelTagValueConsumer = consumerSpan.Tags.FirstOrDefault(x => x.Key == tagName).Value; + + Assert.IsNotNull(_exportedItems); + Assert.IsNull(otelTagValueProducer); + Assert.IsNotNull(otelTagValueConsumer); + Assert.AreEqual(otelTagValueConsumer, headerValue); + } + [TestMethod] public async Task AddOpenTelemetry_CreateActivityOnConsumingMessage_TraceIsPropagatedToCreatedActivity() { @@ -137,9 +227,9 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp Assert.AreEqual(consumerSpan.GetBaggageItem(baggageName2), baggageValue2); } - private async Task GetServiceProvider() + private async Task GetServiceProvider(Action options = null) { - var topicName = $"OpenTelemetryTestTopic_{Guid.NewGuid()}"; + _isPartitionAssigned = false; @@ -163,17 +253,17 @@ private async Task GetServiceProvider() .AddCluster( cluster => cluster .WithBrokers(context.Configuration.GetValue("Kafka:Brokers").Split(';')) - .CreateTopicIfNotExists(topicName, 1, 1) + .CreateTopicIfNotExists(_topicName, 1, 1) .AddProducer( producer => producer - .DefaultTopic(topicName) + .DefaultTopic(_topicName) .AddMiddlewares( middlewares => middlewares .AddCompressor())) .AddConsumer( consumer => consumer - .Topic(topicName) - .WithGroupId(topicName) + .Topic(_topicName) + .WithGroupId(_topicName) .WithBufferSize(100) .WithWorkersCount(10) .WithAutoOffsetReset(AutoOffsetReset.Latest) @@ -185,7 +275,7 @@ private async Task GetServiceProvider() { _isPartitionAssigned = true; }))) - .AddOpenTelemetryInstrumentation())) + .AddOpenTelemetryInstrumentation(options))) .UseDefaultServiceProvider( (_, options) => { diff --git a/website/docs/guides/open-telemetry.md b/website/docs/guides/open-telemetry.md index 114d1948d..0a77b6845 100644 --- a/website/docs/guides/open-telemetry.md +++ b/website/docs/guides/open-telemetry.md @@ -25,7 +25,40 @@ services.AddKafka( ); ``` -Once you have your .NET application instrumentation configured ([see here](https://opentelemetry.io/docs/instrumentation/net/getting-started/)), you just need to subscribe to the source `KafkaFlow.OpenTelemetry` that is accessible through a constant at `KafkaFlowInstrumentation.ActivitySourceName`. +Once you have your .NET application instrumentation configured ([see here](https://opentelemetry.io/docs/instrumentation/net/getting-started/)), the KafkaFlow activity can be captured by adding the source `KafkaFlowInstrumentation.ActivitySourceName` in the tracer provider builder, e.g.: + +```csharp + using var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource(KafkaFlowInstrumentation.ActivitySourceName) + ... +``` + +## Advanced Configuration + +The instrumentation can be configured to change the default behavior by using KafkaFlowInstrumentationOptions. + +### Enrich + +This option can be used to enrich the `Activity` with additional information from `IMessageContext` object. It defines separate methods for producer and consumer enrich: + +```csharp +services.AddKafka( + kafka => kafka + .AddCluster(...) + .AddOpenTelemetryInstrumentation(options => + { + options.EnrichProducer = (activity, messageContext) => + { + activity.SetTag("messaging.destination.producername", "KafkaFlowOtel"); + }; + + options.EnrichConsumer = (activity, messageContext) => + { + activity.SetTag("messaging.destination.group.id", messageContext.ConsumerContext.GroupId); + }; + }) +); +``` ## Using .NET Automatic Instrumentation