Skip to content

Commit

Permalink
PoC: Include event-based instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
simaoribeiro committed Sep 20, 2023
1 parent b1a91d0 commit 3f576a0
Show file tree
Hide file tree
Showing 19 changed files with 257 additions and 50 deletions.
4 changes: 3 additions & 1 deletion samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
Expand Down Expand Up @@ -32,6 +32,8 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.17" />
<PackageReference Include="OpenTelemetry" Version="1.6.0" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.6.0" />
</ItemGroup>

</Project>
11 changes: 10 additions & 1 deletion samples/KafkaFlow.Sample/Program.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using KafkaFlow;
using KafkaFlow.Configuration;
using KafkaFlow.OpenTelemetry.Trace;
using KafkaFlow.Producers;
using KafkaFlow.Sample;
using KafkaFlow.Serializer;
using KafkaFlow.TypedHandler;
using Microsoft.Extensions.DependencyInjection;
using OpenTelemetry;
using OpenTelemetry.Trace;

var services = new ServiceCollection();

Expand Down Expand Up @@ -38,8 +42,8 @@
.AddTypedHandlers(h => h.AddHandler<PrintConsoleHandler>())
)
)
.AddOpenTelemetryInstrumentation()
)
.AddOpenTelemetryInstrumentation()
);

var provider = services.BuildServiceProvider();
Expand All @@ -54,6 +58,11 @@

Console.WriteLine("Type the number of messages to produce or 'exit' to quit:");

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource("KafkaFlow")
.AddConsoleExporter()
.Build();

while (true)
{
var input = Console.ReadLine();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow.Configuration
{
using System;
using KafkaFlow.Events;

/// <summary>
/// A builder to configure KafkaFlow
Expand All @@ -21,5 +22,7 @@ public interface IKafkaConfigurationBuilder
/// <returns></returns>
IKafkaConfigurationBuilder UseLogHandler<TLogHandler>()
where TLogHandler : ILogHandler;

IKafkaConfigurationBuilder SubscribeEvents(Action<IEventsListener> eventsListener);

Check warning on line 26 in src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'IKafkaConfigurationBuilder.SubscribeEvents(Action<IEventsListener>)'
}
}
14 changes: 14 additions & 0 deletions src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;

namespace KafkaFlow.Events.Args
{
public class ConsumeErrorEventArgs

Check warning on line 5 in src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'ConsumeErrorEventArgs'
{
public ConsumeErrorEventArgs(Exception exception)

Check warning on line 7 in src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'ConsumeErrorEventArgs.ConsumeErrorEventArgs(Exception)'
{
this.Exception = exception;
}

public Exception Exception { get; set; }
}
}
12 changes: 12 additions & 0 deletions src/KafkaFlow.Abstractions/Events/Args/ConsumeStartEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace KafkaFlow.Events.Args
{
public class ConsumeStartEventArgs
{
internal ConsumeStartEventArgs(IMessageContext context)
{
this.MessageContext = context;
}

public IMessageContext MessageContext { get; set; }
}
}
14 changes: 14 additions & 0 deletions src/KafkaFlow.Abstractions/Events/Args/ProduceErrorEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;

namespace KafkaFlow.Events.Args
{
public class ProduceErrorEventArgs
{
public ProduceErrorEventArgs(Exception exception)
{
this.Exception = exception;
}

public Exception Exception { get; set; }
}
}
12 changes: 12 additions & 0 deletions src/KafkaFlow.Abstractions/Events/Args/ProduceStartEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace KafkaFlow.Events.Args
{
public class ProduceStartEventArgs
{
internal ProduceStartEventArgs(IMessageContext context)
{
this.MessageContext = context;
}

public IMessageContext MessageContext { get; set; }
}
}
36 changes: 36 additions & 0 deletions src/KafkaFlow.Abstractions/Events/EventsManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;
using KafkaFlow.Events.Args;

namespace KafkaFlow.Events
{
public class EventsManager : IEventsListener, IEventsNotifier
{
public event EventHandler<ConsumeStartEventArgs> OnConsumeStart;

public event EventHandler<ProduceStartEventArgs> OnProduceStart;

public event EventHandler<ConsumeErrorEventArgs> OnConsumeError;

public event EventHandler<ProduceErrorEventArgs> OnProduceError;

public void NotifyOnConsumeError(Exception exception)
{
this.OnConsumeError?.Invoke(this, new ConsumeErrorEventArgs(exception));
}

public void NotifyOnConsumeStart(IMessageContext context)
{
this.OnConsumeStart?.Invoke(this, new ConsumeStartEventArgs(context));
}

public void NotifyOnProduceError(Exception exception)
{
this.OnProduceError?.Invoke(this, new ProduceErrorEventArgs(exception));
}

public void NotifyOnProduceStart(IMessageContext context)
{
this.OnProduceStart?.Invoke(this, new ProduceStartEventArgs(context));
}
}
}
18 changes: 18 additions & 0 deletions src/KafkaFlow.Abstractions/Events/IEventsListener.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Text;
using KafkaFlow.Events.Args;

namespace KafkaFlow.Events
{
public interface IEventsListener
{
event EventHandler<ConsumeStartEventArgs> OnConsumeStart;

event EventHandler<ProduceStartEventArgs> OnProduceStart;

event EventHandler<ConsumeErrorEventArgs> OnConsumeError;

event EventHandler<ProduceErrorEventArgs> OnProduceError;
}
}
18 changes: 18 additions & 0 deletions src/KafkaFlow.Abstractions/Events/IEventsNotifier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;

