From e0d3039f9c8cb581d0ebb997f0169bcc91f375e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sim=C3=A3o=20Ribeiro?= Date: Wed, 13 Sep 2023 17:07:32 +0100 Subject: [PATCH 1/6] feat: creates kafkaflow.opentelemetry project Adds tracer middlwares for Producer and Consumer --- .../KafkaFlow.OpenTelemetry.csproj | 19 ++++ .../Trace/KafkaFlowActivitySourceHelper.cs | 33 +++++++ .../Trace/TracerConsumerMiddleware.cs | 70 +++++++++++++++ .../Trace/TracerProducerMiddleware.cs | 87 +++++++++++++++++++ src/KafkaFlow.sln | 23 +++-- 5 files changed, 224 insertions(+), 8 deletions(-) create mode 100644 src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj create mode 100644 src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs create mode 100644 src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs create mode 100644 src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs diff --git a/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj b/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj new file mode 100644 index 000000000..0d9a52dbf --- /dev/null +++ b/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj @@ -0,0 +1,19 @@ + + + + netstandard2.0 + + + + + + + SemanticConventions + + + + + + + + diff --git a/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs b/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs new file mode 100644 index 000000000..97f70071d --- /dev/null +++ b/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs @@ -0,0 +1,33 @@ +extern alias SemanticConventions; + +namespace KafkaFlow.OpenTelemetry.Trace +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Reflection; + using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions; + + internal class KafkaFlowActivitySourceHelper + { + public static readonly ActivitySource ActivitySource = new ActivitySource(KafkaFlowString, Version.ToString()); + + internal static readonly IEnumerable> CreationTags = new[] + { + new KeyValuePair(Conventions.AttributePeerService, KafkaString), + new KeyValuePair(Conventions.AttributeMessagingSystem, KafkaString), + }; + + internal static readonly string KafkaFlowString = "KafkaFlow"; + internal static readonly string KafkaString = "kafka"; + + private static readonly Version Version = Assembly.GetExecutingAssembly().GetName().Version; + + public static void SetGenericTags(Activity activity) + { + activity?.SetTag("messaging.system", KafkaString); + // Broker information below. Set values after + activity?.SetTag("peer.service", KafkaString); + } + } +} diff --git a/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs b/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs new file mode 100644 index 000000000..e4991c89e --- /dev/null +++ b/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs @@ -0,0 +1,70 @@ +namespace KafkaFlow.OpenTelemetry.Trace +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Text; + using System.Threading.Tasks; + using global::OpenTelemetry; + using global::OpenTelemetry.Context.Propagation; + + internal class TracerConsumerMiddleware : IMessageMiddleware + { + private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; + private static readonly string ReceiveString = "receive"; + + public async Task Invoke(IMessageContext context, MiddlewareDelegate next) + { + var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context.ConsumerContext.Topic} {ReceiveString}" : ReceiveString; + + // Extract the PropagationContext of the upstream parent from the message headers. + var parentContext = Propagator.Extract(new PropagationContext(default, Baggage.Current), context, this.ExtractTraceContextIntoBasicProperties); + Baggage.Current = parentContext.Baggage; + + // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. + // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md + using var activity = KafkaFlowActivitySourceHelper.ActivitySource.StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext); + + try + { + KafkaFlowActivitySourceHelper.SetGenericTags(activity); + + if (activity != null && activity.IsAllDataRequested) + { + this.SetConsumerTags(context, activity); + } + + await next.Invoke(context); + } + catch (Exception ex) + { + activity?.SetTag("exception.message", ex.Message); + + throw; + } + } + + private IEnumerable ExtractTraceContextIntoBasicProperties(IMessageContext context, string key) + { + try + { + return new[] { context.Headers.GetString(key, Encoding.UTF8) }; + } + catch + { + return new string[] { }; + } + } + + private void SetConsumerTags(IMessageContext context, Activity activity) + { + activity.SetTag("messaging.operation", ReceiveString); + activity.SetTag("messaging.source.name", context.ConsumerContext.Topic); + activity.SetTag("messaging.kafka.consumer.group", context.ConsumerContext.GroupId); + activity.SetTag("messaging.kafka.message.key", context.Message.Key); + activity.SetTag("messaging.kafka.message.offset", context.ProducerContext.Offset); + activity.SetTag("messaging.kafka.source.partition", context.ConsumerContext.Partition); + } + } +} diff --git a/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs b/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs new file mode 100644 index 000000000..fb6272181 --- /dev/null +++ b/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs @@ -0,0 +1,87 @@ +namespace KafkaFlow.OpenTelemetry.Trace +{ + using System; + using System.Diagnostics; + using System.Linq; + using System.Text; + using System.Threading.Tasks; + using global::OpenTelemetry; + using global::OpenTelemetry.Context.Propagation; + + internal class TracerProducerMiddleware : IMessageMiddleware + { + private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; + private static readonly string PublishString = "publish"; + + public async Task Invoke(IMessageContext context, MiddlewareDelegate next) + { + var activityName = !string.IsNullOrEmpty(context?.ProducerContext.Topic) ? $"{context.ProducerContext.Topic} {PublishString}" : PublishString; + + // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. + // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md + using var activity = KafkaFlowActivitySourceHelper.ActivitySource.StartActivity(activityName, ActivityKind.Producer); + + try + { + // Depending on Sampling (and whether a listener is registered or not), the + // activity above may not be created. + // If it is created, then propagate its context. + // If it is not created, the propagate the Current context, if any. + ActivityContext contextToInject = default; + + if (activity != null) + { + contextToInject = activity.Context; + } + else if (Activity.Current != null) + { + contextToInject = Activity.Current.Context; + } + + // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. + Propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), context, this.InjectTraceContextIntoBasicProperties); + + KafkaFlowActivitySourceHelper.SetGenericTags(activity); + + if (activity != null && activity.IsAllDataRequested) + { + this.SetProducerTags(context, activity); + } + + await next.Invoke(context); + } + catch (Exception ex) + { + activity?.SetTag("exception.message", ex.Message); + + throw; + } + } + + private void InjectTraceContextIntoBasicProperties(IMessageContext context, string key, string value) + { + try + { + if (!context.Headers.Any(x => x.Key == key)) + { + Console.WriteLine("Injecting"); + context.Headers.Add(key, Encoding.ASCII.GetBytes(value)); + } + } + catch (Exception ex) + { + Console.WriteLine($"{ex}. Failed to inject trace context."); + } + } + + private void SetProducerTags(IMessageContext context, Activity activity) + { + activity.SetTag("messaging.operation", PublishString); + activity.SetTag("messaging.destination.name", context?.ProducerContext.Topic); + activity.SetTag("messaging.kafka.destination.partition", context.ProducerContext.Partition); + activity.SetTag("messaging.kafka.message.key", context.Message.Key); + activity.SetTag("messaging.kafka.message.offset", context.ProducerContext.Offset); + } + } +} diff --git a/src/KafkaFlow.sln b/src/KafkaFlow.sln index a3012d169..1b7689739 100644 --- a/src/KafkaFlow.sln +++ b/src/KafkaFlow.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio Version 16 -VisualStudioVersion = 16.0.29709.97 +# Visual Studio Version 17 +VisualStudioVersion = 17.6.33717.318 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow", "KafkaFlow\KafkaFlow.csproj", "{E1055352-9F5B-4980-80A3-50C335B79A16}" EndProject @@ -73,11 +73,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Serializer.Schema EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Admin.Dashboard", "KafkaFlow.Admin.Dashboard\KafkaFlow.Admin.Dashboard.csproj", "{4072F646-9393-4BF3-A479-0550AC1BB6C4}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.Dashboard", "..\samples\KafkaFlow.Sample.Dashboard\KafkaFlow.Sample.Dashboard.csproj", "{F32DC7DA-36EA-4199-91F5-81960FD9C650}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Sample.Dashboard", "..\samples\KafkaFlow.Sample.Dashboard\KafkaFlow.Sample.Dashboard.csproj", "{F32DC7DA-36EA-4199-91F5-81960FD9C650}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.SchemaRegistry", "..\samples\KafkaFlow.Sample.SchemaRegistry\KafkaFlow.Sample.SchemaRegistry.csproj", "{2BD49C06-7A88-4B98-91B0-659282D2A45E}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Sample.SchemaRegistry", "..\samples\KafkaFlow.Sample.SchemaRegistry\KafkaFlow.Sample.SchemaRegistry.csproj", "{2BD49C06-7A88-4B98-91B0-659282D2A45E}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.FlowControl", "..\samples\KafkaFlow.Sample.FlowControl\KafkaFlow.Sample.FlowControl.csproj", "{7B61C99E-3AEB-4497-8A38-F780CB309130}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Sample.FlowControl", "..\samples\KafkaFlow.Sample.FlowControl\KafkaFlow.Sample.FlowControl.csproj", "{7B61C99E-3AEB-4497-8A38-F780CB309130}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Deploy", "Deploy", "{4A6A390C-A63A-4371-86BB-28481AD6D4C0}" ProjectSection(SolutionItems) = preProject @@ -85,11 +85,13 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Deploy", "Deploy", "{4A6A39 ..\.github\workflows\publish.yml = ..\.github\workflows\publish.yml EndProjectSection EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.LogHandler.Microsoft", "KafkaFlow.LogHandler.Microsoft\KafkaFlow.LogHandler.Microsoft.csproj", "{8EAF0D96-F760-4FEF-9237-92779F66482D}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.LogHandler.Microsoft", "KafkaFlow.LogHandler.Microsoft\KafkaFlow.LogHandler.Microsoft.csproj", "{8EAF0D96-F760-4FEF-9237-92779F66482D}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.PauseConsumerOnError", "..\samples\KafkaFlow.Sample.PauseConsumerOnError\KafkaFlow.Sample.PauseConsumerOnError.csproj", "{B4A9E7CE-7A37-411E-967E-D9B5FD1A3992}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Sample.PauseConsumerOnError", "..\samples\KafkaFlow.Sample.PauseConsumerOnError\KafkaFlow.Sample.PauseConsumerOnError.csproj", "{B4A9E7CE-7A37-411E-967E-D9B5FD1A3992}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.ConsumerThrottling", "..\samples\KafkaFlow.Sample.ConsumerThrottling\KafkaFlow.Sample.ConsumerThrottling.csproj", "{4A16F519-FAF8-432C-AD0A-CC44F7BD392D}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Sample.ConsumerThrottling", "..\samples\KafkaFlow.Sample.ConsumerThrottling\KafkaFlow.Sample.ConsumerThrottling.csproj", "{4A16F519-FAF8-432C-AD0A-CC44F7BD392D}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.OpenTelemetry", "KafkaFlow.OpenTelemetry\KafkaFlow.OpenTelemetry.csproj", "{17040462-7CCC-40C0-A07C-40685F34B984}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -225,6 +227,10 @@ Global {4A16F519-FAF8-432C-AD0A-CC44F7BD392D}.Debug|Any CPU.Build.0 = Debug|Any CPU {4A16F519-FAF8-432C-AD0A-CC44F7BD392D}.Release|Any CPU.ActiveCfg = Release|Any CPU {4A16F519-FAF8-432C-AD0A-CC44F7BD392D}.Release|Any CPU.Build.0 = Release|Any CPU + {17040462-7CCC-40C0-A07C-40685F34B984}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {17040462-7CCC-40C0-A07C-40685F34B984}.Debug|Any CPU.Build.0 = Debug|Any CPU + {17040462-7CCC-40C0-A07C-40685F34B984}.Release|Any CPU.ActiveCfg = Release|Any CPU + {17040462-7CCC-40C0-A07C-40685F34B984}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -264,6 +270,7 @@ Global {8EAF0D96-F760-4FEF-9237-92779F66482D} = {EF626895-FDAE-4B28-9110-BA85671CBBF2} {B4A9E7CE-7A37-411E-967E-D9B5FD1A3992} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B} {4A16F519-FAF8-432C-AD0A-CC44F7BD392D} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B} + {17040462-7CCC-40C0-A07C-40685F34B984} = {ED24B548-6F37-4283-A35B-F6015BFB7A34} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {6AE955B5-16B0-41CF-9F12-66D15B3DD1AB} From 645adb62d0e6a0b0cb236d84266f8f98b6a19ec1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sim=C3=A3o=20Ribeiro?= Date: Thu, 14 Sep 2023 11:39:54 +0100 Subject: [PATCH 2/6] feat: change consumer operation name From receive to process as the middleware will be included at the beginning --- .../Trace/KafkaFlowActivitySourceHelper.cs | 2 +- .../Trace/TracerConsumerMiddleware.cs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs b/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs index 97f70071d..fa6956a5f 100644 --- a/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs +++ b/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs @@ -26,7 +26,7 @@ internal class KafkaFlowActivitySourceHelper public static void SetGenericTags(Activity activity) { activity?.SetTag("messaging.system", KafkaString); - // Broker information below. Set values after + // TODO: Broker information below. Set values after activity?.SetTag("peer.service", KafkaString); } } diff --git a/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs b/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs index e4991c89e..8c6baac4d 100644 --- a/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs +++ b/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs @@ -11,11 +11,11 @@ internal class TracerConsumerMiddleware : IMessageMiddleware { private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; - private static readonly string ReceiveString = "receive"; + private static readonly string ProcessString = "process"; public async Task Invoke(IMessageContext context, MiddlewareDelegate next) { - var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context.ConsumerContext.Topic} {ReceiveString}" : ReceiveString; + var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context.ConsumerContext.Topic} {ProcessString}" : ProcessString; // Extract the PropagationContext of the upstream parent from the message headers. var parentContext = Propagator.Extract(new PropagationContext(default, Baggage.Current), context, this.ExtractTraceContextIntoBasicProperties); @@ -59,7 +59,7 @@ private IEnumerable ExtractTraceContextIntoBasicProperties(IMessageConte private void SetConsumerTags(IMessageContext context, Activity activity) { - activity.SetTag("messaging.operation", ReceiveString); + activity.SetTag("messaging.operation", ProcessString); activity.SetTag("messaging.source.name", context.ConsumerContext.Topic); activity.SetTag("messaging.kafka.consumer.group", context.ConsumerContext.GroupId); activity.SetTag("messaging.kafka.message.key", context.Message.Key); From dfcbf405c007a02674ee57e2baa4526ced2500ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sim=C3=A3o=20Ribeiro?= Date: Tue, 19 Sep 2023 12:47:15 +0100 Subject: [PATCH 3/6] Include AddOpenTelemetryInstrumentation in builder --- .../KafkaFlow.Sample/KafkaFlow.Sample.csproj | 1 + samples/KafkaFlow.Sample/Program.cs | 2 ++ .../IClusterConfigurationBuilder.cs | 5 +++++ .../Configuration/IInstrumentationBuilder.cs | 11 ++++++++++ .../ExtensionMethods.cs | 18 +++++++++++++++ .../KafkaFlow.OpenTelemetry.csproj | 2 +- .../Trace/KafkaFlowActivitySourceHelper.cs | 2 +- .../ClusterConfigurationBuilder.cs | 22 ++++++++++++++++++- .../ConsumerConfigurationBuilder.cs | 6 +++++ 9 files changed, 66 insertions(+), 3 deletions(-) create mode 100644 src/KafkaFlow.Abstractions/Configuration/IInstrumentationBuilder.cs create mode 100644 src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs diff --git a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj index 843c6fea2..d8c72aa1f 100644 --- a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj +++ b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj @@ -22,6 +22,7 @@ + diff --git a/samples/KafkaFlow.Sample/Program.cs b/samples/KafkaFlow.Sample/Program.cs index 874962de1..1b1ca240a 100644 --- a/samples/KafkaFlow.Sample/Program.cs +++ b/samples/KafkaFlow.Sample/Program.cs @@ -1,6 +1,7 @@ using System; using System.Threading.Tasks; using KafkaFlow; +using KafkaFlow.Configuration; using KafkaFlow.Producers; using KafkaFlow.Sample; using KafkaFlow.Serializer; @@ -37,6 +38,7 @@ .AddTypedHandlers(h => h.AddHandler()) ) ) + .AddOpenTelemetryInstrumentation() ) ); diff --git a/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs index 41325ef45..894307391 100644 --- a/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs +++ b/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs @@ -82,5 +82,10 @@ IClusterConfigurationBuilder CreateTopicIfNotExists( string topicName, int numberOfPartitions, short replicationFactor); + + // Gets the cluster configuration + IClusterConfigurationBuilder AddInstrumentation() + where TConsumerInstrumentationMiddleware : class, IMessageMiddleware + where TProducerInstrumentationMiddleware : class, IMessageMiddleware; } } diff --git a/src/KafkaFlow.Abstractions/Configuration/IInstrumentationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IInstrumentationBuilder.cs new file mode 100644 index 000000000..db023a8b1 --- /dev/null +++ b/src/KafkaFlow.Abstractions/Configuration/IInstrumentationBuilder.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace KafkaFlow.Configuration +{ + public interface IInstrumentationBuilder + { + void AddInstrumentation(); + } +} diff --git a/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs b/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs new file mode 100644 index 000000000..2f3e67677 --- /dev/null +++ b/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs @@ -0,0 +1,18 @@ +namespace KafkaFlow.Configuration +{ + using KafkaFlow.OpenTelemetry.Trace; + + /// + /// Adds OpenTelemetry instrumentation + /// + public static class ExtensionMethods + { + /// + /// Adds OpenTelemetry instrumentation + /// + /// The Kafka configuration builder + /// + public static IClusterConfigurationBuilder AddOpenTelemetryInstrumentation(this IClusterConfigurationBuilder builder) => + builder.AddInstrumentation(); + } +} diff --git a/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj b/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj index 0d9a52dbf..2e1edea5e 100644 --- a/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj +++ b/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj @@ -6,7 +6,7 @@ - + SemanticConventions diff --git a/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs b/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs index fa6956a5f..8feb910a8 100644 --- a/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs +++ b/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs @@ -8,7 +8,7 @@ namespace KafkaFlow.OpenTelemetry.Trace using System.Reflection; using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions; - internal class KafkaFlowActivitySourceHelper + internal static class KafkaFlowActivitySourceHelper { public static readonly ActivitySource ActivitySource = new ActivitySource(KafkaFlowString, Version.ToString()); diff --git a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs index ac8c34585..b328afb31 100644 --- a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs @@ -15,6 +15,8 @@ internal class ClusterConfigurationBuilder : IClusterConfigurationBuilder private IEnumerable brokers; private string name; private Func securityInformationHandler; + private Type instrumentationConsumerMiddleware = typeof(IMessageMiddleware); + private Type instrumentationProducerMiddleware = typeof(IMessageMiddleware); public ClusterConfigurationBuilder(IDependencyConfigurator dependencyConfigurator) { @@ -35,7 +37,15 @@ public ClusterConfiguration Build(KafkaConfiguration kafkaConfiguration) this.topicsToCreateIfNotExist); configuration.AddProducers(this.producers.Select(x => x.Build(configuration))); - configuration.AddConsumers(this.consumers.Select(x => x.Build(configuration))); + configuration.AddConsumers(this.consumers.Select(x => + { + if(this.instrumentationConsumerMiddleware != null) + { + x.AddInstrumentation(); + } + + return x.Build(configuration); + })); return configuration; } @@ -96,6 +106,16 @@ public IClusterConfigurationBuilder AddConsumer(Action() + where TConsumerInstrumentationMiddleware : class, IMessageMiddleware + where TProducerInstrumentationMiddleware : class, IMessageMiddleware + { + + this.instrumentationConsumerMiddleware = typeof(TConsumerInstrumentationMiddleware); + + return this; + } + public IClusterConfigurationBuilder OnStopping(Action handler) { this.onStoppingHandler = handler; diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs index f7e601ed0..9d51193be 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs @@ -229,6 +229,12 @@ public IConsumerConfigurationBuilder WithCustomFactory(ConsumerCustomFactory cus return this; } + public void AddInstrumentation() + where T : class, IMessageMiddleware + { + this.middlewareConfigurationBuilder.AddAtBeginning(); + } + public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration) { var middlewareConfiguration = this.middlewareConfigurationBuilder.Build(); From 0bcaac226d34d151ff3ad5c3600abe5f8b0490b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sim=C3=A3o=20Ribeiro?= Date: Wed, 20 Sep 2023 10:53:00 +0100 Subject: [PATCH 4/6] Middlewares not fully implemented --- .../IClusterConfigurationBuilder.cs | 4 ++-- .../IConsumerContext.cs | 3 +++ .../IConsumerInstrumentationMiddleware.cs | 6 ++++++ .../IProducerInstrumentationMiddleware.cs | 6 ++++++ .../Trace/TracerConsumerMiddleware.cs | 2 +- .../Trace/TracerProducerMiddleware.cs | 2 +- .../ClusterConfigurationBuilder.cs | 14 ++++++++----- .../ConsumerConfigurationBuilder.cs | 4 ++-- .../MiddlewareConfigurationBuilder.cs | 20 +++++++++++++++++++ src/KafkaFlow/Consumers/ConsumerContext.cs | 3 +++ 10 files changed, 53 insertions(+), 11 deletions(-) create mode 100644 src/KafkaFlow.Abstractions/IConsumerInstrumentationMiddleware.cs create mode 100644 src/KafkaFlow.Abstractions/IProducerInstrumentationMiddleware.cs diff --git a/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs index 894307391..5026dcc8a 100644 --- a/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs +++ b/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs @@ -85,7 +85,7 @@ IClusterConfigurationBuilder CreateTopicIfNotExists( // Gets the cluster configuration IClusterConfigurationBuilder AddInstrumentation() - where TConsumerInstrumentationMiddleware : class, IMessageMiddleware - where TProducerInstrumentationMiddleware : class, IMessageMiddleware; + where TConsumerInstrumentationMiddleware : class, IConsumerInstrumentationMiddleware + where TProducerInstrumentationMiddleware : class, IProducerInstrumentationMiddleware; } } diff --git a/src/KafkaFlow.Abstractions/IConsumerContext.cs b/src/KafkaFlow.Abstractions/IConsumerContext.cs index 76733d147..62b63e432 100644 --- a/src/KafkaFlow.Abstractions/IConsumerContext.cs +++ b/src/KafkaFlow.Abstractions/IConsumerContext.cs @@ -1,6 +1,7 @@ namespace KafkaFlow { using System; + using System.Collections.Generic; using System.Threading; /// @@ -73,5 +74,7 @@ public interface IConsumerContext /// Resume Kafka's message fetch /// void Resume(); + + IReadOnlyCollection Brokers { get; } } } diff --git a/src/KafkaFlow.Abstractions/IConsumerInstrumentationMiddleware.cs b/src/KafkaFlow.Abstractions/IConsumerInstrumentationMiddleware.cs new file mode 100644 index 000000000..1ea7cc8bd --- /dev/null +++ b/src/KafkaFlow.Abstractions/IConsumerInstrumentationMiddleware.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow +{ + public interface IConsumerInstrumentationMiddleware : IMessageMiddleware + { + } +} diff --git a/src/KafkaFlow.Abstractions/IProducerInstrumentationMiddleware.cs b/src/KafkaFlow.Abstractions/IProducerInstrumentationMiddleware.cs new file mode 100644 index 000000000..9eb162a23 --- /dev/null +++ b/src/KafkaFlow.Abstractions/IProducerInstrumentationMiddleware.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow +{ + public interface IProducerInstrumentationMiddleware : IMessageMiddleware + { + } +} diff --git a/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs b/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs index 8c6baac4d..0b1229cbd 100644 --- a/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs +++ b/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs @@ -8,7 +8,7 @@ using global::OpenTelemetry; using global::OpenTelemetry.Context.Propagation; - internal class TracerConsumerMiddleware : IMessageMiddleware + internal class TracerConsumerMiddleware : IConsumerInstrumentationMiddleware { private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; private static readonly string ProcessString = "process"; diff --git a/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs b/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs index fb6272181..6616c13ed 100644 --- a/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs +++ b/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs @@ -8,7 +8,7 @@ using global::OpenTelemetry; using global::OpenTelemetry.Context.Propagation; - internal class TracerProducerMiddleware : IMessageMiddleware + internal class TracerProducerMiddleware : IProducerInstrumentationMiddleware { private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; private static readonly string PublishString = "publish"; diff --git a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs index b328afb31..b32e521be 100644 --- a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs @@ -39,9 +39,9 @@ public ClusterConfiguration Build(KafkaConfiguration kafkaConfiguration) configuration.AddProducers(this.producers.Select(x => x.Build(configuration))); configuration.AddConsumers(this.consumers.Select(x => { - if(this.instrumentationConsumerMiddleware != null) + if (this.instrumentationConsumerMiddleware != null) { - x.AddInstrumentation(); + x.AddInstrumentation(); } return x.Build(configuration); @@ -107,11 +107,15 @@ public IClusterConfigurationBuilder AddConsumer(Action() - where TConsumerInstrumentationMiddleware : class, IMessageMiddleware - where TProducerInstrumentationMiddleware : class, IMessageMiddleware + where TConsumerInstrumentationMiddleware : class, IConsumerInstrumentationMiddleware + where TProducerInstrumentationMiddleware : class, IProducerInstrumentationMiddleware { - this.instrumentationConsumerMiddleware = typeof(TConsumerInstrumentationMiddleware); + this.instrumentationProducerMiddleware = typeof(IProducerInstrumentationMiddleware); + + //this.DependencyConfigurator + // .AddTransient(typeof(IConsumerInstrumentationMiddleware), this.instrumentationConsumerMiddleware) + // .AddTransient(typeof(IProducerInstrumentationMiddleware), this.instrumentationProducerMiddleware); return this; } diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs index 9d51193be..ca2dc531e 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs @@ -229,10 +229,10 @@ public IConsumerConfigurationBuilder WithCustomFactory(ConsumerCustomFactory cus return this; } - public void AddInstrumentation() + public void AddInstrumentation(IConsumerInstrumentationMiddleware consumerInstrumentationMiddleware) where T : class, IMessageMiddleware { - this.middlewareConfigurationBuilder.AddAtBeginning(); + var type = typeof(IConsumerInstrumentationMiddleware); } public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration) diff --git a/src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs b/src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs index 642f3641d..6ce82d57a 100644 --- a/src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs @@ -44,6 +44,11 @@ public TBuilder AddAtBeginning(MiddlewareLifetime lifetime = MiddlewareLifeti return this.AddAt(0, lifetime); } + public TBuilder AddAtBeginning(Type messageMiddleware, MiddlewareLifetime lifetime = MiddlewareLifetime.ConsumerOrProducer) + { + return this.AddAt(messageMiddleware, 0, lifetime); + } + public IReadOnlyList Build() => this.middlewaresConfigurations; private static InstanceLifetime ParseLifetime(MiddlewareLifetime lifetime) @@ -92,5 +97,20 @@ private TBuilder AddAt( return this as TBuilder; } + + private TBuilder AddAt( + Type messageMiddleware, + int position, + MiddlewareLifetime lifetime = MiddlewareLifetime.ConsumerOrProducer) + { + + this.DependencyConfigurator.Add(typeof(IMessageMiddleware), messageMiddleware, ParseLifetime(lifetime)); + + this.middlewaresConfigurations.Insert( + position, + new MiddlewareConfiguration(messageMiddleware.GetType(), lifetime)); + + return this as TBuilder; + } } } diff --git a/src/KafkaFlow/Consumers/ConsumerContext.cs b/src/KafkaFlow/Consumers/ConsumerContext.cs index 282ee2eb5..267647e71 100644 --- a/src/KafkaFlow/Consumers/ConsumerContext.cs +++ b/src/KafkaFlow/Consumers/ConsumerContext.cs @@ -1,6 +1,7 @@ namespace KafkaFlow.Consumers { using System; + using System.Collections.Generic; using System.Threading; using Confluent.Kafka; @@ -38,6 +39,8 @@ public ConsumerContext( public string GroupId => this.consumer.Configuration.GroupId; + public IReadOnlyCollection Brokers => this.consumer.Configuration.ClusterConfiguration.Brokers; + public bool ShouldStoreOffset { get; set; } = true; public DateTime MessageTimestamp => this.kafkaResult.Message.Timestamp.UtcDateTime; From 96f87d4a14c2b6c575eb9974943777d3bd77376b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sim=C3=A3o=20Ribeiro?= Date: Wed, 20 Sep 2023 15:19:45 +0100 Subject: [PATCH 5/6] PoC: Include event-based instrumentation --- .../KafkaFlow.Sample/KafkaFlow.Sample.csproj | 4 +- samples/KafkaFlow.Sample/Program.cs | 11 ++++- .../IKafkaConfigurationBuilder.cs | 3 ++ .../Events/Args/ConsumeErrorEventArgs.cs | 14 ++++++ .../Events/Args/ConsumeStartEventArgs.cs | 12 +++++ .../Events/Args/ProduceErrorEventArgs.cs | 14 ++++++ .../Events/Args/ProduceStartEventArgs.cs | 12 +++++ .../Events/EventsManager.cs | 36 ++++++++++++++ .../Events/IEventsListener.cs | 18 +++++++ .../Events/IEventsNotifier.cs | 18 +++++++ .../Properties/launchSettings.json | 12 +++++ .../ExtensionMethods.cs | 17 ++++++- .../Trace/KafkaFlowActivitySourceHelper.cs | 6 +-- .../Trace/TracerConsumerMiddleware.cs | 47 +++++++++++-------- .../Trace/TracerProducerMiddleware.cs | 39 +++++++++------ .../ClusterConfigurationBuilder.cs | 10 +--- .../KafkaConfigurationBuilder.cs | 14 +++++- src/KafkaFlow/Consumers/ConsumerWorker.cs | 9 ++++ src/KafkaFlow/Producers/MessageProducer.cs | 9 ++++ 19 files changed, 255 insertions(+), 50 deletions(-) create mode 100644 src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs create mode 100644 src/KafkaFlow.Abstractions/Events/Args/ConsumeStartEventArgs.cs create mode 100644 src/KafkaFlow.Abstractions/Events/Args/ProduceErrorEventArgs.cs create mode 100644 src/KafkaFlow.Abstractions/Events/Args/ProduceStartEventArgs.cs create mode 100644 src/KafkaFlow.Abstractions/Events/EventsManager.cs create mode 100644 src/KafkaFlow.Abstractions/Events/IEventsListener.cs create mode 100644 src/KafkaFlow.Abstractions/Events/IEventsNotifier.cs create mode 100644 src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json diff --git a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj index d8c72aa1f..423818eb9 100644 --- a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj +++ b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj @@ -1,4 +1,4 @@ - + Exe @@ -32,6 +32,8 @@ + + diff --git a/samples/KafkaFlow.Sample/Program.cs b/samples/KafkaFlow.Sample/Program.cs index 1b1ca240a..eefcbecf5 100644 --- a/samples/KafkaFlow.Sample/Program.cs +++ b/samples/KafkaFlow.Sample/Program.cs @@ -1,12 +1,16 @@ using System; +using System.Diagnostics; using System.Threading.Tasks; using KafkaFlow; using KafkaFlow.Configuration; +using KafkaFlow.OpenTelemetry.Trace; using KafkaFlow.Producers; using KafkaFlow.Sample; using KafkaFlow.Serializer; using KafkaFlow.TypedHandler; using Microsoft.Extensions.DependencyInjection; +using OpenTelemetry; +using OpenTelemetry.Trace; var services = new ServiceCollection(); @@ -38,8 +42,8 @@ .AddTypedHandlers(h => h.AddHandler()) ) ) - .AddOpenTelemetryInstrumentation() ) + .AddOpenTelemetryInstrumentation() ); var provider = services.BuildServiceProvider(); @@ -54,6 +58,11 @@ Console.WriteLine("Type the number of messages to produce or 'exit' to quit:"); +using var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource("KafkaFlow") + .AddConsoleExporter() + .Build(); + while (true) { var input = Console.ReadLine(); diff --git a/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs index a29a6cfdb..01842900d 100644 --- a/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs +++ b/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs @@ -1,6 +1,7 @@ namespace KafkaFlow.Configuration { using System; + using KafkaFlow.Events; /// /// A builder to configure KafkaFlow @@ -21,5 +22,7 @@ public interface IKafkaConfigurationBuilder /// IKafkaConfigurationBuilder UseLogHandler() where TLogHandler : ILogHandler; + + IKafkaConfigurationBuilder SubscribeEvents(Action eventsListener); } } diff --git a/src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs b/src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs new file mode 100644 index 000000000..df12c6c22 --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/Args/ConsumeErrorEventArgs.cs @@ -0,0 +1,14 @@ +using System; + +namespace KafkaFlow.Events.Args +{ + public class ConsumeErrorEventArgs + { + public ConsumeErrorEventArgs(Exception exception) + { + this.Exception = exception; + } + + public Exception Exception { get; set; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Abstractions/Events/Args/ConsumeStartEventArgs.cs b/src/KafkaFlow.Abstractions/Events/Args/ConsumeStartEventArgs.cs new file mode 100644 index 000000000..5a9cb593c --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/Args/ConsumeStartEventArgs.cs @@ -0,0 +1,12 @@ +namespace KafkaFlow.Events.Args +{ + public class ConsumeStartEventArgs + { + internal ConsumeStartEventArgs(IMessageContext context) + { + this.MessageContext = context; + } + + public IMessageContext MessageContext { get; set; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Abstractions/Events/Args/ProduceErrorEventArgs.cs b/src/KafkaFlow.Abstractions/Events/Args/ProduceErrorEventArgs.cs new file mode 100644 index 000000000..45e9d041c --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/Args/ProduceErrorEventArgs.cs @@ -0,0 +1,14 @@ +using System; + +namespace KafkaFlow.Events.Args +{ + public class ProduceErrorEventArgs + { + public ProduceErrorEventArgs(Exception exception) + { + this.Exception = exception; + } + + public Exception Exception { get; set; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Abstractions/Events/Args/ProduceStartEventArgs.cs b/src/KafkaFlow.Abstractions/Events/Args/ProduceStartEventArgs.cs new file mode 100644 index 000000000..3e67e9dcf --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/Args/ProduceStartEventArgs.cs @@ -0,0 +1,12 @@ +namespace KafkaFlow.Events.Args +{ + public class ProduceStartEventArgs + { + internal ProduceStartEventArgs(IMessageContext context) + { + this.MessageContext = context; + } + + public IMessageContext MessageContext { get; set; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Abstractions/Events/EventsManager.cs b/src/KafkaFlow.Abstractions/Events/EventsManager.cs new file mode 100644 index 000000000..c32e6fab3 --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/EventsManager.cs @@ -0,0 +1,36 @@ +using System; +using KafkaFlow.Events.Args; + +namespace KafkaFlow.Events +{ + public class EventsManager : IEventsListener, IEventsNotifier + { + public event EventHandler OnConsumeStart; + + public event EventHandler OnProduceStart; + + public event EventHandler OnConsumeError; + + public event EventHandler OnProduceError; + + public void NotifyOnConsumeError(Exception exception) + { + this.OnConsumeError?.Invoke(this, new ConsumeErrorEventArgs(exception)); + } + + public void NotifyOnConsumeStart(IMessageContext context) + { + this.OnConsumeStart?.Invoke(this, new ConsumeStartEventArgs(context)); + } + + public void NotifyOnProduceError(Exception exception) + { + this.OnProduceError?.Invoke(this, new ProduceErrorEventArgs(exception)); + } + + public void NotifyOnProduceStart(IMessageContext context) + { + this.OnProduceStart?.Invoke(this, new ProduceStartEventArgs(context)); + } + } +} diff --git a/src/KafkaFlow.Abstractions/Events/IEventsListener.cs b/src/KafkaFlow.Abstractions/Events/IEventsListener.cs new file mode 100644 index 000000000..91aa9a75d --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/IEventsListener.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; +using KafkaFlow.Events.Args; + +namespace KafkaFlow.Events +{ + public interface IEventsListener + { + event EventHandler OnConsumeStart; + + event EventHandler OnProduceStart; + + event EventHandler OnConsumeError; + + event EventHandler OnProduceError; + } +} diff --git a/src/KafkaFlow.Abstractions/Events/IEventsNotifier.cs b/src/KafkaFlow.Abstractions/Events/IEventsNotifier.cs new file mode 100644 index 000000000..e0dddd4fe --- /dev/null +++ b/src/KafkaFlow.Abstractions/Events/IEventsNotifier.cs @@ -0,0 +1,18 @@ +using System; + +namespace KafkaFlow.Events +{ + // Isto devia ser internal visto que a notificação de events não deveria ser publica, apenas a subscrição desses eventos + + // Falta o OnConsumeEnd e OnProduceEnd + public interface IEventsNotifier + { + void NotifyOnConsumeError(Exception exception); + + void NotifyOnConsumeStart(IMessageContext context); + + void NotifyOnProduceError(Exception exception); + + void NotifyOnProduceStart(IMessageContext context); + } +} diff --git a/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json b/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json new file mode 100644 index 000000000..1b8239a87 --- /dev/null +++ b/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json @@ -0,0 +1,12 @@ +{ + "profiles": { + "KafkaFlow.Admin.Dashboard": { + "commandName": "Project", + "launchBrowser": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + }, + "applicationUrl": "https://localhost:57596;http://localhost:57597" + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs b/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs index 2f3e67677..f232b084b 100644 --- a/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs +++ b/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs @@ -12,7 +12,20 @@ public static class ExtensionMethods /// /// The Kafka configuration builder /// - public static IClusterConfigurationBuilder AddOpenTelemetryInstrumentation(this IClusterConfigurationBuilder builder) => - builder.AddInstrumentation(); + public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IKafkaConfigurationBuilder builder) + { + var tracerConsumerMiddleware = new TracerConsumerMiddleware(); + var tracerProducerMiddleware = new TracerProducerMiddleware(); + + builder.SubscribeEvents(events => + { + events.OnConsumeError += (sender, args) => tracerConsumerMiddleware.UpdateActivityOnError(args.Exception); + events.OnConsumeStart += (sender, args) => tracerConsumerMiddleware.CreateActivityOnConsume(args.MessageContext); + events.OnProduceError += (sender, args) => tracerProducerMiddleware.UpdateActivityOnError(args.Exception); + events.OnProduceStart += (sender, args) => tracerProducerMiddleware.CreateActivityOnProduce(args.MessageContext); + }); + + return builder; + } } } diff --git a/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs b/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs index 8feb910a8..97358d8bb 100644 --- a/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs +++ b/src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs @@ -8,10 +8,8 @@ namespace KafkaFlow.OpenTelemetry.Trace using System.Reflection; using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions; - internal static class KafkaFlowActivitySourceHelper + public static class KafkaFlowActivitySourceHelper { - public static readonly ActivitySource ActivitySource = new ActivitySource(KafkaFlowString, Version.ToString()); - internal static readonly IEnumerable> CreationTags = new[] { new KeyValuePair(Conventions.AttributePeerService, KafkaString), @@ -23,6 +21,8 @@ internal static class KafkaFlowActivitySourceHelper private static readonly Version Version = Assembly.GetExecutingAssembly().GetName().Version; + public static readonly ActivitySource ActivitySource = new ActivitySource(KafkaFlowString, Version.ToString()); + public static void SetGenericTags(Activity activity) { activity?.SetTag("messaging.system", KafkaString); diff --git a/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs b/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs index 0b1229cbd..f92ee5f3b 100644 --- a/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs +++ b/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs @@ -4,44 +4,53 @@ using System.Collections.Generic; using System.Diagnostics; using System.Text; - using System.Threading.Tasks; using global::OpenTelemetry; using global::OpenTelemetry.Context.Propagation; - internal class TracerConsumerMiddleware : IConsumerInstrumentationMiddleware + internal class TracerConsumerMiddleware { private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; private static readonly string ProcessString = "process"; - public async Task Invoke(IMessageContext context, MiddlewareDelegate next) + public void CreateActivityOnConsume(IMessageContext context) { - var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context.ConsumerContext.Topic} {ProcessString}" : ProcessString; + try + { + var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context.ConsumerContext.Topic} {ProcessString}" : ProcessString; - // Extract the PropagationContext of the upstream parent from the message headers. - var parentContext = Propagator.Extract(new PropagationContext(default, Baggage.Current), context, this.ExtractTraceContextIntoBasicProperties); - Baggage.Current = parentContext.Baggage; + // Extract the PropagationContext of the upstream parent from the message headers. + var parentContext = Propagator.Extract(new PropagationContext(default, Baggage.Current), context, this.ExtractTraceContextIntoBasicProperties); + Baggage.Current = parentContext.Baggage; - // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. - // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md - using var activity = KafkaFlowActivitySourceHelper.ActivitySource.StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext); + // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. + // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md + using var activity = KafkaFlowActivitySourceHelper.ActivitySource.StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext); - try - { KafkaFlowActivitySourceHelper.SetGenericTags(activity); if (activity != null && activity.IsAllDataRequested) { this.SetConsumerTags(context, activity); } - - await next.Invoke(context); } - catch (Exception ex) + catch { - activity?.SetTag("exception.message", ex.Message); + // ignored + } + } - throw; + public void UpdateActivityOnError(Exception ex) + { + try + { + var activity = Activity.Current; + + activity?.SetTag("exception.message", ex.Message); + } + catch + { + // ignored } } @@ -63,7 +72,7 @@ private void SetConsumerTags(IMessageContext context, Activity activity) activity.SetTag("messaging.source.name", context.ConsumerContext.Topic); activity.SetTag("messaging.kafka.consumer.group", context.ConsumerContext.GroupId); activity.SetTag("messaging.kafka.message.key", context.Message.Key); - activity.SetTag("messaging.kafka.message.offset", context.ProducerContext.Offset); + activity.SetTag("messaging.kafka.message.offset", context.ConsumerContext.Offset); activity.SetTag("messaging.kafka.source.partition", context.ConsumerContext.Partition); } } diff --git a/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs b/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs index 6616c13ed..99ff81384 100644 --- a/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs +++ b/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs @@ -7,23 +7,24 @@ using System.Threading.Tasks; using global::OpenTelemetry; using global::OpenTelemetry.Context.Propagation; + using static System.Net.Mime.MediaTypeNames; - internal class TracerProducerMiddleware : IProducerInstrumentationMiddleware + internal class TracerProducerMiddleware { private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; private static readonly string PublishString = "publish"; - public async Task Invoke(IMessageContext context, MiddlewareDelegate next) + public void CreateActivityOnProduce(IMessageContext context) { - var activityName = !string.IsNullOrEmpty(context?.ProducerContext.Topic) ? $"{context.ProducerContext.Topic} {PublishString}" : PublishString; - - // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. - // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md - using var activity = KafkaFlowActivitySourceHelper.ActivitySource.StartActivity(activityName, ActivityKind.Producer); - try { + var activityName = !string.IsNullOrEmpty(context?.ProducerContext.Topic) ? $"{context.ProducerContext.Topic} {PublishString}" : PublishString; + + // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. + // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md + using var activity = KafkaFlowActivitySourceHelper.ActivitySource.StartActivity(activityName, ActivityKind.Producer); + // Depending on Sampling (and whether a listener is registered or not), the // activity above may not be created. // If it is created, then propagate its context. @@ -48,14 +49,24 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next) { this.SetProducerTags(context, activity); } - - await next.Invoke(context); } - catch (Exception ex) + catch { - activity?.SetTag("exception.message", ex.Message); + // do nothing + } + } + + public void UpdateActivityOnError(Exception ex) + { + try + { + var activity = Activity.Current; - throw; + activity?.SetTag("exception.message", ex.Message); + } + catch + { + // ignored } } diff --git a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs index b32e521be..2ee0628fa 100644 --- a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs @@ -37,15 +37,7 @@ public ClusterConfiguration Build(KafkaConfiguration kafkaConfiguration) this.topicsToCreateIfNotExist); configuration.AddProducers(this.producers.Select(x => x.Build(configuration))); - configuration.AddConsumers(this.consumers.Select(x => - { - if (this.instrumentationConsumerMiddleware != null) - { - x.AddInstrumentation(); - } - - return x.Build(configuration); - })); + configuration.AddConsumers(this.consumers.Select(x => x.Build(configuration))); return configuration; } diff --git a/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs b/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs index 56e72d129..6752cd64f 100644 --- a/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs @@ -5,17 +5,21 @@ namespace KafkaFlow.Configuration using System.Linq; using KafkaFlow.Clusters; using KafkaFlow.Consumers; + using KafkaFlow.Events; using KafkaFlow.Producers; internal class KafkaConfigurationBuilder : IKafkaConfigurationBuilder { private readonly IDependencyConfigurator dependencyConfigurator; + private readonly List clusters = new(); private Type logHandlerType = typeof(NullLogHandler); + private readonly EventsManager eventsManager; public KafkaConfigurationBuilder(IDependencyConfigurator dependencyConfigurator) { this.dependencyConfigurator = dependencyConfigurator; + this.eventsManager = new EventsManager(); } public KafkaConfiguration Build() @@ -44,7 +48,9 @@ public KafkaConfiguration Build() .AddSingleton() .AddSingleton(new ConsumerAccessor()) .AddSingleton(new ConsumerManagerFactory()) - .AddSingleton(); + .AddSingleton() + .AddSingleton(this.eventsManager) + .AddSingleton(this.eventsManager); return configuration; } @@ -66,5 +72,11 @@ public IKafkaConfigurationBuilder UseLogHandler() this.logHandlerType = typeof(TLogHandler); return this; } + + public IKafkaConfigurationBuilder SubscribeEvents(Action eventsListener) + { + eventsListener?.Invoke(this.eventsManager); + return this; + } } } diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index bb789af1e..9ed51957d 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -5,6 +5,7 @@ namespace KafkaFlow.Consumers using System.Threading.Channels; using System.Threading.Tasks; using Confluent.Kafka; + using KafkaFlow.Events; internal class ConsumerWorker : IConsumerWorker { @@ -13,6 +14,7 @@ internal class ConsumerWorker : IConsumerWorker private readonly IOffsetManager offsetManager; private readonly IMiddlewareExecutor middlewareExecutor; private readonly ILogHandler logHandler; + private readonly IEventsNotifier eventsNotifier; private readonly Channel> messagesBuffer; @@ -35,6 +37,9 @@ public ConsumerWorker( this.middlewareExecutor = middlewareExecutor; this.logHandler = logHandler; this.messagesBuffer = Channel.CreateBounded>(consumer.Configuration.BufferSize); + + var scope = this.dependencyResolver.CreateScope(); + this.eventsNotifier = scope.Resolver.Resolve(); } public int Id { get; } @@ -117,6 +122,8 @@ private async Task ProcessMessageAsync(ConsumeResult message, Ca this.Id), null); + this.eventsNotifier?.NotifyOnConsumeStart(context); + try { var scope = this.dependencyResolver.CreateScope(); @@ -135,6 +142,8 @@ await this.middlewareExecutor } catch (Exception ex) { + this.eventsNotifier?.NotifyOnConsumeError(ex); + this.logHandler.Error( "Error processing message", ex, diff --git a/src/KafkaFlow/Producers/MessageProducer.cs b/src/KafkaFlow/Producers/MessageProducer.cs index b5d0c8059..9a8dd6835 100644 --- a/src/KafkaFlow/Producers/MessageProducer.cs +++ b/src/KafkaFlow/Producers/MessageProducer.cs @@ -5,12 +5,14 @@ namespace KafkaFlow.Producers using System.Threading.Tasks; using Confluent.Kafka; using KafkaFlow.Configuration; + using KafkaFlow.Events; internal class MessageProducer : IMessageProducer, IDisposable { private readonly IDependencyResolver dependencyResolver; private readonly IProducerConfiguration configuration; private readonly MiddlewareExecutor middlewareExecutor; + private readonly IEventsNotifier eventsNotifier; private readonly object producerCreationSync = new(); @@ -23,6 +25,9 @@ public MessageProducer( this.dependencyResolver = dependencyResolver; this.configuration = configuration; this.middlewareExecutor = new MiddlewareExecutor(configuration.MiddlewaresConfigurations); + + var scope = this.dependencyResolver.CreateScope(); + this.eventsNotifier = scope.Resolver.Resolve(); } public string ProducerName => this.configuration.Name; @@ -269,6 +274,8 @@ private async Task> InternalProduceAsync(IMessage try { + this.eventsNotifier?.NotifyOnProduceStart(context); + var produceTask = partition.HasValue ? localProducer.ProduceAsync(new TopicPartition(context.ProducerContext.Topic, partition.Value), message) : localProducer.ProduceAsync(context.ProducerContext.Topic, message); @@ -282,6 +289,8 @@ private async Task> InternalProduceAsync(IMessage this.InvalidateProducer(e.Error, result); } + this.eventsNotifier?.NotifyOnProduceError(e); + throw; } From 283288bb62fd139104d20efdb970ae1c47c02d39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sim=C3=A3o=20Ribeiro?= Date: Wed, 20 Sep 2023 15:40:57 +0100 Subject: [PATCH 6/6] PoC: Remove older implementation --- .../IClusterConfigurationBuilder.cs | 5 ----- .../Configuration/IInstrumentationBuilder.cs | 11 ---------- .../IConsumerContext.cs | 1 + .../IConsumerInstrumentationMiddleware.cs | 6 ------ .../IProducerInstrumentationMiddleware.cs | 6 ------ .../KafkaFlow.Admin.Dashboard.csproj | 2 +- .../Properties/launchSettings.json | 12 ----------- .../ClusterConfigurationBuilder.cs | 14 ------------- .../ConsumerConfigurationBuilder.cs | 6 ------ .../MiddlewareConfigurationBuilder.cs | 20 ------------------- src/KafkaFlow/Consumers/ConsumerContext.cs | 1 + 11 files changed, 3 insertions(+), 81 deletions(-) delete mode 100644 src/KafkaFlow.Abstractions/Configuration/IInstrumentationBuilder.cs delete mode 100644 src/KafkaFlow.Abstractions/IConsumerInstrumentationMiddleware.cs delete mode 100644 src/KafkaFlow.Abstractions/IProducerInstrumentationMiddleware.cs delete mode 100644 src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json diff --git a/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs index 5026dcc8a..41325ef45 100644 --- a/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs +++ b/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs @@ -82,10 +82,5 @@ IClusterConfigurationBuilder CreateTopicIfNotExists( string topicName, int numberOfPartitions, short replicationFactor); - - // Gets the cluster configuration - IClusterConfigurationBuilder AddInstrumentation() - where TConsumerInstrumentationMiddleware : class, IConsumerInstrumentationMiddleware - where TProducerInstrumentationMiddleware : class, IProducerInstrumentationMiddleware; } } diff --git a/src/KafkaFlow.Abstractions/Configuration/IInstrumentationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IInstrumentationBuilder.cs deleted file mode 100644 index db023a8b1..000000000 --- a/src/KafkaFlow.Abstractions/Configuration/IInstrumentationBuilder.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace KafkaFlow.Configuration -{ - public interface IInstrumentationBuilder - { - void AddInstrumentation(); - } -} diff --git a/src/KafkaFlow.Abstractions/IConsumerContext.cs b/src/KafkaFlow.Abstractions/IConsumerContext.cs index 62b63e432..3348779ed 100644 --- a/src/KafkaFlow.Abstractions/IConsumerContext.cs +++ b/src/KafkaFlow.Abstractions/IConsumerContext.cs @@ -75,6 +75,7 @@ public interface IConsumerContext /// void Resume(); + // TODO: Need to discuss IReadOnlyCollection Brokers { get; } } } diff --git a/src/KafkaFlow.Abstractions/IConsumerInstrumentationMiddleware.cs b/src/KafkaFlow.Abstractions/IConsumerInstrumentationMiddleware.cs deleted file mode 100644 index 1ea7cc8bd..000000000 --- a/src/KafkaFlow.Abstractions/IConsumerInstrumentationMiddleware.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace KafkaFlow -{ - public interface IConsumerInstrumentationMiddleware : IMessageMiddleware - { - } -} diff --git a/src/KafkaFlow.Abstractions/IProducerInstrumentationMiddleware.cs b/src/KafkaFlow.Abstractions/IProducerInstrumentationMiddleware.cs deleted file mode 100644 index 9eb162a23..000000000 --- a/src/KafkaFlow.Abstractions/IProducerInstrumentationMiddleware.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace KafkaFlow -{ - public interface IProducerInstrumentationMiddleware : IMessageMiddleware - { - } -} diff --git a/src/KafkaFlow.Admin.Dashboard/KafkaFlow.Admin.Dashboard.csproj b/src/KafkaFlow.Admin.Dashboard/KafkaFlow.Admin.Dashboard.csproj index 6a9338a13..1561f20ef 100644 --- a/src/KafkaFlow.Admin.Dashboard/KafkaFlow.Admin.Dashboard.csproj +++ b/src/KafkaFlow.Admin.Dashboard/KafkaFlow.Admin.Dashboard.csproj @@ -39,5 +39,5 @@ - + diff --git a/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json b/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json deleted file mode 100644 index 1b8239a87..000000000 --- a/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "profiles": { - "KafkaFlow.Admin.Dashboard": { - "commandName": "Project", - "launchBrowser": true, - "environmentVariables": { - "ASPNETCORE_ENVIRONMENT": "Development" - }, - "applicationUrl": "https://localhost:57596;http://localhost:57597" - } - } -} \ No newline at end of file diff --git a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs index 2ee0628fa..c297c89f0 100644 --- a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs @@ -98,20 +98,6 @@ public IClusterConfigurationBuilder AddConsumer(Action() - where TConsumerInstrumentationMiddleware : class, IConsumerInstrumentationMiddleware - where TProducerInstrumentationMiddleware : class, IProducerInstrumentationMiddleware - { - this.instrumentationConsumerMiddleware = typeof(TConsumerInstrumentationMiddleware); - this.instrumentationProducerMiddleware = typeof(IProducerInstrumentationMiddleware); - - //this.DependencyConfigurator - // .AddTransient(typeof(IConsumerInstrumentationMiddleware), this.instrumentationConsumerMiddleware) - // .AddTransient(typeof(IProducerInstrumentationMiddleware), this.instrumentationProducerMiddleware); - - return this; - } - public IClusterConfigurationBuilder OnStopping(Action handler) { this.onStoppingHandler = handler; diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs index ca2dc531e..f7e601ed0 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs @@ -229,12 +229,6 @@ public IConsumerConfigurationBuilder WithCustomFactory(ConsumerCustomFactory cus return this; } - public void AddInstrumentation(IConsumerInstrumentationMiddleware consumerInstrumentationMiddleware) - where T : class, IMessageMiddleware - { - var type = typeof(IConsumerInstrumentationMiddleware); - } - public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration) { var middlewareConfiguration = this.middlewareConfigurationBuilder.Build(); diff --git a/src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs b/src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs index 6ce82d57a..642f3641d 100644 --- a/src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs @@ -44,11 +44,6 @@ public TBuilder AddAtBeginning(MiddlewareLifetime lifetime = MiddlewareLifeti return this.AddAt(0, lifetime); } - public TBuilder AddAtBeginning(Type messageMiddleware, MiddlewareLifetime lifetime = MiddlewareLifetime.ConsumerOrProducer) - { - return this.AddAt(messageMiddleware, 0, lifetime); - } - public IReadOnlyList Build() => this.middlewaresConfigurations; private static InstanceLifetime ParseLifetime(MiddlewareLifetime lifetime) @@ -97,20 +92,5 @@ private TBuilder AddAt( return this as TBuilder; } - - private TBuilder AddAt( - Type messageMiddleware, - int position, - MiddlewareLifetime lifetime = MiddlewareLifetime.ConsumerOrProducer) - { - - this.DependencyConfigurator.Add(typeof(IMessageMiddleware), messageMiddleware, ParseLifetime(lifetime)); - - this.middlewaresConfigurations.Insert( - position, - new MiddlewareConfiguration(messageMiddleware.GetType(), lifetime)); - - return this as TBuilder; - } } } diff --git a/src/KafkaFlow/Consumers/ConsumerContext.cs b/src/KafkaFlow/Consumers/ConsumerContext.cs index 267647e71..5ccbe5662 100644 --- a/src/KafkaFlow/Consumers/ConsumerContext.cs +++ b/src/KafkaFlow/Consumers/ConsumerContext.cs @@ -39,6 +39,7 @@ public ConsumerContext( public string GroupId => this.consumer.Configuration.GroupId; + // TODO: Need to discuss public IReadOnlyCollection Brokers => this.consumer.Configuration.ClusterConfiguration.Brokers; public bool ShouldStoreOffset { get; set; } = true;