Skip to content

Commit

Permalink
feat: add producer and consumer enricher
Browse files Browse the repository at this point in the history
  • Loading branch information
simaoribeiro committed Feb 28, 2024
1 parent dc0882f commit 807518b
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 22 deletions.
8 changes: 7 additions & 1 deletion samples/KafkaFlow.Sample.OpenTelemetry/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ public static async Task Main()
services.AddKafka(
kafka => kafka
.UseConsoleLog()
.AddOpenTelemetryInstrumentation()
.AddOpenTelemetryInstrumentation(options =>
{
options.EnrichProducer = (activity, messageContext) =>
{
activity.SetTag("messaging.destination.producername", producerName);
};
})
.AddCluster(
cluster => cluster
.WithBrokers(new[]
Expand Down
24 changes: 20 additions & 4 deletions src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using KafkaFlow.OpenTelemetry;
using System;
using KafkaFlow.OpenTelemetry;

namespace KafkaFlow.Configuration;

Expand All @@ -11,18 +12,23 @@ public static class ExtensionMethods
/// Adds OpenTelemetry instrumentation
/// </summary>
/// <param name="builder">The Kafka configuration builder</param>
/// <param name="options">KafkaFlowInstrumentationOptions action</param>
/// <returns></returns>
public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IKafkaConfigurationBuilder builder)
public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IKafkaConfigurationBuilder builder, Action<KafkaFlowInstrumentationOptions> options)
{
var kafkaFlowInstrumentationOptions = new KafkaFlowInstrumentationOptions();

options?.Invoke(kafkaFlowInstrumentationOptions);

builder.SubscribeGlobalEvents(hub =>
{
hub.MessageConsumeStarted.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeStarted(eventContext.MessageContext));
hub.MessageConsumeStarted.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeStarted(eventContext.MessageContext, kafkaFlowInstrumentationOptions));

hub.MessageConsumeError.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeError(eventContext.MessageContext, eventContext.Exception));

hub.MessageConsumeCompleted.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeCompleted(eventContext.MessageContext));

hub.MessageProduceStarted.Subscribe(eventContext => OpenTelemetryProducerEventsHandler.OnProducerStarted(eventContext.MessageContext));
hub.MessageProduceStarted.Subscribe(eventContext => OpenTelemetryProducerEventsHandler.OnProducerStarted(eventContext.MessageContext, kafkaFlowInstrumentationOptions));

hub.MessageProduceError.Subscribe(eventContext => OpenTelemetryProducerEventsHandler.OnProducerError(eventContext.MessageContext, eventContext.Exception));

Expand All @@ -31,4 +37,14 @@ public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IK

return builder;
}

/// <summary>
/// Adds OpenTelemetry instrumentation
/// </summary>
/// <param name="builder">The Kafka configuration builder</param>
/// <returns></returns>
public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IKafkaConfigurationBuilder builder)
{
return AddOpenTelemetryInstrumentation(builder, null);
}
}
20 changes: 20 additions & 0 deletions src/KafkaFlow.OpenTelemetry/KafkaFlowInstrumentationOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;
using System.Diagnostics;

namespace KafkaFlow.OpenTelemetry;

/// <summary>
/// The options to be included in KafkaFlow instrumentation
/// </summary>
public class KafkaFlowInstrumentationOptions
{
/// <summary>
/// Gets or sets the Producer enricher
/// </summary>
public Action<Activity, IMessageContext> EnrichProducer { get; set; }

/// <summary>
/// Gets or sets the Consumer enricher
/// </summary>
public Action<Activity, IMessageContext> EnrichConsumer { get; set; }
}
15 changes: 11 additions & 4 deletions src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using global::OpenTelemetry;
using global::OpenTelemetry.Context.Propagation;
using global::OpenTelemetry.Trace;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Trace;

namespace KafkaFlow.OpenTelemetry;

Expand All @@ -17,7 +17,7 @@ internal static class OpenTelemetryConsumerEventsHandler
private const string AttributeMessagingKafkaSourcePartition = "messaging.kafka.source.partition";
private static readonly TextMapPropagator s_propagator = Propagators.DefaultTextMapPropagator;

public static Task OnConsumeStarted(IMessageContext context)
public static Task OnConsumeStarted(IMessageContext context, KafkaFlowInstrumentationOptions options)
{
try
{
Expand Down Expand Up @@ -45,6 +45,8 @@ public static Task OnConsumeStarted(IMessageContext context)
{
SetConsumerTags(context, activity);
}

options?.EnrichConsumer?.Invoke(activity, context);
}
catch
{
Expand Down Expand Up @@ -93,4 +95,9 @@ private static void SetConsumerTags(IMessageContext context, Activity activity)
activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context.ConsumerContext.Offset);
activity.SetTag(AttributeMessagingKafkaSourcePartition, context.ConsumerContext.Partition);
}

internal static async Task OnConsumeStarted(IMessageContext messageContext, object v)

Check warning on line 99 in src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs

View workflow job for this annotation

