diff --git a/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs index 894307391..5026dcc8a 100644 --- a/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs +++ b/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs @@ -85,7 +85,7 @@ IClusterConfigurationBuilder CreateTopicIfNotExists( // Gets the cluster configuration IClusterConfigurationBuilder AddInstrumentation() - where TConsumerInstrumentationMiddleware : class, IMessageMiddleware - where TProducerInstrumentationMiddleware : class, IMessageMiddleware; + where TConsumerInstrumentationMiddleware : class, IConsumerInstrumentationMiddleware + where TProducerInstrumentationMiddleware : class, IProducerInstrumentationMiddleware; } } diff --git a/src/KafkaFlow.Abstractions/IConsumerContext.cs b/src/KafkaFlow.Abstractions/IConsumerContext.cs index 76733d147..62b63e432 100644 --- a/src/KafkaFlow.Abstractions/IConsumerContext.cs +++ b/src/KafkaFlow.Abstractions/IConsumerContext.cs @@ -1,6 +1,7 @@ namespace KafkaFlow { using System; + using System.Collections.Generic; using System.Threading; /// @@ -73,5 +74,7 @@ public interface IConsumerContext /// Resume Kafka's message fetch /// void Resume(); + + IReadOnlyCollection Brokers { get; } } } diff --git a/src/KafkaFlow.Abstractions/IConsumerInstrumentationMiddleware.cs b/src/KafkaFlow.Abstractions/IConsumerInstrumentationMiddleware.cs new file mode 100644 index 000000000..1ea7cc8bd --- /dev/null +++ b/src/KafkaFlow.Abstractions/IConsumerInstrumentationMiddleware.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow +{ + public interface IConsumerInstrumentationMiddleware : IMessageMiddleware + { + } +} diff --git a/src/KafkaFlow.Abstractions/IProducerInstrumentationMiddleware.cs b/src/KafkaFlow.Abstractions/IProducerInstrumentationMiddleware.cs new file mode 100644 index 000000000..9eb162a23 --- /dev/null +++ b/src/KafkaFlow.Abstractions/IProducerInstrumentationMiddleware.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow +{ + public interface IProducerInstrumentationMiddleware : IMessageMiddleware + { + } +} diff --git a/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs b/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs index 8c6baac4d..0b1229cbd 100644 --- a/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs +++ b/src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs @@ -8,7 +8,7 @@ using global::OpenTelemetry; using global::OpenTelemetry.Context.Propagation; - internal class TracerConsumerMiddleware : IMessageMiddleware + internal class TracerConsumerMiddleware : IConsumerInstrumentationMiddleware { private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; private static readonly string ProcessString = "process"; diff --git a/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs b/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs index fb6272181..6616c13ed 100644 --- a/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs +++ b/src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs @@ -8,7 +8,7 @@ using global::OpenTelemetry; using global::OpenTelemetry.Context.Propagation; - internal class TracerProducerMiddleware : IMessageMiddleware + internal class TracerProducerMiddleware : IProducerInstrumentationMiddleware { private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; private static readonly string PublishString = "publish"; diff --git a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs index b328afb31..b32e521be 100644 --- a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs @@ -39,9 +39,9 @@ public ClusterConfiguration Build(KafkaConfiguration kafkaConfiguration) configuration.AddProducers(this.producers.Select(x => x.Build(configuration))); configuration.AddConsumers(this.consumers.Select(x => { - if(this.instrumentationConsumerMiddleware != null) + if (this.instrumentationConsumerMiddleware != null) { - x.AddInstrumentation(); + x.AddInstrumentation(); } return x.Build(configuration); @@ -107,11 +107,15 @@ public IClusterConfigurationBuilder AddConsumer(Action() - where TConsumerInstrumentationMiddleware : class, IMessageMiddleware - where TProducerInstrumentationMiddleware : class, IMessageMiddleware + where TConsumerInstrumentationMiddleware : class, IConsumerInstrumentationMiddleware + where TProducerInstrumentationMiddleware : class, IProducerInstrumentationMiddleware { - this.instrumentationConsumerMiddleware = typeof(TConsumerInstrumentationMiddleware); + this.instrumentationProducerMiddleware = typeof(IProducerInstrumentationMiddleware); + + //this.DependencyConfigurator + // .AddTransient(typeof(IConsumerInstrumentationMiddleware), this.instrumentationConsumerMiddleware) + // .AddTransient(typeof(IProducerInstrumentationMiddleware), this.instrumentationProducerMiddleware); return this; } diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs index 9d51193be..ca2dc531e 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs @@ -229,10 +229,10 @@ public IConsumerConfigurationBuilder WithCustomFactory(ConsumerCustomFactory cus return this; } - public void AddInstrumentation() + public void AddInstrumentation(IConsumerInstrumentationMiddleware consumerInstrumentationMiddleware) where T : class, IMessageMiddleware { - this.middlewareConfigurationBuilder.AddAtBeginning(); + var type = typeof(IConsumerInstrumentationMiddleware); } public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration) diff --git a/src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs b/src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs index 642f3641d..6ce82d57a 100644 --- a/src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs @@ -44,6 +44,11 @@ public TBuilder AddAtBeginning(MiddlewareLifetime lifetime = MiddlewareLifeti return this.AddAt(0, lifetime); } + public TBuilder AddAtBeginning(Type messageMiddleware, MiddlewareLifetime lifetime = MiddlewareLifetime.ConsumerOrProducer) + { + return this.AddAt(messageMiddleware, 0, lifetime); + } + public IReadOnlyList Build() => this.middlewaresConfigurations; private static InstanceLifetime ParseLifetime(MiddlewareLifetime lifetime) @@ -92,5 +97,20 @@ private TBuilder AddAt( return this as TBuilder; } + + private TBuilder AddAt( + Type messageMiddleware, + int position, + MiddlewareLifetime lifetime = MiddlewareLifetime.ConsumerOrProducer) + { + + this.DependencyConfigurator.Add(typeof(IMessageMiddleware), messageMiddleware, ParseLifetime(lifetime)); + + this.middlewaresConfigurations.Insert( + position, + new MiddlewareConfiguration(messageMiddleware.GetType(), lifetime)); + + return this as TBuilder; + } } } diff --git a/src/KafkaFlow/Consumers/ConsumerContext.cs b/src/KafkaFlow/Consumers/ConsumerContext.cs index 282ee2eb5..267647e71 100644 --- a/src/KafkaFlow/Consumers/ConsumerContext.cs +++ b/src/KafkaFlow/Consumers/ConsumerContext.cs @@ -1,6 +1,7 @@ namespace KafkaFlow.Consumers { using System; + using System.Collections.Generic; using System.Threading; using Confluent.Kafka; @@ -38,6 +39,8 @@ public ConsumerContext( public string GroupId => this.consumer.Configuration.GroupId; + public IReadOnlyCollection Brokers => this.consumer.Configuration.ClusterConfiguration.Brokers; + public bool ShouldStoreOffset { get; set; } = true; public DateTime MessageTimestamp => this.kafkaResult.Message.Timestamp.UtcDateTime;