Skip to content

Commit

Permalink
fix: code review
Browse files Browse the repository at this point in the history
  • Loading branch information
Douglas Lima committed Feb 11, 2021
1 parent 724cd37 commit 0750203
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 96 deletions.
30 changes: 11 additions & 19 deletions samples/KafkaFlow.Avro.Sample/Program.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,17 @@
namespace KafkaFlow.Avro.Sample
{
using System;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using global::Microsoft.Extensions.DependencyInjection;
using KafkaFlow.Admin;
using KafkaFlow.Admin.Messages;
using KafkaFlow.Compressor;
using KafkaFlow.Compressor.Gzip;
using KafkaFlow.Consumers;
using KafkaFlow.Producers;
using KafkaFlow.Sample;
using KafkaFlow.TypedHandler;
using MessageTypes;
using Serializer;
using Serializer.ApacheAvro;
using AutoOffsetReset = KafkaFlow.AutoOffsetReset;


internal static class Program
{
private static async Task Main()
Expand All @@ -43,12 +36,13 @@ private static async Task Main()
.DefaultTopic(topicName)
.AddMiddlewares(
middlewares => middlewares
.AddApacheAvroSerializer(
.AddSerializer(resolver => new ApacheAvroMessageSerializer(
resolver,
new AvroSerializerConfig
{
AutoRegisterSchemas = true,
SubjectNameStrategy = SubjectNameStrategy.Record
})
}))
)
)
.AddConsumer(
Expand All @@ -61,7 +55,7 @@ private static async Task Main()
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddApacheAvroSerializer()
.AddSerializer<ApacheAvroMessageSerializer>()
.AddTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
Expand All @@ -76,10 +70,8 @@ private static async Task Main()
var bus = provider.CreateKafkaBus();
await bus.StartAsync();

var consumers = provider.GetRequiredService<IConsumerAccessor>();
var producers = provider.GetRequiredService<IProducerAccessor>();

var adminProducer = provider.GetService<IAdminProducer>();
var producer = producers[producerName];

while (true)
{
Expand All @@ -92,10 +84,10 @@ private static async Task Main()
for (var i = 0; i < count; i++)
{
await Task.WhenAll(
producers[producerName].ProduceAsync(
Guid.NewGuid().ToString(),
new LogMessages1{Severity = LogLevel.Info}),
producers[producerName].ProduceAsync(
producer.ProduceAsync(
Guid.NewGuid().ToString(),
new LogMessages1{Severity = LogLevel.Info}),
producer.ProduceAsync(
Guid.NewGuid().ToString(),
new LogMessages2{Message = Guid.NewGuid().ToString()}));
}
Expand Down
7 changes: 4 additions & 3 deletions src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,13 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.DefaultTopic(AvroTopicName)
.AddMiddlewares(
middlewares => middlewares
.AddApacheAvroSerializer(
.AddSerializer(resolver => new ApacheAvroMessageSerializer(
resolver,
new AvroSerializerConfig
{
AutoRegisterSchemas = true,
SubjectNameStrategy = SubjectNameStrategy.Record
})
}))
)
)
.AddConsumer(
Expand All @@ -100,7 +101,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddApacheAvroSerializer()
.AddSerializer<ApacheAvroMessageSerializer>()
.AddTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
Expand Down
26 changes: 15 additions & 11 deletions src/KafkaFlow.Serializer.ApacheAvro/ApacheAvroMessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,44 @@
/// </summary>
public class ApacheAvroMessageSerializer : IMessageSerializer
{
private const string NoSchemaRegistryUrlMessage =
"There was not defined a schema registry to be used by ApachevroSerializer. Use 'WithSchemaRegistry' option at cluster level with a valid url.";
private readonly ISchemaRegistryClient schemaRegistryClient;
private readonly AvroSerializerConfig serializerConfig;

/// <summary>
/// </summary>
/// <param name="schemaRegistryClient">Schema registry client</param>
public ApacheAvroMessageSerializer(ISchemaRegistryClient schemaRegistryClient):
this(schemaRegistryClient, new AvroSerializerConfig())
/// <param name="resolver">The <see cref="IDependencyResolver"/> to be used by the framework</param>
public ApacheAvroMessageSerializer(IDependencyResolver resolver):
this(resolver, new AvroSerializerConfig())
{
}

/// <summary>
/// </summary>
/// <param name="schemaRegistryClient">Schema registry client</param>
/// <param name="resolver">The <see cref="IDependencyResolver"/> to be used by the framework</param>
/// <param name="serializerConfig">Avro serializer configuration</param>
public ApacheAvroMessageSerializer(
ISchemaRegistryClient schemaRegistryClient,
public ApacheAvroMessageSerializer(
IDependencyResolver resolver,
AvroSerializerConfig serializerConfig)
{
this.schemaRegistryClient = schemaRegistryClient ?? throw new InvalidOperationException(NoSchemaRegistryUrlMessage);
this.schemaRegistryClient = resolver.Resolve<ISchemaRegistryClient>() ??
throw new InvalidOperationException("There was not defined a schema registry to be used by ApacheAvroSerializer. Use 'WithSchemaRegistry' option at cluster level with a valid url.");

this.serializerConfig = serializerConfig ;
}

/// <inheritdoc/>
public byte[] Serialize(object message)
{
if (!(message is ISpecificRecord record))
{
throw new InvalidCastException($"The message type {message.GetType().FullName} must implement {nameof(ISpecificRecord)} interface.");
}

return new AvroSerializer<ISpecificRecord>(
this.schemaRegistryClient,
this.serializerConfig)
.AsSyncOverAsync()
.Serialize((ISpecificRecord) message, SerializationContext.Empty);
.Serialize(record, SerializationContext.Empty);
}

/// <inheritdoc/>
Expand All @@ -58,7 +63,6 @@ public object Deserialize(byte[] data, Type type)

return deserializer
.DeserializeAsync(data, data == null, SerializationContext.Empty)
.ConfigureAwait(false)
.GetAwaiter()
.GetResult();
}
Expand Down
63 changes: 0 additions & 63 deletions src/KafkaFlow.Serializer.ApacheAvro/ConfigurationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,68 +16,5 @@ public static IClusterConfigurationBuilder WithSchemaRegistry(
cluster.DependencyConfigurator.AddTransient<ISchemaRegistryClient>(factory => new CachedSchemaRegistryClient(config));
return cluster;
}

public static IProducerMiddlewareConfigurationBuilder AddApacheAvroSerializer(
this IProducerMiddlewareConfigurationBuilder middlewares)
{
return middlewares.AddApacheAvroSerializer(new AvroSerializerConfig());
}

public static IProducerMiddlewareConfigurationBuilder AddApacheAvroSerializer<TResolver>(
this IProducerMiddlewareConfigurationBuilder middlewares) where TResolver : class, IMessageTypeResolver
{
return middlewares.AddApacheAvroSerializer<TResolver>(new AvroSerializerConfig());
}

public static IProducerMiddlewareConfigurationBuilder AddApacheAvroSerializer(
this IProducerMiddlewareConfigurationBuilder middlewares,
AvroSerializerConfig serializerConfig)
{
middlewares.DependencyConfigurator.AddTransient<ApacheAvroMessageSerializer>();

return middlewares.Add(
resolver => new SerializerProducerMiddleware(
new ApacheAvroMessageSerializer(
resolver.Resolve<ISchemaRegistryClient>(),
serializerConfig),
new DefaultMessageTypeResolver()));
}

public static IProducerMiddlewareConfigurationBuilder AddApacheAvroSerializer<TResolver>(
this IProducerMiddlewareConfigurationBuilder middlewares,
AvroSerializerConfig serializerConfig) where TResolver : class, IMessageTypeResolver
{
middlewares.DependencyConfigurator.AddTransient<TResolver>();
middlewares.DependencyConfigurator.AddTransient<ApacheAvroMessageSerializer>();

return middlewares.Add(
resolver => new SerializerProducerMiddleware(
new ApacheAvroMessageSerializer(
resolver.Resolve<ISchemaRegistryClient>(),
serializerConfig),
resolver.Resolve<TResolver>()));
}

public static IConsumerMiddlewareConfigurationBuilder AddApacheAvroSerializer(
this IConsumerMiddlewareConfigurationBuilder middlewares)
{
middlewares.DependencyConfigurator.AddTransient<ApacheAvroMessageSerializer>();

return middlewares.Add(
resolver => new SerializerConsumerMiddleware(
new ApacheAvroMessageSerializer(resolver.Resolve<ISchemaRegistryClient>()),
new DefaultMessageTypeResolver()));
}

public static IConsumerMiddlewareConfigurationBuilder AddApacheAvroSerializer<TResolver>(
this IConsumerMiddlewareConfigurationBuilder middlewares) where TResolver : class, IMessageTypeResolver
{
middlewares.DependencyConfigurator.AddTransient<ApacheAvroMessageSerializer>();

return middlewares.Add(
resolver => new SerializerConsumerMiddleware(
new ApacheAvroMessageSerializer(resolver.Resolve<ISchemaRegistryClient>()),
resolver.Resolve<TResolver>()));
}
}
}
1 change: 1 addition & 0 deletions src/KafkaFlow/KafkaFlow.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
<TargetFramework>netstandard2.0</TargetFramework>
<PackageId>KafkaFlow</PackageId>
<Description>KafkaFlow main package</Description>
<PackageIconUrl>https://raw.githubusercontent.com/Farfetch/.github/master/images/fuse-logo-128.png</PackageIconUrl>
</PropertyGroup>

<ItemGroup>
Expand Down

0 comments on commit 0750203

Please sign in to comment.