GitHub Actions / Test deployment

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
{
throw new NotImplementedException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using global::OpenTelemetry;
using global::OpenTelemetry.Context.Propagation;
using global::OpenTelemetry.Trace;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Trace;

namespace KafkaFlow.OpenTelemetry;

Expand All @@ -16,7 +16,7 @@ internal static class OpenTelemetryProducerEventsHandler
private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition";
private static readonly TextMapPropagator s_propagator = Propagators.DefaultTextMapPropagator;

public static Task OnProducerStarted(IMessageContext context)
public static Task OnProducerStarted(IMessageContext context, KafkaFlowInstrumentationOptions options)
{
try
{
Expand Down Expand Up @@ -55,6 +55,8 @@ public static Task OnProducerStarted(IMessageContext context)
{
SetProducerTags(context, activity);
}

options?.EnrichProducer?.Invoke(activity, context);
}
catch
{
Expand Down
108 changes: 99 additions & 9 deletions tests/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using AutoFixture;
using global::OpenTelemetry;
using global::OpenTelemetry.Trace;
using OpenTelemetry;
using OpenTelemetry.Trace;
using KafkaFlow.Compressor.Gzip;
using KafkaFlow.Configuration;
using KafkaFlow.IntegrationTests.Core;
Expand All @@ -31,10 +32,13 @@ public class OpenTelemetryTests

private bool _isPartitionAssigned;

private string _topicName;

[TestInitialize]
public void Setup()
{
_exportedItems = new List<Activity>();
_topicName = $"OpenTelemetryTestTopic_{Guid.NewGuid()}";
}

[TestMethod]
Expand Down Expand Up @@ -64,6 +68,92 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans
Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
}

[TestMethod]
public async Task AddOpenTelemetry_WithEnrichProducer_DefinedHeaderIsIncludedInActivity()
{
// Arrange
string tagName = "otel.header.tag.name";
string headerKey = "otel-header-key";
string headerValue = "otel-header-value";
var provider = await this.GetServiceProvider(options => options.EnrichProducer = (activity, messageContext) =>
{
var header = messageContext.Headers.FirstOrDefault(x => x.Key == headerKey);
var valueString = Encoding.UTF8.GetString(header.Value);
activity.SetTag(tagName, valueString);
});

MessageStorage.Clear();

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource(KafkaFlowInstrumentation.ActivitySourceName)
.AddInMemoryExporter(_exportedItems)
.Build();

var producer = provider.GetRequiredService<IMessageProducer<GzipProducer>>();
var message = _fixture.Create<byte[]>();
IMessageHeaders headers = new MessageHeaders
{
{ headerKey, Encoding.UTF8.GetBytes(headerValue) }
};

// Act
await producer.ProduceAsync(_topicName, "key", message, headers);

// Assert
var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

var otelTagValueProducer = producerSpan.Tags.FirstOrDefault(x => x.Key == tagName).Value;
var otelTagValueConsumer = consumerSpan.Tags.FirstOrDefault(x => x.Key == tagName).Value;

Assert.IsNotNull(_exportedItems);
Assert.IsNull(otelTagValueConsumer);
Assert.IsNotNull(otelTagValueProducer);
Assert.AreEqual(otelTagValueProducer, headerValue);
}

[TestMethod]
public async Task AddOpenTelemetry_WithEnrichConsumer_DefinedHeaderIsIncludedInActivity()
{
// Arrange
string tagName = "otel-header-tag-name";
string headerKey = "otel-header-key";
string headerValue = "otel-header-value";
var provider = await this.GetServiceProvider(options => options.EnrichConsumer = (activity, messageContext) =>
{
var header = messageContext.Headers.FirstOrDefault(x => x.Key == headerKey);
var valueString = Encoding.UTF8.GetString(header.Value);
activity.SetTag(tagName, valueString);
});

MessageStorage.Clear();

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource(KafkaFlowInstrumentation.ActivitySourceName)
.AddInMemoryExporter(_exportedItems)
.Build();

var producer = provider.GetRequiredService<IMessageProducer<GzipProducer>>();
var message = _fixture.Create<byte[]>();
IMessageHeaders headers = new MessageHeaders
{
{ headerKey, Encoding.UTF8.GetBytes(headerValue) }
};

// Act
await producer.ProduceAsync(_topicName, "key", message, headers);

// Assert
var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

var otelTagValueProducer = producerSpan.Tags.FirstOrDefault(x => x.Key == tagName).Value;
var otelTagValueConsumer = consumerSpan.Tags.FirstOrDefault(x => x.Key == tagName).Value;

Assert.IsNotNull(_exportedItems);
Assert.IsNull(otelTagValueProducer);
Assert.IsNotNull(otelTagValueConsumer);
Assert.AreEqual(otelTagValueConsumer, headerValue);
}

[TestMethod]
public async Task AddOpenTelemetry_CreateActivityOnConsumingMessage_TraceIsPropagatedToCreatedActivity()
{
Expand Down Expand Up @@ -137,9 +227,9 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp
Assert.AreEqual(consumerSpan.GetBaggageItem(baggageName2), baggageValue2);
}

private async Task<IServiceProvider> GetServiceProvider()
private async Task<IServiceProvider> GetServiceProvider(Action<KafkaFlowInstrumentationOptions> options = null)
{
var topicName = $"OpenTelemetryTestTopic_{Guid.NewGuid()}";


_isPartitionAssigned = false;

Expand All @@ -163,17 +253,17 @@ private async Task<IServiceProvider> GetServiceProvider()
.AddCluster(
cluster => cluster
.WithBrokers(context.Configuration.GetValue<string>("Kafka:Brokers").Split(';'))
.CreateTopicIfNotExists(topicName, 1, 1)
.CreateTopicIfNotExists(_topicName, 1, 1)
.AddProducer<GzipProducer>(
producer => producer
.DefaultTopic(topicName)
.DefaultTopic(_topicName)
.AddMiddlewares(
middlewares => middlewares
.AddCompressor<GzipMessageCompressor>()))
.AddConsumer(
consumer => consumer
.Topic(topicName)
.WithGroupId(topicName)
.Topic(_topicName)
.WithGroupId(_topicName)
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
Expand All @@ -185,7 +275,7 @@ private async Task<IServiceProvider> GetServiceProvider()
{
_isPartitionAssigned = true;
})))
.AddOpenTelemetryInstrumentation()))
.AddOpenTelemetryInstrumentation(options)))
.UseDefaultServiceProvider(
(_, options) =>
{
Expand Down

0 comments on commit 807518b

Please sign in to comment.