Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add producer and consumer enricher for OpenTelemetry instrumentation #535

Merged
merged 1 commit into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion samples/KafkaFlow.Sample.OpenTelemetry/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,18 @@ public static async Task Main()
services.AddKafka(
kafka => kafka
.UseConsoleLog()
.AddOpenTelemetryInstrumentation()
.AddOpenTelemetryInstrumentation(options =>
{
options.EnrichProducer = (activity, messageContext) =>
{
activity.SetTag("messaging.destination.producername", "KafkaFlowOtel");
};

options.EnrichConsumer = (activity, messageContext) =>
{
activity.SetTag("messaging.destination.groupid", messageContext.ConsumerContext.GroupId);
};
})
.AddCluster(
cluster => cluster
.WithBrokers(new[]
Expand Down
24 changes: 20 additions & 4 deletions src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using KafkaFlow.OpenTelemetry;
using System;
using KafkaFlow.OpenTelemetry;

namespace KafkaFlow.Configuration;

Expand All @@ -11,18 +12,23 @@ public static class ExtensionMethods
/// Adds OpenTelemetry instrumentation
/// </summary>
/// <param name="builder">The Kafka configuration builder</param>
/// <param name="options">KafkaFlowInstrumentationOptions action</param>
/// <returns></returns>
public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IKafkaConfigurationBuilder builder)
public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IKafkaConfigurationBuilder builder, Action<KafkaFlowInstrumentationOptions> options)
{
var kafkaFlowInstrumentationOptions = new KafkaFlowInstrumentationOptions();

options?.Invoke(kafkaFlowInstrumentationOptions);

builder.SubscribeGlobalEvents(hub =>
{
hub.MessageConsumeStarted.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeStarted(eventContext.MessageContext));
hub.MessageConsumeStarted.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeStarted(eventContext.MessageContext, kafkaFlowInstrumentationOptions));

hub.MessageConsumeError.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeError(eventContext.MessageContext, eventContext.Exception));

hub.MessageConsumeCompleted.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeCompleted(eventContext.MessageContext));

hub.MessageProduceStarted.Subscribe(eventContext => OpenTelemetryProducerEventsHandler.OnProducerStarted(eventContext.MessageContext));
hub.MessageProduceStarted.Subscribe(eventContext => OpenTelemetryProducerEventsHandler.OnProducerStarted(eventContext.MessageContext, kafkaFlowInstrumentationOptions));

hub.MessageProduceError.Subscribe(eventContext => OpenTelemetryProducerEventsHandler.OnProducerError(eventContext.MessageContext, eventContext.Exception));

Expand All @@ -31,4 +37,14 @@ public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IK

return builder;
}

/// <summary>
/// Adds OpenTelemetry instrumentation
/// </summary>
/// <param name="builder">The Kafka configuration builder</param>
/// <returns></returns>
public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IKafkaConfigurationBuilder builder)
{
return AddOpenTelemetryInstrumentation(builder, null);
}
}
20 changes: 20 additions & 0 deletions src/KafkaFlow.OpenTelemetry/KafkaFlowInstrumentationOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;
using System.Diagnostics;

namespace KafkaFlow.OpenTelemetry;

