Skip to content

Commit

Permalink
Include AddOpenTelemetryInstrumentation in builder
Browse files Browse the repository at this point in the history
  • Loading branch information
simaoribeiro committed Sep 19, 2023
1 parent 7fdcd4e commit 56cd9ba
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 3 deletions.
1 change: 1 addition & 0 deletions samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<ProjectReference Include="..\..\src\KafkaFlow.Compressor\KafkaFlow.Compressor.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.LogHandler.Console\KafkaFlow.LogHandler.Console.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Microsoft.DependencyInjection\KafkaFlow.Microsoft.DependencyInjection.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.OpenTelemetry\KafkaFlow.OpenTelemetry.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.JsonCore\KafkaFlow.Serializer.JsonCore.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.ProtobufNet\KafkaFlow.Serializer.ProtobufNet.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer\KafkaFlow.Serializer.csproj" />
Expand Down
2 changes: 2 additions & 0 deletions samples/KafkaFlow.Sample/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Threading.Tasks;
using KafkaFlow;
using KafkaFlow.Configuration;
using KafkaFlow.Producers;
using KafkaFlow.Sample;
using KafkaFlow.Serializer;
Expand Down Expand Up @@ -37,6 +38,7 @@
.AddTypedHandlers(h => h.AddHandler<PrintConsoleHandler>())
)
)
.AddOpenTelemetryInstrumentation()
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,10 @@ IClusterConfigurationBuilder CreateTopicIfNotExists(
string topicName,
int numberOfPartitions,
short replicationFactor);

// Gets the cluster configuration
IClusterConfigurationBuilder AddInstrumentation<TConsumerInstrumentationMiddleware, TProducerInstrumentationMiddleware>()

Check warning on line 87 in src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'IClusterConfigurationBuilder.AddInstrumentation<TConsumerInstrumentationMiddleware, TProducerInstrumentationMiddleware>()'
where TConsumerInstrumentationMiddleware : class, IMessageMiddleware
where TProducerInstrumentationMiddleware : class, IMessageMiddleware;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;

Check warning on line 1 in src/KafkaFlow.Abstractions/Configuration/IInstrumentationBuilder.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Using directive should appear within a namespace declaration
using System.Collections.Generic;

Check warning on line 2 in src/KafkaFlow.Abstractions/Configuration/IInstrumentationBuilder.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Using directive should appear within a namespace declaration
using System.Text;

Check warning on line 3 in src/KafkaFlow.Abstractions/Configuration/IInstrumentationBuilder.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Using directive should appear within a namespace declaration

namespace KafkaFlow.Configuration
{
public interface IInstrumentationBuilder

Check warning on line 7 in src/KafkaFlow.Abstractions/Configuration/IInstrumentationBuilder.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'IInstrumentationBuilder'
{
void AddInstrumentation();

Check warning on line 9 in src/KafkaFlow.Abstractions/Configuration/IInstrumentationBuilder.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'IInstrumentationBuilder.AddInstrumentation()'
}
}
18 changes: 18 additions & 0 deletions src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace KafkaFlow.Configuration
{
using KafkaFlow.OpenTelemetry.Trace;

/// <summary>
/// Adds OpenTelemetry instrumentation
/// </summary>
public static class ExtensionMethods
{
/// <summary>
/// Adds OpenTelemetry instrumentation
/// </summary>
/// <param name="builder">The Kafka configuration builder</param>
/// <returns></returns>
public static IClusterConfigurationBuilder AddOpenTelemetryInstrumentation(this IClusterConfigurationBuilder builder) =>
builder.AddInstrumentation<TracerConsumerMiddleware, TracerProducerMiddleware>();
}
}
2 changes: 1 addition & 1 deletion src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<ItemGroup>
<PackageReference Include="OpenTelemetry.Api" Version="1.6.0" />
<PackageReference Include="OpenTelemetry.Extensions.Propagators" Version="1.6.0"/>
<PackageReference Include="OpenTelemetry.Extensions.Propagators" Version="1.6.0" />
<PackageReference Include="OpenTelemetry.SemanticConventions" Version="1.0.0-rc9.9">
<Aliases>SemanticConventions</Aliases>
</PackageReference>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace KafkaFlow.OpenTelemetry.Trace
using System.Reflection;
using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions;

internal class KafkaFlowActivitySourceHelper
internal static class KafkaFlowActivitySourceHelper
{
public static readonly ActivitySource ActivitySource = new ActivitySource(KafkaFlowString, Version.ToString());

Check warning on line 13 in src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.OpenTelemetry/Trace/KafkaFlowActivitySourceHelper.cs#L13

Move this field's initializer into a static constructor.

Expand Down
22 changes: 21 additions & 1 deletion src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ internal class ClusterConfigurationBuilder : IClusterConfigurationBuilder
private IEnumerable<string> brokers;
private string name;
private Func<SecurityInformation> securityInformationHandler;
private Type instrumentationConsumerMiddleware = typeof(IMessageMiddleware);
private Type instrumentationProducerMiddleware = typeof(IMessageMiddleware);

Check warning on line 19 in src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs#L19

Make 'instrumentationProducerMiddleware' 'readonly'.

public ClusterConfigurationBuilder(IDependencyConfigurator dependencyConfigurator)
{
Expand All @@ -35,7 +37,15 @@ public ClusterConfiguration Build(KafkaConfiguration kafkaConfiguration)
this.topicsToCreateIfNotExist);

configuration.AddProducers(this.producers.Select(x => x.Build(configuration)));
configuration.AddConsumers(this.consumers.Select(x => x.Build(configuration)));
configuration.AddConsumers(this.consumers.Select(x =>
{
if(this.instrumentationConsumerMiddleware != null)
{
x.AddInstrumentation<this.instrumentationConsumerMiddleware>();

Check failure on line 44 in src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Invalid expression term ')'
}
return x.Build(configuration);
}));

return configuration;
}
Expand Down Expand Up @@ -96,6 +106,16 @@ public IClusterConfigurationBuilder AddConsumer(Action<IConsumerConfigurationBui
return this;
}

public IClusterConfigurationBuilder AddInstrumentation<TConsumerInstrumentationMiddleware, TProducerInstrumentationMiddleware>()

Check warning on line 109 in src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs#L109

'TProducerInstrumentationMiddleware' is not used in the method.
where TConsumerInstrumentationMiddleware : class, IMessageMiddleware
where TProducerInstrumentationMiddleware : class, IMessageMiddleware
{

this.instrumentationConsumerMiddleware = typeof(TConsumerInstrumentationMiddleware);

return this;
}

public IClusterConfigurationBuilder OnStopping(Action<IDependencyResolver> handler)
{
this.onStoppingHandler = handler;
Expand Down
6 changes: 6 additions & 0 deletions src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,12 @@ public IConsumerConfigurationBuilder WithCustomFactory(ConsumerCustomFactory cus
return this;
}

public void AddInstrumentation<T>()
where T : class, IMessageMiddleware
{
this.middlewareConfigurationBuilder.AddAtBeginning<T>();
}

public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
{
var middlewareConfiguration = this.middlewareConfigurationBuilder.Build();
Expand Down

0 comments on commit 56cd9ba

Please sign in to comment.