namespace KafkaFlow.Events
{
// Isto devia ser internal visto que a notificação de events não deveria ser publica, apenas a subscrição desses eventos

// Falta o OnConsumeEnd e OnProduceEnd
public interface IEventsNotifier
{
void NotifyOnConsumeError(Exception exception);

void NotifyOnConsumeStart(IMessageContext context);

void NotifyOnProduceError(Exception exception);

void NotifyOnProduceStart(IMessageContext context);
}
}
12 changes: 12 additions & 0 deletions src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"profiles": {
"KafkaFlow.Admin.Dashboard": {
"commandName": "Project",
"launchBrowser": true,
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"applicationUrl": "https://localhost:57596;http://localhost:57597"
}
}
}
17 changes: 15 additions & 2 deletions src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,20 @@ public static class ExtensionMethods
/// </summary>
/// <param name="builder">The Kafka configuration builder</param>
/// <returns></returns>
public static IClusterConfigurationBuilder AddOpenTelemetryInstrumentation(this IClusterConfigurationBuilder builder) =>
builder.AddInstrumentation<TracerConsumerMiddleware, TracerProducerMiddleware>();
public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IKafkaConfigurationBuilder builder)
{
var tracerConsumerMiddleware = new TracerConsumerMiddleware();
var tracerProducerMiddleware = new TracerProducerMiddleware();

builder.SubscribeEvents(events =>
{
events.OnConsumeError += (sender, args) => tracerConsumerMiddleware.UpdateActivityOnError(args.Exception);

Check notice on line 22 in src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs#L22

'sender' is not used. Use discard parameter instead.
events.OnConsumeStart += (sender, args) => tracerConsumerMiddleware.CreateActivityOnConsume(args.MessageContext);

Check notice on line 23 in src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs#L23

'sender' is not used. Use discard parameter instead.
events.OnProduceError += (sender, args) => tracerProducerMiddleware.UpdateActivityOnError(args.Exception);

Check notice on line 24 in src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs#L24

'sender' is not used. Use discard parameter instead.
events.OnProduceStart += (sender, args) => tracerProducerMiddleware.CreateActivityOnProduce(args.MessageContext);

Check notice on line 25 in src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs#L25

'sender' is not used. Use discard parameter instead.
});

return builder;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ namespace KafkaFlow.OpenTelemetry.Trace
using System.Reflection;
using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions;

internal static class KafkaFlowActivitySourceHelper
public static class KafkaFlowActivitySourceHelper
{
public static readonly ActivitySource ActivitySource = new ActivitySource(KafkaFlowString, Version.ToString());

internal static readonly IEnumerable<KeyValuePair<string, object>> CreationTags = new[]

Check warning on line 13 in src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs#L13

Move this field's initializer into a static constructor.
{
new KeyValuePair<string, object>(Conventions.AttributePeerService, KafkaString),
Expand All @@ -23,6 +21,8 @@ internal static class KafkaFlowActivitySourceHelper

private static readonly Version Version = Assembly.GetExecutingAssembly().GetName().Version;

public static readonly ActivitySource ActivitySource = new ActivitySource(KafkaFlowString, Version.ToString());

public static void SetGenericTags(Activity activity)
{
activity?.SetTag("messaging.system", KafkaString);
Expand Down
47 changes: 28 additions & 19 deletions src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,53 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using global::OpenTelemetry;
using global::OpenTelemetry.Context.Propagation;

internal class TracerConsumerMiddleware : IConsumerInstrumentationMiddleware
internal class TracerConsumerMiddleware
{
private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
private static readonly string ProcessString = "process";

Check notice on line 13 in src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs#L13

Replace this 'static readonly' declaration with 'const'.

public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
public void CreateActivityOnConsume(IMessageContext context)
{
var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context.ConsumerContext.Topic} {ProcessString}" : ProcessString;
try
{
var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context.ConsumerContext.Topic} {ProcessString}" : ProcessString;

Check warning on line 19 in src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs#L19

'context' is null on at least one execution path.

// Extract the PropagationContext of the upstream parent from the message headers.
var parentContext = Propagator.Extract(new PropagationContext(default, Baggage.Current), context, this.ExtractTraceContextIntoBasicProperties);
Baggage.Current = parentContext.Baggage;
// Extract the PropagationContext of the upstream parent from the message headers.
var parentContext = Propagator.Extract(new PropagationContext(default, Baggage.Current), context, this.ExtractTraceContextIntoBasicProperties);
Baggage.Current = parentContext.Baggage;

// Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
// The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity.
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md
using var activity = KafkaFlowActivitySourceHelper.ActivitySource.StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext);
// Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
// The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity.
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md
using var activity = KafkaFlowActivitySourceHelper.ActivitySource.StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext);

try
{
KafkaFlowActivitySourceHelper.SetGenericTags(activity);

if (activity != null && activity.IsAllDataRequested)
{
this.SetConsumerTags(context, activity);
}

await next.Invoke(context);
}
catch (Exception ex)
catch
{
activity?.SetTag("exception.message", ex.Message);
// ignored
}
}

throw;
public void UpdateActivityOnError(Exception ex)
{
try
{
var activity = Activity.Current;

activity?.SetTag("exception.message", ex.Message);
}
catch
{
// ignored
}
}

Expand All @@ -63,7 +72,7 @@ private void SetConsumerTags(IMessageContext context, Activity activity)
activity.SetTag("messaging.source.name", context.ConsumerContext.Topic);
activity.SetTag("messaging.kafka.consumer.group", context.ConsumerContext.GroupId);
activity.SetTag("messaging.kafka.message.key", context.Message.Key);
activity.SetTag("messaging.kafka.message.offset", context.ProducerContext.Offset);
activity.SetTag("messaging.kafka.message.offset", context.ConsumerContext.Offset);
activity.SetTag("messaging.kafka.source.partition", context.ConsumerContext.Partition);
}
}
Expand Down
Loading

0 comments on commit 3f576a0

Please sign in to comment.