/// <summary>
/// The options to be included in KafkaFlow instrumentation
/// </summary>
public class KafkaFlowInstrumentationOptions
{
/// <summary>
/// Gets or sets the Producer enricher
/// </summary>
public Action<Activity, IMessageContext> EnrichProducer { get; set; }

/// <summary>
/// Gets or sets the Consumer enricher
/// </summary>
public Action<Activity, IMessageContext> EnrichConsumer { get; set; }
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
using System;

Check warning on line 1 in src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs

View workflow job for this annotation

GitHub Actions / build

File is required to end with a single newline character (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1518.md) [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj]
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using global::OpenTelemetry;
using global::OpenTelemetry.Context.Propagation;
using global::OpenTelemetry.Trace;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Trace;

namespace KafkaFlow.OpenTelemetry;

Expand All @@ -17,7 +17,7 @@
private const string AttributeMessagingKafkaSourcePartition = "messaging.kafka.source.partition";
private static readonly TextMapPropagator s_propagator = Propagators.DefaultTextMapPropagator;

public static Task OnConsumeStarted(IMessageContext context)
public static Task OnConsumeStarted(IMessageContext context, KafkaFlowInstrumentationOptions options)
{
try
{
Expand Down Expand Up @@ -45,6 +45,8 @@
{
SetConsumerTags(context, activity);
}

options?.EnrichConsumer?.Invoke(activity, context);
}
catch
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using global::OpenTelemetry;
using global::OpenTelemetry.Context.Propagation;
using global::OpenTelemetry.Trace;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Trace;

namespace KafkaFlow.OpenTelemetry;

Expand All @@ -16,7 +16,7 @@ internal static class OpenTelemetryProducerEventsHandler
private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition";
private static readonly TextMapPropagator s_propagator = Propagators.DefaultTextMapPropagator;

public static Task OnProducerStarted(IMessageContext context)
public static Task OnProducerStarted(IMessageContext context, KafkaFlowInstrumentationOptions options)
{
try
{
Expand Down Expand Up @@ -55,6 +55,8 @@ public static Task OnProducerStarted(IMessageContext context)
{
SetProducerTags(context, activity);
}

options?.EnrichProducer?.Invoke(activity, context);
}
catch
{
Expand Down
108 changes: 99 additions & 9 deletions tests/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using AutoFixture;
using global::OpenTelemetry;
using global::OpenTelemetry.Trace;
using OpenTelemetry;
using OpenTelemetry.Trace;
using KafkaFlow.Compressor.Gzip;
using KafkaFlow.Configuration;
using KafkaFlow.IntegrationTests.Core;
Expand All @@ -31,10 +32,13 @@ public class OpenTelemetryTests

private bool _isPartitionAssigned;

private string _topicName;

[TestInitialize]
public void Setup()
{
_exportedItems = new List<Activity>();
_topicName = $"OpenTelemetryTestTopic_{Guid.NewGuid()}";
}

[TestMethod]
Expand Down Expand Up @@ -64,6 +68,92 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans
Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
}

[TestMethod]
public async Task AddOpenTelemetry_WithEnrichProducer_DefinedHeaderIsIncludedInActivity()
{
// Arrange
string tagName = "otel.header.tag.name";
string headerKey = "otel-header-key";
string headerValue = "otel-header-value";
var provider = await this.GetServiceProvider(options => options.EnrichProducer = (activity, messageContext) =>
{
var header = messageContext.Headers.FirstOrDefault(x => x.Key == headerKey);
var valueString = Encoding.UTF8.GetString(header.Value);
activity.SetTag(tagName, valueString);
});

MessageStorage.Clear();

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource(KafkaFlowInstrumentation.ActivitySourceName)
.AddInMemoryExporter(_exportedItems)
.Build();

var producer = provider.GetRequiredService<IMessageProducer<GzipProducer>>();
var message = _fixture.Create<byte[]>();
IMessageHeaders headers = new MessageHeaders
{
{ headerKey, Encoding.UTF8.GetBytes(headerValue) }
};

// Act
await producer.ProduceAsync(_topicName, "key", message, headers);

// Assert
var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

var otelTagValueProducer = producerSpan.Tags.FirstOrDefault(x => x.Key == tagName).Value;
var otelTagValueConsumer = consumerSpan.Tags.FirstOrDefault(x => x.Key == tagName).Value;

Assert.IsNotNull(_exportedItems);
Assert.IsNull(otelTagValueConsumer);
Assert.IsNotNull(otelTagValueProducer);
Assert.AreEqual(otelTagValueProducer, headerValue);
}

[TestMethod]
public async Task AddOpenTelemetry_WithEnrichConsumer_DefinedHeaderIsIncludedInActivity()
{
// Arrange
string tagName = "otel-header-tag-name";
string headerKey = "otel-header-key";
string headerValue = "otel-header-value";
var provider = await this.GetServiceProvider(options => options.EnrichConsumer = (activity, messageContext) =>
{
var header = messageContext.Headers.FirstOrDefault(x => x.Key == headerKey);
var valueString = Encoding.UTF8.GetString(header.Value);
activity.SetTag(tagName, valueString);
});

MessageStorage.Clear();

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource(KafkaFlowInstrumentation.ActivitySourceName)
.AddInMemoryExporter(_exportedItems)
.Build();

var producer = provider.GetRequiredService<IMessageProducer<GzipProducer>>();
var message = _fixture.Create<byte[]>();
IMessageHeaders headers = new MessageHeaders
{
{ headerKey, Encoding.UTF8.GetBytes(headerValue) }
};

// Act
await producer.ProduceAsync(_topicName, "key", message, headers);

// Assert
var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

var otelTagValueProducer = producerSpan.Tags.FirstOrDefault(x => x.Key == tagName).Value;
var otelTagValueConsumer = consumerSpan.Tags.FirstOrDefault(x => x.Key == tagName).Value;

Assert.IsNotNull(_exportedItems);
Assert.IsNull(otelTagValueProducer);
Assert.IsNotNull(otelTagValueConsumer);
Assert.AreEqual(otelTagValueConsumer, headerValue);
}

[TestMethod]
public async Task AddOpenTelemetry_CreateActivityOnConsumingMessage_TraceIsPropagatedToCreatedActivity()
{
Expand Down Expand Up @@ -137,9 +227,9 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp
Assert.AreEqual(consumerSpan.GetBaggageItem(baggageName2), baggageValue2);
}

private async Task<IServiceProvider> GetServiceProvider()
private async Task<IServiceProvider> GetServiceProvider(Action<KafkaFlowInstrumentationOptions> options = null)
{
var topicName = $"OpenTelemetryTestTopic_{Guid.NewGuid()}";


_isPartitionAssigned = false;

Expand All @@ -163,17 +253,17 @@ private async Task<IServiceProvider> GetServiceProvider()
.AddCluster(
cluster => cluster
.WithBrokers(context.Configuration.GetValue<string>("Kafka:Brokers").Split(';'))
.CreateTopicIfNotExists(topicName, 1, 1)
.CreateTopicIfNotExists(_topicName, 1, 1)
.AddProducer<GzipProducer>(
producer => producer
.DefaultTopic(topicName)
.DefaultTopic(_topicName)
.AddMiddlewares(
middlewares => middlewares
.AddCompressor<GzipMessageCompressor>()))
.AddConsumer(
consumer => consumer
.Topic(topicName)
.WithGroupId(topicName)
.Topic(_topicName)
.WithGroupId(_topicName)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
Expand All @@ -185,7 +275,7 @@ private async Task<IServiceProvider> GetServiceProvider()
{
_isPartitionAssigned = true;
})))
.AddOpenTelemetryInstrumentation()))
.AddOpenTelemetryInstrumentation(options)))
.UseDefaultServiceProvider(
(_, options) =>
{
Expand Down
35 changes: 34 additions & 1 deletion website/docs/guides/open-telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,40 @@ services.AddKafka(
);
```

Once you have your .NET application instrumentation configured ([see here](https://opentelemetry.io/docs/instrumentation/net/getting-started/)), you just need to subscribe to the source `KafkaFlow.OpenTelemetry` that is accessible through a constant at `KafkaFlowInstrumentation.ActivitySourceName`.
Once you have your .NET application instrumentation configured ([see here](https://opentelemetry.io/docs/instrumentation/net/getting-started/)), the KafkaFlow activity can be captured by adding the source `KafkaFlowInstrumentation.ActivitySourceName` in the tracer provider builder, e.g.:

```csharp
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource(KafkaFlowInstrumentation.ActivitySourceName)
...
```

## Advanced Configuration

The instrumentation can be configured to change the default behavior by using KafkaFlowInstrumentationOptions.

### Enrich

This option can be used to enrich the `Activity` with additional information from `IMessageContext` object. It defines separate methods for producer and consumer enrich:

```csharp
services.AddKafka(
kafka => kafka
.AddCluster(...)
.AddOpenTelemetryInstrumentation(options =>
{
options.EnrichProducer = (activity, messageContext) =>
{
activity.SetTag("messaging.destination.producername", "KafkaFlowOtel");
};

options.EnrichConsumer = (activity, messageContext) =>
{
activity.SetTag("messaging.destination.group.id", messageContext.ConsumerContext.GroupId);
};
})
);
```

## Using .NET Automatic Instrumentation

Expand Down
Loading