Skip to content

Commit

Permalink
Middlewares not fully implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
simaoribeiro committed Sep 20, 2023
1 parent 56cd9ba commit b1a91d0
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ IClusterConfigurationBuilder CreateTopicIfNotExists(

// Gets the cluster configuration
IClusterConfigurationBuilder AddInstrumentation<TConsumerInstrumentationMiddleware, TProducerInstrumentationMiddleware>()

Check warning on line 87 in src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'IClusterConfigurationBuilder.AddInstrumentation<TConsumerInstrumentationMiddleware, TProducerInstrumentationMiddleware>()'
where TConsumerInstrumentationMiddleware : class, IMessageMiddleware
where TProducerInstrumentationMiddleware : class, IMessageMiddleware;
where TConsumerInstrumentationMiddleware : class, IConsumerInstrumentationMiddleware
where TProducerInstrumentationMiddleware : class, IProducerInstrumentationMiddleware;
}
}
3 changes: 3 additions & 0 deletions src/KafkaFlow.Abstractions/IConsumerContext.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow
{
using System;
using System.Collections.Generic;
using System.Threading;

/// <summary>
Expand Down Expand Up @@ -73,5 +74,7 @@ public interface IConsumerContext
/// Resume Kafka's message fetch
/// </summary>
void Resume();

IReadOnlyCollection<string> Brokers { get; }

Check warning on line 78 in src/KafkaFlow.Abstractions/IConsumerContext.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'IConsumerContext.Brokers'
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace KafkaFlow
{
public interface IConsumerInstrumentationMiddleware : IMessageMiddleware

Check warning on line 3 in src/KafkaFlow.Abstractions/IConsumerInstrumentationMiddleware.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'IConsumerInstrumentationMiddleware'
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace KafkaFlow
{
public interface IProducerInstrumentationMiddleware : IMessageMiddleware

Check warning on line 3 in src/KafkaFlow.Abstractions/IProducerInstrumentationMiddleware.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'IProducerInstrumentationMiddleware'
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
using global::OpenTelemetry;
using global::OpenTelemetry.Context.Propagation;

internal class TracerConsumerMiddleware : IMessageMiddleware
internal class TracerConsumerMiddleware : IConsumerInstrumentationMiddleware
{
private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
private static readonly string ProcessString = "process";

Check notice on line 14 in src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/Trace/TracerConsumerMiddleware.cs#L14

Replace this 'static readonly' declaration with 'const'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
using global::OpenTelemetry;
using global::OpenTelemetry.Context.Propagation;

internal class TracerProducerMiddleware : IMessageMiddleware
internal class TracerProducerMiddleware : IProducerInstrumentationMiddleware
{
private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
private static readonly string PublishString = "publish";

Check notice on line 14 in src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/Trace/TracerProducerMiddleware.cs#L14

Replace this 'static readonly' declaration with 'const'.
Expand Down
14 changes: 9 additions & 5 deletions src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public ClusterConfiguration Build(KafkaConfiguration kafkaConfiguration)
configuration.AddProducers(this.producers.Select(x => x.Build(configuration)));
configuration.AddConsumers(this.consumers.Select(x =>
{
if(this.instrumentationConsumerMiddleware != null)
if (this.instrumentationConsumerMiddleware != null)
{
x.AddInstrumentation<this.instrumentationConsumerMiddleware>();
x.AddInstrumentation<IMessageMiddleware>();

Check failure on line 44 in src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs

View workflow job for this annotation

GitHub Actions / Test deployment

There is no argument given that corresponds to the required formal parameter 'consumerInstrumentationMiddleware' of 'ConsumerConfigurationBuilder.AddInstrumentation<T>(IConsumerInstrumentationMiddleware)'
}
return x.Build(configuration);
Expand Down Expand Up @@ -107,11 +107,15 @@ public IClusterConfigurationBuilder AddConsumer(Action<IConsumerConfigurationBui
}

public IClusterConfigurationBuilder AddInstrumentation<TConsumerInstrumentationMiddleware, TProducerInstrumentationMiddleware>()

Check warning on line 109 in src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs#L109

'TProducerInstrumentationMiddleware' is not used in the method.
where TConsumerInstrumentationMiddleware : class, IMessageMiddleware
where TProducerInstrumentationMiddleware : class, IMessageMiddleware
where TConsumerInstrumentationMiddleware : class, IConsumerInstrumentationMiddleware
where TProducerInstrumentationMiddleware : class, IProducerInstrumentationMiddleware
{

this.instrumentationConsumerMiddleware = typeof(TConsumerInstrumentationMiddleware);
this.instrumentationProducerMiddleware = typeof(IProducerInstrumentationMiddleware);

//this.DependencyConfigurator
// .AddTransient(typeof(IConsumerInstrumentationMiddleware), this.instrumentationConsumerMiddleware)
// .AddTransient(typeof(IProducerInstrumentationMiddleware), this.instrumentationProducerMiddleware);

Check warning on line 118 in src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs#L118

Remove this commented out code.

return this;
}
Expand Down
4 changes: 2 additions & 2 deletions src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,10 @@ public IConsumerConfigurationBuilder WithCustomFactory(ConsumerCustomFactory cus
return this;
}

public void AddInstrumentation<T>()
public void AddInstrumentation<T>(IConsumerInstrumentationMiddleware consumerInstrumentationMiddleware)

Check warning on line 232 in src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs#L232

'T' is not used in the method.
where T : class, IMessageMiddleware
{
this.middlewareConfigurationBuilder.AddAtBeginning<T>();
var type = typeof(IConsumerInstrumentationMiddleware);

Check notice on line 235 in src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs#L235

Remove the unused local variable 'type'.
}

public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
Expand Down
20 changes: 20 additions & 0 deletions src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public TBuilder AddAtBeginning<T>(MiddlewareLifetime lifetime = MiddlewareLifeti
return this.AddAt<T>(0, lifetime);
}

public TBuilder AddAtBeginning(Type messageMiddleware, MiddlewareLifetime lifetime = MiddlewareLifetime.ConsumerOrProducer)
{
return this.AddAt(messageMiddleware, 0, lifetime);
}

public IReadOnlyList<MiddlewareConfiguration> Build() => this.middlewaresConfigurations;

private static InstanceLifetime ParseLifetime(MiddlewareLifetime lifetime)
Expand Down Expand Up @@ -92,5 +97,20 @@ private TBuilder AddAt<T>(

return this as TBuilder;
}

private TBuilder AddAt(
Type messageMiddleware,
int position,
MiddlewareLifetime lifetime = MiddlewareLifetime.ConsumerOrProducer)
{

this.DependencyConfigurator.Add(typeof(IMessageMiddleware), messageMiddleware, ParseLifetime(lifetime));

this.middlewaresConfigurations.Insert(
position,
new MiddlewareConfiguration(messageMiddleware.GetType(), lifetime));

Check failure on line 111 in src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow/Configuration/MiddlewareConfigurationBuilder.cs#L111

Remove this use of 'GetType' on a 'System.Type'.

return this as TBuilder;
}
}
}
3 changes: 3 additions & 0 deletions src/KafkaFlow/Consumers/ConsumerContext.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow.Consumers
{
using System;
using System.Collections.Generic;
using System.Threading;
using Confluent.Kafka;

Expand Down Expand Up @@ -38,6 +39,8 @@ public ConsumerContext(

public string GroupId => this.consumer.Configuration.GroupId;

public IReadOnlyCollection<string> Brokers => this.consumer.Configuration.ClusterConfiguration.Brokers;

public bool ShouldStoreOffset { get; set; } = true;

public DateTime MessageTimestamp => this.kafkaResult.Message.Timestamp.UtcDateTime;
Expand Down

0 comments on commit b1a91d0

Please sign in to comment.