diff --git a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj index 843c6fea2..423818eb9 100644 --- a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj +++ b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj @@ -1,4 +1,4 @@ - + Exe @@ -22,6 +22,7 @@ + @@ -31,6 +32,8 @@ + + diff --git a/samples/KafkaFlow.Sample/Program.cs b/samples/KafkaFlow.Sample/Program.cs index 874962de1..eefcbecf5 100644 --- a/samples/KafkaFlow.Sample/Program.cs +++ b/samples/KafkaFlow.Sample/Program.cs @@ -1,11 +1,16 @@ using System; +using System.Diagnostics; using System.Threading.Tasks; using KafkaFlow; +using KafkaFlow.Configuration; +using KafkaFlow.OpenTelemetry.Trace; using KafkaFlow.Producers; using KafkaFlow.Sample; using KafkaFlow.Serializer; using KafkaFlow.TypedHandler; using Microsoft.Extensions.DependencyInjection; +using OpenTelemetry; +using OpenTelemetry.Trace; var services = new ServiceCollection(); @@ -38,6 +43,7 @@ ) ) ) + .AddOpenTelemetryInstrumentation() ); var provider = services.BuildServiceProvider(); @@ -52,6 +58,11 @@ Console.WriteLine("Type the number of messages to produce or 'exit' to quit:"); +using var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource("KafkaFlow") + .AddConsoleExporter() + .Build(); + while (true) { var input = Console.ReadLine(); diff --git a/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs index a29a6cfdb..01842900d 100644 --- a/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs +++ b/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs @@ -1,6 +1,7 @@ namespace KafkaFlow.Configuration { using System; + using KafkaFlow.Events; /// /// A builder to configure KafkaFlow @@ -21,5 +22,7 @@ public interface IKafkaConfigurationBuilder /// IKafkaConfigurationBuilder UseLogHandler() where TLogHandler : ILogHandler; + + IKafkaConfigurationBuilder SubscribeEvents(Action eventsListener); } } diff --git a/src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs b/src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs new file mode 100644 index 000000000..df12c6c22 --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs @@ -0,0 +1,14 @@ +using System; + +namespace KafkaFlow.Events.Args +{ + public class ConsumeErrorEventArgs + { + public ConsumeErrorEventArgs(Exception exception) + { + this.Exception = exception; + } + + public Exception Exception { get; set; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Abstractions/Events/Args/ConsumeStartEventArgs.cs b/src/KafkaFlow.Abstractions/Events/Args/ConsumeStartEventArgs.cs new file mode 100644 index 000000000..5a9cb593c --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/Args/ConsumeStartEventArgs.cs @@ -0,0 +1,12 @@ +namespace KafkaFlow.Events.Args +{ + public class ConsumeStartEventArgs + { + internal ConsumeStartEventArgs(IMessageContext context) + { + this.MessageContext = context; + } + + public IMessageContext MessageContext { get; set; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Abstractions/Events/Args/ProduceErrorEventArgs.cs b/src/KafkaFlow.Abstractions/Events/Args/ProduceErrorEventArgs.cs new file mode 100644 index 000000000..45e9d041c --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/Args/ProduceErrorEventArgs.cs @@ -0,0 +1,14 @@ +using System; + +namespace KafkaFlow.Events.Args +{ + public class ProduceErrorEventArgs + { + public ProduceErrorEventArgs(Exception exception) + { + this.Exception = exception; + } + + public Exception Exception { get; set; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Abstractions/Events/Args/ProduceStartEventArgs.cs b/src/KafkaFlow.Abstractions/Events/Args/ProduceStartEventArgs.cs new file mode 100644 index 000000000..3e67e9dcf --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/Args/ProduceStartEventArgs.cs @@ -0,0 +1,12 @@ +namespace KafkaFlow.Events.Args +{ + public class ProduceStartEventArgs + { + internal ProduceStartEventArgs(IMessageContext context) + { + this.MessageContext = context; + } + + public IMessageContext MessageContext { get; set; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Abstractions/Events/EventsManager.cs b/src/KafkaFlow.Abstractions/Events/EventsManager.cs new file mode 100644 index 000000000..c32e6fab3 --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/EventsManager.cs @@ -0,0 +1,36 @@ +using System; +using KafkaFlow.Events.Args; + +namespace KafkaFlow.Events +{ + public class EventsManager : IEventsListener, IEventsNotifier + { + public event EventHandler OnConsumeStart; + + public event EventHandler OnProduceStart; + + public event EventHandler OnConsumeError; + + public event EventHandler OnProduceError; + + public void NotifyOnConsumeError(Exception exception) + { + this.OnConsumeError?.Invoke(this, new ConsumeErrorEventArgs(exception)); + } + + public void NotifyOnConsumeStart(IMessageContext context) + { + this.OnConsumeStart?.Invoke(this, new ConsumeStartEventArgs(context)); + } + + public void NotifyOnProduceError(Exception exception) + { + this.OnProduceError?.Invoke(this, new ProduceErrorEventArgs(exception)); + } + + public void NotifyOnProduceStart(IMessageContext context) + { + this.OnProduceStart?.Invoke(this, new ProduceStartEventArgs(context)); + } + } +} diff --git a/src/KafkaFlow.Abstractions/Events/IEventsListener.cs b/src/KafkaFlow.Abstractions/Events/IEventsListener.cs new file mode 100644 index 000000000..91aa9a75d --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/IEventsListener.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; +using KafkaFlow.Events.Args; + +namespace KafkaFlow.Events +{ + public interface IEventsListener + { + event EventHandler OnConsumeStart; + + event EventHandler OnProduceStart; + + event EventHandler OnConsumeError; + + event EventHandler OnProduceError; + } +} diff --git a/src/KafkaFlow.Abstractions/Events/IEventsNotifier.cs b/src/KafkaFlow.Abstractions/Events/IEventsNotifier.cs new file mode 100644 index 000000000..e0dddd4fe --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/IEventsNotifier.cs @@ -0,0 +1,18 @@ +using System; + +namespace KafkaFlow.Events +{ + // Isto devia ser internal visto que a notificação de events não deveria ser publica, apenas a subscrição desses eventos + + // Falta o OnConsumeEnd e OnProduceEnd + public interface IEventsNotifier + { + void NotifyOnConsumeError(Exception exception); + + void NotifyOnConsumeStart(IMessageContext context); + + void NotifyOnProduceError(Exception exception); + + void NotifyOnProduceStart(IMessageContext context); + } +} diff --git a/src/KafkaFlow.Abstractions/IConsumerContext.cs b/src/KafkaFlow.Abstractions/IConsumerContext.cs index 76733d147..3348779ed 100644 --- a/src/KafkaFlow.Abstractions/IConsumerContext.cs +++ b/src/KafkaFlow.Abstractions/IConsumerContext.cs @@ -1,6 +1,7 @@ namespace KafkaFlow { using System; + using System.Collections.Generic; using System.Threading; /// @@ -73,5 +74,8 @@ public interface IConsumerContext /// Resume Kafka's message fetch /// void Resume(); + + // TODO: Need to discuss + IReadOnlyCollection Brokers { get; } } } diff --git a/src/KafkaFlow.Admin.Dashboard/KafkaFlow.Admin.Dashboard.csproj b/src/KafkaFlow.Admin.Dashboard/KafkaFlow.Admin.Dashboard.csproj index 6a9338a13..1561f20ef 100644 --- a/src/KafkaFlow.Admin.Dashboard/KafkaFlow.Admin.Dashboard.csproj +++ b/src/KafkaFlow.Admin.Dashboard/KafkaFlow.Admin.Dashboard.csproj @@ -39,5 +39,5 @@ - + diff --git a/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs b/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs new file mode 100644 index 000000000..f232b084b --- /dev/null +++ b/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs @@ -0,0 +1,31 @@ +namespace KafkaFlow.Configuration +{ + using KafkaFlow.OpenTelemetry.Trace; + + /// + /// Adds OpenTelemetry instrumentation + /// + public static class ExtensionMethods + { + /// + /// Adds OpenTelemetry instrumentation + /// + /// The Kafka configuration builder + /// + public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IKafkaConfigurationBuilder builder) + { + var tracerConsumerMiddleware = new TracerConsumerMiddleware(); + var tracerProducerMiddleware = new TracerProducerMiddleware(); + + builder.SubscribeEvents(events => + { + events.OnConsumeError += (sender, args) => tracerConsumerMiddleware.UpdateActivityOnError(args.Exception); + events.OnConsumeStart += (sender, args) => tracerConsumerMiddleware.CreateActivityOnConsume(args.MessageContext); + events.OnProduceError += (sender, args) => tracerProducerMiddleware.UpdateActivityOnError(args.Exception); + events.OnProduceStart += (sender, args) => tracerProducerMiddleware.CreateActivityOnProduce(args.MessageContext); + }); + + return builder; + } + } +} diff --git a/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj b/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj new file mode 100644 index 000000000..2e1edea5e --- /dev/null +++ b/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj @@ -0,0 +1,19 @@ + + + + netstandard2.0 + + + + + + + SemanticConventions + + + + + + + + diff --git a/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs b/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs new file mode 100644 index 000000000..97358d8bb --- /dev/null +++ b/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs @@ -0,0 +1,33 @@ +extern alias SemanticConventions; + +namespace KafkaFlow.OpenTelemetry.Trace +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Reflection; + using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions; + + public static class KafkaFlowActivitySourceHelper + { + internal static readonly IEnumerable> CreationTags = new[] + { + new KeyValuePair(Conventions.AttributePeerService, KafkaString), + new KeyValuePair(Conventions.AttributeMessagingSystem, KafkaString), + }; + + internal static readonly string KafkaFlowString = "KafkaFlow"; + internal static readonly string KafkaString = "kafka"; + + private static readonly Version Version = Assembly.GetExecutingAssembly().GetName().Version; + + public static readonly ActivitySource ActivitySource = new ActivitySource(KafkaFlowString, Version.ToString()); + + public static void SetGenericTags(Activity activity) + { + activity?.SetTag("messaging.system", KafkaString); + // TODO: Broker information below. Set values after + activity?.SetTag("peer.service", KafkaString); + } + } +} diff --git a/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs b/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs new file mode 100644 index 000000000..f92ee5f3b --- /dev/null +++ b/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs @@ -0,0 +1,79 @@ +namespace KafkaFlow.OpenTelemetry.Trace +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Text; + using global::OpenTelemetry; + using global::OpenTelemetry.Context.Propagation; + + internal class TracerConsumerMiddleware + { + private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; + private static readonly string ProcessString = "process"; + + public void CreateActivityOnConsume(IMessageContext context) + { + try + { + var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context.ConsumerContext.Topic} {ProcessString}" : ProcessString; + + // Extract the PropagationContext of the upstream parent from the message headers. + var parentContext = Propagator.Extract(new PropagationContext(default, Baggage.Current), context, this.ExtractTraceContextIntoBasicProperties); + Baggage.Current = parentContext.Baggage; + + // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. + // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md + using var activity = KafkaFlowActivitySourceHelper.ActivitySource.StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext); + + KafkaFlowActivitySourceHelper.SetGenericTags(activity); + + if (activity != null && activity.IsAllDataRequested) + { + this.SetConsumerTags(context, activity); + } + } + catch + { + // ignored + } + } + + public void UpdateActivityOnError(Exception ex) + { + try + { + var activity = Activity.Current; + + activity?.SetTag("exception.message", ex.Message); + } + catch + { + // ignored + } + } + + private IEnumerable ExtractTraceContextIntoBasicProperties(IMessageContext context, string key) + { + try + { + return new[] { context.Headers.GetString(key, Encoding.UTF8) }; + } + catch + { + return new string[] { }; + } + } + + private void SetConsumerTags(IMessageContext context, Activity activity) + { + activity.SetTag("messaging.operation", ProcessString); + activity.SetTag("messaging.source.name", context.ConsumerContext.Topic); + activity.SetTag("messaging.kafka.consumer.group", context.ConsumerContext.GroupId); + activity.SetTag("messaging.kafka.message.key", context.Message.Key); + activity.SetTag("messaging.kafka.message.offset", context.ConsumerContext.Offset); + activity.SetTag("messaging.kafka.source.partition", context.ConsumerContext.Partition); + } + } +} diff --git a/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs b/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs new file mode 100644 index 000000000..99ff81384 --- /dev/null +++ b/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs @@ -0,0 +1,98 @@ +namespace KafkaFlow.OpenTelemetry.Trace +{ + using System; + using System.Diagnostics; + using System.Linq; + using System.Text; + using System.Threading.Tasks; + using global::OpenTelemetry; + using global::OpenTelemetry.Context.Propagation; + using static System.Net.Mime.MediaTypeNames; + + internal class TracerProducerMiddleware + { + private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; + private static readonly string PublishString = "publish"; + + public void CreateActivityOnProduce(IMessageContext context) + { + try + { + var activityName = !string.IsNullOrEmpty(context?.ProducerContext.Topic) ? $"{context.ProducerContext.Topic} {PublishString}" : PublishString; + + // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. + // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md + using var activity = KafkaFlowActivitySourceHelper.ActivitySource.StartActivity(activityName, ActivityKind.Producer); + + // Depending on Sampling (and whether a listener is registered or not), the + // activity above may not be created. + // If it is created, then propagate its context. + // If it is not created, the propagate the Current context, if any. + ActivityContext contextToInject = default; + + if (activity != null) + { + contextToInject = activity.Context; + } + else if (Activity.Current != null) + { + contextToInject = Activity.Current.Context; + } + + // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. + Propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), context, this.InjectTraceContextIntoBasicProperties); + + KafkaFlowActivitySourceHelper.SetGenericTags(activity); + + if (activity != null && activity.IsAllDataRequested) + { + this.SetProducerTags(context, activity); + } + } + catch + { + // do nothing + } + } + + public void UpdateActivityOnError(Exception ex) + { + try + { + var activity = Activity.Current; + + activity?.SetTag("exception.message", ex.Message); + } + catch + { + // ignored + } + } + + private void InjectTraceContextIntoBasicProperties(IMessageContext context, string key, string value) + { + try + { + if (!context.Headers.Any(x => x.Key == key)) + { + Console.WriteLine("Injecting"); + context.Headers.Add(key, Encoding.ASCII.GetBytes(value)); + } + } + catch (Exception ex) + { + Console.WriteLine($"{ex}. Failed to inject trace context."); + } + } + + private void SetProducerTags(IMessageContext context, Activity activity) + { + activity.SetTag("messaging.operation", PublishString); + activity.SetTag("messaging.destination.name", context?.ProducerContext.Topic); + activity.SetTag("messaging.kafka.destination.partition", context.ProducerContext.Partition); + activity.SetTag("messaging.kafka.message.key", context.Message.Key); + activity.SetTag("messaging.kafka.message.offset", context.ProducerContext.Offset); + } + } +} diff --git a/src/KafkaFlow.sln b/src/KafkaFlow.sln index a3012d169..1b7689739 100644 --- a/src/KafkaFlow.sln +++ b/src/KafkaFlow.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio Version 16 -VisualStudioVersion = 16.0.29709.97 +# Visual Studio Version 17 +VisualStudioVersion = 17.6.33717.318 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow", "KafkaFlow\KafkaFlow.csproj", "{E1055352-9F5B-4980-80A3-50C335B79A16}" EndProject @@ -73,11 +73,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Serializer.Schema EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Admin.Dashboard", "KafkaFlow.Admin.Dashboard\KafkaFlow.Admin.Dashboard.csproj", "{4072F646-9393-4BF3-A479-0550AC1BB6C4}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.Dashboard", "..\samples\KafkaFlow.Sample.Dashboard\KafkaFlow.Sample.Dashboard.csproj", "{F32DC7DA-36EA-4199-91F5-81960FD9C650}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Sample.Dashboard", "..\samples\KafkaFlow.Sample.Dashboard\KafkaFlow.Sample.Dashboard.csproj", "{F32DC7DA-36EA-4199-91F5-81960FD9C650}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.SchemaRegistry", "..\samples\KafkaFlow.Sample.SchemaRegistry\KafkaFlow.Sample.SchemaRegistry.csproj", "{2BD49C06-7A88-4B98-91B0-659282D2A45E}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Sample.SchemaRegistry", "..\samples\KafkaFlow.Sample.SchemaRegistry\KafkaFlow.Sample.SchemaRegistry.csproj", "{2BD49C06-7A88-4B98-91B0-659282D2A45E}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.FlowControl", "..\samples\KafkaFlow.Sample.FlowControl\KafkaFlow.Sample.FlowControl.csproj", "{7B61C99E-3AEB-4497-8A38-F780CB309130}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Sample.FlowControl", "..\samples\KafkaFlow.Sample.FlowControl\KafkaFlow.Sample.FlowControl.csproj", "{7B61C99E-3AEB-4497-8A38-F780CB309130}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Deploy", "Deploy", "{4A6A390C-A63A-4371-86BB-28481AD6D4C0}" ProjectSection(SolutionItems) = preProject @@ -85,11 +85,13 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Deploy", "Deploy", "{4A6A39 ..\.github\workflows\publish.yml = ..\.github\workflows\publish.yml EndProjectSection EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.LogHandler.Microsoft", "KafkaFlow.LogHandler.Microsoft\KafkaFlow.LogHandler.Microsoft.csproj", "{8EAF0D96-F760-4FEF-9237-92779F66482D}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.LogHandler.Microsoft", "KafkaFlow.LogHandler.Microsoft\KafkaFlow.LogHandler.Microsoft.csproj", "{8EAF0D96-F760-4FEF-9237-92779F66482D}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.PauseConsumerOnError", "..\samples\KafkaFlow.Sample.PauseConsumerOnError\KafkaFlow.Sample.PauseConsumerOnError.csproj", "{B4A9E7CE-7A37-411E-967E-D9B5FD1A3992}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Sample.PauseConsumerOnError", "..\samples\KafkaFlow.Sample.PauseConsumerOnError\KafkaFlow.Sample.PauseConsumerOnError.csproj", "{B4A9E7CE-7A37-411E-967E-D9B5FD1A3992}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.ConsumerThrottling", "..\samples\KafkaFlow.Sample.ConsumerThrottling\KafkaFlow.Sample.ConsumerThrottling.csproj", "{4A16F519-FAF8-432C-AD0A-CC44F7BD392D}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Sample.ConsumerThrottling", "..\samples\KafkaFlow.Sample.ConsumerThrottling\KafkaFlow.Sample.ConsumerThrottling.csproj", "{4A16F519-FAF8-432C-AD0A-CC44F7BD392D}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.OpenTelemetry", "KafkaFlow.OpenTelemetry\KafkaFlow.OpenTelemetry.csproj", "{17040462-7CCC-40C0-A07C-40685F34B984}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -225,6 +227,10 @@ Global {4A16F519-FAF8-432C-AD0A-CC44F7BD392D}.Debug|Any CPU.Build.0 = Debug|Any CPU {4A16F519-FAF8-432C-AD0A-CC44F7BD392D}.Release|Any CPU.ActiveCfg = Release|Any CPU {4A16F519-FAF8-432C-AD0A-CC44F7BD392D}.Release|Any CPU.Build.0 = Release|Any CPU + {17040462-7CCC-40C0-A07C-40685F34B984}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {17040462-7CCC-40C0-A07C-40685F34B984}.Debug|Any CPU.Build.0 = Debug|Any CPU + {17040462-7CCC-40C0-A07C-40685F34B984}.Release|Any CPU.ActiveCfg = Release|Any CPU + {17040462-7CCC-40C0-A07C-40685F34B984}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -264,6 +270,7 @@ Global {8EAF0D96-F760-4FEF-9237-92779F66482D} = {EF626895-FDAE-4B28-9110-BA85671CBBF2} {B4A9E7CE-7A37-411E-967E-D9B5FD1A3992} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B} {4A16F519-FAF8-432C-AD0A-CC44F7BD392D} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B} + {17040462-7CCC-40C0-A07C-40685F34B984} = {ED24B548-6F37-4283-A35B-F6015BFB7A34} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {6AE955B5-16B0-41CF-9F12-66D15B3DD1AB} diff --git a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs index ac8c34585..c297c89f0 100644 --- a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs @@ -15,6 +15,8 @@ internal class ClusterConfigurationBuilder : IClusterConfigurationBuilder private IEnumerable brokers; private string name; private Func securityInformationHandler; + private Type instrumentationConsumerMiddleware = typeof(IMessageMiddleware); + private Type instrumentationProducerMiddleware = typeof(IMessageMiddleware); public ClusterConfigurationBuilder(IDependencyConfigurator dependencyConfigurator) { diff --git a/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs b/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs index 56e72d129..6752cd64f 100644 --- a/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs @@ -5,17 +5,21 @@ namespace KafkaFlow.Configuration using System.Linq; using KafkaFlow.Clusters; using KafkaFlow.Consumers; + using KafkaFlow.Events; using KafkaFlow.Producers; internal class KafkaConfigurationBuilder : IKafkaConfigurationBuilder { private readonly IDependencyConfigurator dependencyConfigurator; + private readonly List clusters = new(); private Type logHandlerType = typeof(NullLogHandler); + private readonly EventsManager eventsManager; public KafkaConfigurationBuilder(IDependencyConfigurator dependencyConfigurator) { this.dependencyConfigurator = dependencyConfigurator; + this.eventsManager = new EventsManager(); } public KafkaConfiguration Build() @@ -44,7 +48,9 @@ public KafkaConfiguration Build() .AddSingleton() .AddSingleton(new ConsumerAccessor()) .AddSingleton(new ConsumerManagerFactory()) - .AddSingleton(); + .AddSingleton() + .AddSingleton(this.eventsManager) + .AddSingleton(this.eventsManager); return configuration; } @@ -66,5 +72,11 @@ public IKafkaConfigurationBuilder UseLogHandler() this.logHandlerType = typeof(TLogHandler); return this; } + + public IKafkaConfigurationBuilder SubscribeEvents(Action eventsListener) + { + eventsListener?.Invoke(this.eventsManager); + return this; + } } } diff --git a/src/KafkaFlow/Consumers/ConsumerContext.cs b/src/KafkaFlow/Consumers/ConsumerContext.cs index 282ee2eb5..5ccbe5662 100644 --- a/src/KafkaFlow/Consumers/ConsumerContext.cs +++ b/src/KafkaFlow/Consumers/ConsumerContext.cs @@ -1,6 +1,7 @@ namespace KafkaFlow.Consumers { using System; + using System.Collections.Generic; using System.Threading; using Confluent.Kafka; @@ -38,6 +39,9 @@ public ConsumerContext( public string GroupId => this.consumer.Configuration.GroupId; + // TODO: Need to discuss + public IReadOnlyCollection Brokers => this.consumer.Configuration.ClusterConfiguration.Brokers; + public bool ShouldStoreOffset { get; set; } = true; public DateTime MessageTimestamp => this.kafkaResult.Message.Timestamp.UtcDateTime; diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index bb789af1e..9ed51957d 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -5,6 +5,7 @@ namespace KafkaFlow.Consumers using System.Threading.Channels; using System.Threading.Tasks; using Confluent.Kafka; + using KafkaFlow.Events; internal class ConsumerWorker : IConsumerWorker { @@ -13,6 +14,7 @@ internal class ConsumerWorker : IConsumerWorker private readonly IOffsetManager offsetManager; private readonly IMiddlewareExecutor middlewareExecutor; private readonly ILogHandler logHandler; + private readonly IEventsNotifier eventsNotifier; private readonly Channel> messagesBuffer; @@ -35,6 +37,9 @@ public ConsumerWorker( this.middlewareExecutor = middlewareExecutor; this.logHandler = logHandler; this.messagesBuffer = Channel.CreateBounded>(consumer.Configuration.BufferSize); + + var scope = this.dependencyResolver.CreateScope(); + this.eventsNotifier = scope.Resolver.Resolve(); } public int Id { get; } @@ -117,6 +122,8 @@ private async Task ProcessMessageAsync(ConsumeResult message, Ca this.Id), null); + this.eventsNotifier?.NotifyOnConsumeStart(context); + try { var scope = this.dependencyResolver.CreateScope(); @@ -135,6 +142,8 @@ await this.middlewareExecutor } catch (Exception ex) { + this.eventsNotifier?.NotifyOnConsumeError(ex); + this.logHandler.Error( "Error processing message", ex, diff --git a/src/KafkaFlow/Producers/MessageProducer.cs b/src/KafkaFlow/Producers/MessageProducer.cs index b5d0c8059..9a8dd6835 100644 --- a/src/KafkaFlow/Producers/MessageProducer.cs +++ b/src/KafkaFlow/Producers/MessageProducer.cs @@ -5,12 +5,14 @@ namespace KafkaFlow.Producers using System.Threading.Tasks; using Confluent.Kafka; using KafkaFlow.Configuration; + using KafkaFlow.Events; internal class MessageProducer : IMessageProducer, IDisposable { private readonly IDependencyResolver dependencyResolver; private readonly IProducerConfiguration configuration; private readonly MiddlewareExecutor middlewareExecutor; + private readonly IEventsNotifier eventsNotifier; private readonly object producerCreationSync = new(); @@ -23,6 +25,9 @@ public MessageProducer( this.dependencyResolver = dependencyResolver; this.configuration = configuration; this.middlewareExecutor = new MiddlewareExecutor(configuration.MiddlewaresConfigurations); + + var scope = this.dependencyResolver.CreateScope(); + this.eventsNotifier = scope.Resolver.Resolve(); } public string ProducerName => this.configuration.Name; @@ -269,6 +274,8 @@ private async Task> InternalProduceAsync(IMessage try { + this.eventsNotifier?.NotifyOnProduceStart(context); + var produceTask = partition.HasValue ? localProducer.ProduceAsync(new TopicPartition(context.ProducerContext.Topic, partition.Value), message) : localProducer.ProduceAsync(context.ProducerContext.Topic, message); @@ -282,6 +289,8 @@ private async Task> InternalProduceAsync(IMessage this.InvalidateProducer(e.Error, result); } + this.eventsNotifier?.NotifyOnProduceError(e); + throw; }