diff --git a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj index d8c72aa1f..423818eb9 100644 --- a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj +++ b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj @@ -1,4 +1,4 @@ - + Exe @@ -32,6 +32,8 @@ + + diff --git a/samples/KafkaFlow.Sample/Program.cs b/samples/KafkaFlow.Sample/Program.cs index 1b1ca240a..eefcbecf5 100644 --- a/samples/KafkaFlow.Sample/Program.cs +++ b/samples/KafkaFlow.Sample/Program.cs @@ -1,12 +1,16 @@ using System; +using System.Diagnostics; using System.Threading.Tasks; using KafkaFlow; using KafkaFlow.Configuration; +using KafkaFlow.OpenTelemetry.Trace; using KafkaFlow.Producers; using KafkaFlow.Sample; using KafkaFlow.Serializer; using KafkaFlow.TypedHandler; using Microsoft.Extensions.DependencyInjection; +using OpenTelemetry; +using OpenTelemetry.Trace; var services = new ServiceCollection(); @@ -38,8 +42,8 @@ .AddTypedHandlers(h => h.AddHandler()) ) ) - .AddOpenTelemetryInstrumentation() ) + .AddOpenTelemetryInstrumentation() ); var provider = services.BuildServiceProvider(); @@ -54,6 +58,11 @@ Console.WriteLine("Type the number of messages to produce or 'exit' to quit:"); +using var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource("KafkaFlow") + .AddConsoleExporter() + .Build(); + while (true) { var input = Console.ReadLine(); diff --git a/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs index a29a6cfdb..01842900d 100644 --- a/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs +++ b/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs @@ -1,6 +1,7 @@ namespace KafkaFlow.Configuration { using System; + using KafkaFlow.Events; /// /// A builder to configure KafkaFlow @@ -21,5 +22,7 @@ public interface IKafkaConfigurationBuilder /// IKafkaConfigurationBuilder UseLogHandler() where TLogHandler : ILogHandler; + + IKafkaConfigurationBuilder SubscribeEvents(Action eventsListener); } } diff --git a/src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs b/src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs new file mode 100644 index 000000000..df12c6c22 --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs @@ -0,0 +1,14 @@ +using System; + +namespace KafkaFlow.Events.Args +{ + public class ConsumeErrorEventArgs + { + public ConsumeErrorEventArgs(Exception exception) + { + this.Exception = exception; + } + + public Exception Exception { get; set; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Abstractions/Events/Args/ConsumeStartEventArgs.cs b/src/KafkaFlow.Abstractions/Events/Args/ConsumeStartEventArgs.cs new file mode 100644 index 000000000..5a9cb593c --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/Args/ConsumeStartEventArgs.cs @@ -0,0 +1,12 @@ +namespace KafkaFlow.Events.Args +{ + public class ConsumeStartEventArgs + { + internal ConsumeStartEventArgs(IMessageContext context) + { + this.MessageContext = context; + } + + public IMessageContext MessageContext { get; set; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Abstractions/Events/Args/ProduceErrorEventArgs.cs b/src/KafkaFlow.Abstractions/Events/Args/ProduceErrorEventArgs.cs new file mode 100644 index 000000000..45e9d041c --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/Args/ProduceErrorEventArgs.cs @@ -0,0 +1,14 @@ +using System; + +namespace KafkaFlow.Events.Args +{ + public class ProduceErrorEventArgs + { + public ProduceErrorEventArgs(Exception exception) + { + this.Exception = exception; + } + + public Exception Exception { get; set; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Abstractions/Events/Args/ProduceStartEventArgs.cs b/src/KafkaFlow.Abstractions/Events/Args/ProduceStartEventArgs.cs new file mode 100644 index 000000000..3e67e9dcf --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/Args/ProduceStartEventArgs.cs @@ -0,0 +1,12 @@ +namespace KafkaFlow.Events.Args +{ + public class ProduceStartEventArgs + { + internal ProduceStartEventArgs(IMessageContext context) + { + this.MessageContext = context; + } + + public IMessageContext MessageContext { get; set; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Abstractions/Events/EventsManager.cs b/src/KafkaFlow.Abstractions/Events/EventsManager.cs new file mode 100644 index 000000000..c32e6fab3 --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/EventsManager.cs @@ -0,0 +1,36 @@ +using System; +using KafkaFlow.Events.Args; + +namespace KafkaFlow.Events +{ + public class EventsManager : IEventsListener, IEventsNotifier + { + public event EventHandler OnConsumeStart; + + public event EventHandler OnProduceStart; + + public event EventHandler OnConsumeError; + + public event EventHandler OnProduceError; + + public void NotifyOnConsumeError(Exception exception) + { + this.OnConsumeError?.Invoke(this, new ConsumeErrorEventArgs(exception)); + } + + public void NotifyOnConsumeStart(IMessageContext context) + { + this.OnConsumeStart?.Invoke(this, new ConsumeStartEventArgs(context)); + } + + public void NotifyOnProduceError(Exception exception) + { + this.OnProduceError?.Invoke(this, new ProduceErrorEventArgs(exception)); + } + + public void NotifyOnProduceStart(IMessageContext context) + { + this.OnProduceStart?.Invoke(this, new ProduceStartEventArgs(context)); + } + } +} diff --git a/src/KafkaFlow.Abstractions/Events/IEventsListener.cs b/src/KafkaFlow.Abstractions/Events/IEventsListener.cs new file mode 100644 index 000000000..91aa9a75d --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/IEventsListener.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; +using KafkaFlow.Events.Args; + +namespace KafkaFlow.Events +{ + public interface IEventsListener + { + event EventHandler OnConsumeStart; + + event EventHandler OnProduceStart; + + event EventHandler OnConsumeError; + + event EventHandler OnProduceError; + } +} diff --git a/src/KafkaFlow.Abstractions/Events/IEventsNotifier.cs b/src/KafkaFlow.Abstractions/Events/IEventsNotifier.cs new file mode 100644 index 000000000..e0dddd4fe --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/IEventsNotifier.cs @@ -0,0 +1,18 @@ +using System; + +namespace KafkaFlow.Events +{ + // Isto devia ser internal visto que a notificação de events não deveria ser publica, apenas a subscrição desses eventos + + // Falta o OnConsumeEnd e OnProduceEnd + public interface IEventsNotifier + { + void NotifyOnConsumeError(Exception exception); + + void NotifyOnConsumeStart(IMessageContext context); + + void NotifyOnProduceError(Exception exception); + + void NotifyOnProduceStart(IMessageContext context); + } +} diff --git a/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json b/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json new file mode 100644 index 000000000..1b8239a87 --- /dev/null +++ b/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json @@ -0,0 +1,12 @@ +{ + "profiles": { + "KafkaFlow.Admin.Dashboard": { + "commandName": "Project", + "launchBrowser": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + }, + "applicationUrl": "https://localhost:57596;http://localhost:57597" + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs b/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs index 2f3e67677..f232b084b 100644 --- a/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs +++ b/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs @@ -12,7 +12,20 @@ public static class ExtensionMethods /// /// The Kafka configuration builder /// - public static IClusterConfigurationBuilder AddOpenTelemetryInstrumentation(this IClusterConfigurationBuilder builder) => - builder.AddInstrumentation(); + public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IKafkaConfigurationBuilder builder) + { + var tracerConsumerMiddleware = new TracerConsumerMiddleware(); + var tracerProducerMiddleware = new TracerProducerMiddleware(); + + builder.SubscribeEvents(events => + { + events.OnConsumeError += (sender, args) => tracerConsumerMiddleware.UpdateActivityOnError(args.Exception); + events.OnConsumeStart += (sender, args) => tracerConsumerMiddleware.CreateActivityOnConsume(args.MessageContext); + events.OnProduceError += (sender, args) => tracerProducerMiddleware.UpdateActivityOnError(args.Exception); + events.OnProduceStart += (sender, args) => tracerProducerMiddleware.CreateActivityOnProduce(args.MessageContext); + }); + + return builder; + } } } diff --git a/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs b/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs index 8feb910a8..97358d8bb 100644 --- a/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs +++ b/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs @@ -8,10 +8,8 @@ namespace KafkaFlow.OpenTelemetry.Trace using System.Reflection; using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions; - internal static class KafkaFlowActivitySourceHelper + public static class KafkaFlowActivitySourceHelper { - public static readonly ActivitySource ActivitySource = new ActivitySource(KafkaFlowString, Version.ToString()); - internal static readonly IEnumerable> CreationTags = new[] { new KeyValuePair(Conventions.AttributePeerService, KafkaString), @@ -23,6 +21,8 @@ internal static class KafkaFlowActivitySourceHelper private static readonly Version Version = Assembly.GetExecutingAssembly().GetName().Version; + public static readonly ActivitySource ActivitySource = new ActivitySource(KafkaFlowString, Version.ToString()); + public static void SetGenericTags(Activity activity) { activity?.SetTag("messaging.system", KafkaString); diff --git a/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs b/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs index 0b1229cbd..f92ee5f3b 100644 --- a/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs +++ b/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs @@ -4,44 +4,53 @@ using System.Collections.Generic; using System.Diagnostics; using System.Text; - using System.Threading.Tasks; using global::OpenTelemetry; using global::OpenTelemetry.Context.Propagation; - internal class TracerConsumerMiddleware : IConsumerInstrumentationMiddleware + internal class TracerConsumerMiddleware { private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; private static readonly string ProcessString = "process"; - public async Task Invoke(IMessageContext context, MiddlewareDelegate next) + public void CreateActivityOnConsume(IMessageContext context) { - var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context.ConsumerContext.Topic} {ProcessString}" : ProcessString; + try + { + var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context.ConsumerContext.Topic} {ProcessString}" : ProcessString; - // Extract the PropagationContext of the upstream parent from the message headers. - var parentContext = Propagator.Extract(new PropagationContext(default, Baggage.Current), context, this.ExtractTraceContextIntoBasicProperties); - Baggage.Current = parentContext.Baggage; + // Extract the PropagationContext of the upstream parent from the message headers. + var parentContext = Propagator.Extract(new PropagationContext(default, Baggage.Current), context, this.ExtractTraceContextIntoBasicProperties); + Baggage.Current = parentContext.Baggage; - // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. - // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md - using var activity = KafkaFlowActivitySourceHelper.ActivitySource.StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext); + // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. + // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md + using var activity = KafkaFlowActivitySourceHelper.ActivitySource.StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext); - try - { KafkaFlowActivitySourceHelper.SetGenericTags(activity); if (activity != null && activity.IsAllDataRequested) { this.SetConsumerTags(context, activity); } - - await next.Invoke(context); } - catch (Exception ex) + catch { - activity?.SetTag("exception.message", ex.Message); + // ignored + } + } - throw; + public void UpdateActivityOnError(Exception ex) + { + try + { + var activity = Activity.Current; + + activity?.SetTag("exception.message", ex.Message); + } + catch + { + // ignored } } @@ -63,7 +72,7 @@ private void SetConsumerTags(IMessageContext context, Activity activity) activity.SetTag("messaging.source.name", context.ConsumerContext.Topic); activity.SetTag("messaging.kafka.consumer.group", context.ConsumerContext.GroupId); activity.SetTag("messaging.kafka.message.key", context.Message.Key); - activity.SetTag("messaging.kafka.message.offset", context.ProducerContext.Offset); + activity.SetTag("messaging.kafka.message.offset", context.ConsumerContext.Offset); activity.SetTag("messaging.kafka.source.partition", context.ConsumerContext.Partition); } } diff --git a/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs b/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs index 6616c13ed..99ff81384 100644 --- a/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs +++ b/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs @@ -7,23 +7,24 @@ using System.Threading.Tasks; using global::OpenTelemetry; using global::OpenTelemetry.Context.Propagation; + using static System.Net.Mime.MediaTypeNames; - internal class TracerProducerMiddleware : IProducerInstrumentationMiddleware + internal class TracerProducerMiddleware { private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; private static readonly string PublishString = "publish"; - public async Task Invoke(IMessageContext context, MiddlewareDelegate next) + public void CreateActivityOnProduce(IMessageContext context) { - var activityName = !string.IsNullOrEmpty(context?.ProducerContext.Topic) ? $"{context.ProducerContext.Topic} {PublishString}" : PublishString; - - // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. - // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md - using var activity = KafkaFlowActivitySourceHelper.ActivitySource.StartActivity(activityName, ActivityKind.Producer); - try { + var activityName = !string.IsNullOrEmpty(context?.ProducerContext.Topic) ? $"{context.ProducerContext.Topic} {PublishString}" : PublishString; + + // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. + // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md + using var activity = KafkaFlowActivitySourceHelper.ActivitySource.StartActivity(activityName, ActivityKind.Producer); + // Depending on Sampling (and whether a listener is registered or not), the // activity above may not be created. // If it is created, then propagate its context. @@ -48,14 +49,24 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next) { this.SetProducerTags(context, activity); } - - await next.Invoke(context); } - catch (Exception ex) + catch { - activity?.SetTag("exception.message", ex.Message); + // do nothing + } + } + + public void UpdateActivityOnError(Exception ex) + { + try + { + var activity = Activity.Current; - throw; + activity?.SetTag("exception.message", ex.Message); + } + catch + { + // ignored } } diff --git a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs index b32e521be..2ee0628fa 100644 --- a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs @@ -37,15 +37,7 @@ public ClusterConfiguration Build(KafkaConfiguration kafkaConfiguration) this.topicsToCreateIfNotExist); configuration.AddProducers(this.producers.Select(x => x.Build(configuration))); - configuration.AddConsumers(this.consumers.Select(x => - { - if (this.instrumentationConsumerMiddleware != null) - { - x.AddInstrumentation(); - } - - return x.Build(configuration); - })); + configuration.AddConsumers(this.consumers.Select(x => x.Build(configuration))); return configuration; } diff --git a/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs b/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs index 56e72d129..6752cd64f 100644 --- a/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs @@ -5,17 +5,21 @@ namespace KafkaFlow.Configuration using System.Linq; using KafkaFlow.Clusters; using KafkaFlow.Consumers; + using KafkaFlow.Events; using KafkaFlow.Producers; internal class KafkaConfigurationBuilder : IKafkaConfigurationBuilder { private readonly IDependencyConfigurator dependencyConfigurator; + private readonly List clusters = new(); private Type logHandlerType = typeof(NullLogHandler); + private readonly EventsManager eventsManager; public KafkaConfigurationBuilder(IDependencyConfigurator dependencyConfigurator) { this.dependencyConfigurator = dependencyConfigurator; + this.eventsManager = new EventsManager(); } public KafkaConfiguration Build() @@ -44,7 +48,9 @@ public KafkaConfiguration Build() .AddSingleton() .AddSingleton(new ConsumerAccessor()) .AddSingleton(new ConsumerManagerFactory()) - .AddSingleton(); + .AddSingleton() + .AddSingleton(this.eventsManager) + .AddSingleton(this.eventsManager); return configuration; } @@ -66,5 +72,11 @@ public IKafkaConfigurationBuilder UseLogHandler() this.logHandlerType = typeof(TLogHandler); return this; } + + public IKafkaConfigurationBuilder SubscribeEvents(Action eventsListener) + { + eventsListener?.Invoke(this.eventsManager); + return this; + } } } diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index bb789af1e..9ed51957d 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -5,6 +5,7 @@ namespace KafkaFlow.Consumers using System.Threading.Channels; using System.Threading.Tasks; using Confluent.Kafka; + using KafkaFlow.Events; internal class ConsumerWorker : IConsumerWorker { @@ -13,6 +14,7 @@ internal class ConsumerWorker : IConsumerWorker private readonly IOffsetManager offsetManager; private readonly IMiddlewareExecutor middlewareExecutor; private readonly ILogHandler logHandler; + private readonly IEventsNotifier eventsNotifier; private readonly Channel> messagesBuffer; @@ -35,6 +37,9 @@ public ConsumerWorker( this.middlewareExecutor = middlewareExecutor; this.logHandler = logHandler; this.messagesBuffer = Channel.CreateBounded>(consumer.Configuration.BufferSize); + + var scope = this.dependencyResolver.CreateScope(); + this.eventsNotifier = scope.Resolver.Resolve(); } public int Id { get; } @@ -117,6 +122,8 @@ private async Task ProcessMessageAsync(ConsumeResult message, Ca this.Id), null); + this.eventsNotifier?.NotifyOnConsumeStart(context); + try { var scope = this.dependencyResolver.CreateScope(); @@ -135,6 +142,8 @@ await this.middlewareExecutor } catch (Exception ex) { + this.eventsNotifier?.NotifyOnConsumeError(ex); + this.logHandler.Error( "Error processing message", ex, diff --git a/src/KafkaFlow/Producers/MessageProducer.cs b/src/KafkaFlow/Producers/MessageProducer.cs index b5d0c8059..9a8dd6835 100644 --- a/src/KafkaFlow/Producers/MessageProducer.cs +++ b/src/KafkaFlow/Producers/MessageProducer.cs @@ -5,12 +5,14 @@ namespace KafkaFlow.Producers using System.Threading.Tasks; using Confluent.Kafka; using KafkaFlow.Configuration; + using KafkaFlow.Events; internal class MessageProducer : IMessageProducer, IDisposable { private readonly IDependencyResolver dependencyResolver; private readonly IProducerConfiguration configuration; private readonly MiddlewareExecutor middlewareExecutor; + private readonly IEventsNotifier eventsNotifier; private readonly object producerCreationSync = new(); @@ -23,6 +25,9 @@ public MessageProducer( this.dependencyResolver = dependencyResolver; this.configuration = configuration; this.middlewareExecutor = new MiddlewareExecutor(configuration.MiddlewaresConfigurations); + + var scope = this.dependencyResolver.CreateScope(); + this.eventsNotifier = scope.Resolver.Resolve(); } public string ProducerName => this.configuration.Name; @@ -269,6 +274,8 @@ private async Task> InternalProduceAsync(IMessage try { + this.eventsNotifier?.NotifyOnProduceStart(context); + var produceTask = partition.HasValue ? localProducer.ProduceAsync(new TopicPartition(context.ProducerContext.Topic, partition.Value), message) : localProducer.ProduceAsync(context.ProducerContext.Topic, message); @@ -282,6 +289,8 @@ private async Task> InternalProduceAsync(IMessage this.InvalidateProducer(e.Error, result); } + this.eventsNotifier?.NotifyOnProduceError(e); + throw; }