diff --git a/src/KafkaFlow.Abstractions/Extensions/DependencyConfiguratorExtensions.cs b/src/KafkaFlow.Abstractions/Extensions/DependencyConfiguratorExtensions.cs index c316a28b9..2c6b552db 100644 --- a/src/KafkaFlow.Abstractions/Extensions/DependencyConfiguratorExtensions.cs +++ b/src/KafkaFlow.Abstractions/Extensions/DependencyConfiguratorExtensions.cs @@ -1,4 +1,5 @@ using System; +using KafkaFlow.Configuration; namespace KafkaFlow { @@ -21,6 +22,22 @@ public static IDependencyConfigurator AddSingleton(th return configurator.Add(InstanceLifetime.Singleton); } + /// + /// Registers a singleton type mapping if the given is not already registered + /// + /// The object that this method was called on. + /// that will be requested. + /// that will actually be returned. + /// + public static IDependencyConfigurator TryAddSingleton(this IDependencyConfigurator configurator) + where TImplementation : class, TService + where TService : class + { + return configurator.AlreadyRegistered(typeof(TService)) + ? configurator + : configurator.Add(InstanceLifetime.Singleton); + } + /// /// Registers a singleton type mapping /// @@ -33,6 +50,20 @@ public static IDependencyConfigurator AddSingleton(this IDependencyCon return configurator.Add(InstanceLifetime.Singleton); } + /// + /// Registers a singleton type mapping if the given is not already registered + /// + /// The object that this method was called on + /// that will be created + /// + public static IDependencyConfigurator TryAddSingleton(this IDependencyConfigurator configurator) + where TService : class + { + return configurator.AlreadyRegistered(typeof(TService)) + ? configurator + : configurator.Add(InstanceLifetime.Singleton); + } + /// /// Registers a singleton type mapping where the returned instance will be the given implementation /// @@ -48,6 +79,23 @@ public static IDependencyConfigurator AddSingleton( return configurator.Add(service); } + /// + /// Registers a singleton type mapping where the returned instance will be the given implementation if the given is not already registered + /// + /// The object that this method was called on + /// that will be returned + /// Type that will be created + /// + public static IDependencyConfigurator TryAddSingleton( + this IDependencyConfigurator configurator, + TService service) + where TService : class + { + return configurator.AlreadyRegistered(typeof(TService)) + ? configurator + : configurator.Add(service); + } + /// /// Registers a singleton type mapping where the returned instance will be given by the provided factory /// @@ -65,6 +113,25 @@ public static IDependencyConfigurator AddSingleton( InstanceLifetime.Singleton); } + /// + /// Registers a singleton type mapping where the returned instance will be given by the provided factory if the given is not already registered + /// + /// The object that this method was called on + /// A factory to create new instances of the service implementation + /// Type that will be created + /// + public static IDependencyConfigurator TryAddSingleton( + this IDependencyConfigurator configurator, + Func factory) + { + return configurator.AlreadyRegistered(typeof(TService)) + ? configurator + : configurator.Add( + typeof(TService), + factory, + InstanceLifetime.Singleton); + } + /// /// Registers a scoped type mapping where the returned instance will be given by the provided factory /// @@ -112,6 +179,26 @@ public static IDependencyConfigurator AddTransient( InstanceLifetime.Transient); } + /// + /// Registers a transient type mapping if the given is not already registered + /// + /// The object that this method was called on + /// that will be requested + /// that will actually be returned + /// + public static IDependencyConfigurator TryAddTransient( + this IDependencyConfigurator configurator, + Type serviceType, + Type implementationType) + { + return configurator.AlreadyRegistered(serviceType) + ? configurator + : configurator.Add( + serviceType, + implementationType, + InstanceLifetime.Transient); + } + /// /// Registers a transient type mapping /// @@ -124,6 +211,50 @@ public static IDependencyConfigurator AddTransient(this IDependencyCon return configurator.Add(InstanceLifetime.Transient); } + /// + /// Registers a transient type mapping if the given is not already registered + /// + /// The object that this method was called on + /// Type that will be created + /// + public static IDependencyConfigurator TryAddTransient(this IDependencyConfigurator configurator) + where TService : class + { + return configurator.AlreadyRegistered(typeof(TService)) + ? configurator + : configurator.Add(InstanceLifetime.Transient); + } + + /// + /// Registers a singleton type mapping + /// + /// The object that this method was called on. + /// that will be requested. + /// that will actually be returned. + /// + public static IDependencyConfigurator AddTransient(this IDependencyConfigurator configurator) + where TImplementation : class, TService + where TService : class + { + return configurator.Add(InstanceLifetime.Transient); + } + + /// + /// Registers a singleton type mapping if the given is not already registered + /// + /// The object that this method was called on. + /// that will be requested. + /// that will actually be returned. + /// + public static IDependencyConfigurator TryAddTransient(this IDependencyConfigurator configurator) + where TImplementation : class, TService + where TService : class + { + return configurator.AlreadyRegistered(typeof(TService)) + ? configurator + : configurator.Add(InstanceLifetime.Transient); + } + /// /// Registers a transient type mapping where the returned instance will be given by the provided factory /// @@ -140,5 +271,38 @@ public static IDependencyConfigurator AddTransient( factory, InstanceLifetime.Transient); } + + /// + /// Registers a transient type mapping where the returned instance will be given by the provided factory if the given is not already registered + /// + /// The object that this method was called on + /// A factory to create new instances of the service implementation + /// Type that will be created + /// + public static IDependencyConfigurator TryAddTransient( + this IDependencyConfigurator configurator, + Func factory) + { + return configurator.AlreadyRegistered(typeof(TService)) + ? configurator + : configurator.Add( + typeof(TService), + factory, + InstanceLifetime.Transient); + } + + /// + /// Add a type mapping to the cluster dependency resolver + /// + /// Cluster configuration builder + /// A handler to set the configuration values + /// + public static IClusterConfigurationBuilder WithDependencies( + this IClusterConfigurationBuilder cluster, + Action handler) + { + handler(cluster.DependencyConfigurator); + return cluster; + } } } diff --git a/src/KafkaFlow.Abstractions/IDependencyConfigurator.cs b/src/KafkaFlow.Abstractions/IDependencyConfigurator.cs index 47d52ce46..71cd7400b 100644 --- a/src/KafkaFlow.Abstractions/IDependencyConfigurator.cs +++ b/src/KafkaFlow.Abstractions/IDependencyConfigurator.cs @@ -54,5 +54,12 @@ IDependencyConfigurator Add( Type serviceType, Func factory, InstanceLifetime lifetime); + + /// + /// Checks if the given is already registered + /// + /// Service type + /// + bool AlreadyRegistered(Type registeredType); } } diff --git a/src/KafkaFlow.Microsoft.DependencyInjection/MicrosoftDependencyConfigurator.cs b/src/KafkaFlow.Microsoft.DependencyInjection/MicrosoftDependencyConfigurator.cs index d7cec29e3..9ac0d17a6 100644 --- a/src/KafkaFlow.Microsoft.DependencyInjection/MicrosoftDependencyConfigurator.cs +++ b/src/KafkaFlow.Microsoft.DependencyInjection/MicrosoftDependencyConfigurator.cs @@ -73,6 +73,19 @@ public IDependencyConfigurator Add( return this; } + public bool AlreadyRegistered(Type registeredType) + { + foreach (var s in _services) + { + if (s.ServiceType == registeredType) + { + return true; + } + } + + return false; + } + private static ServiceLifetime ParseLifetime(InstanceLifetime lifetime) { switch (lifetime) diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroTypeNameResolver.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroTypeNameResolver.cs index 79be740ff..a8dd3c3b1 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroTypeNameResolver.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroTypeNameResolver.cs @@ -4,7 +4,7 @@ namespace KafkaFlow.Serializer.SchemaRegistry { - internal class ConfluentAvroTypeNameResolver : ISchemaRegistryTypeNameResolver + internal sealed class ConfluentAvroTypeNameResolver : IConfluentAvroTypeNameResolver { private readonly ISchemaRegistryClient _client; diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs index e0e44fda5..4ca34e35e 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs @@ -1,5 +1,4 @@ -using Confluent.SchemaRegistry; -using KafkaFlow.Configuration; +using KafkaFlow.Configuration; using KafkaFlow.Middlewares.Serializer; using KafkaFlow.Serializer.SchemaRegistry; @@ -18,10 +17,12 @@ public static class ConsumerConfigurationBuilderExtensions public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryAvroDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares) { + middlewares.DependencyConfigurator.TryAddTransient(); + return middlewares.Add( resolver => new DeserializerConsumerMiddleware( new ConfluentAvroDeserializer(resolver), - new SchemaRegistryTypeResolver(new ConfluentAvroTypeNameResolver(resolver.Resolve())))); + new SchemaRegistryTypeResolver(resolver.Resolve()))); } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/IConfluentAvroTypeNameResolver.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/IConfluentAvroTypeNameResolver.cs new file mode 100644 index 000000000..19391b03e --- /dev/null +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/IConfluentAvroTypeNameResolver.cs @@ -0,0 +1,7 @@ +namespace KafkaFlow.Serializer.SchemaRegistry +{ + /// + public interface IConfluentAvroTypeNameResolver : ISchemaRegistryTypeNameResolver + { + } +} diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ProducerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ProducerConfigurationBuilderExtensions.cs index 5c4e98536..57adba494 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ProducerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ProducerConfigurationBuilderExtensions.cs @@ -21,10 +21,12 @@ public static IProducerMiddlewareConfigurationBuilder AddSchemaRegistryAvroSeria this IProducerMiddlewareConfigurationBuilder middlewares, AvroSerializerConfig config = null) { + middlewares.DependencyConfigurator.TryAddTransient(); + return middlewares.Add( resolver => new SerializerProducerMiddleware( new ConfluentAvroSerializer(resolver, config), - new SchemaRegistryTypeResolver(new ConfluentAvroTypeNameResolver(resolver.Resolve())))); + new SchemaRegistryTypeResolver(resolver.Resolve()))); } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/AssemblyInfo.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/AssemblyInfo.cs index 17459bd5a..f8402e685 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/AssemblyInfo.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/AssemblyInfo.cs @@ -1,3 +1,4 @@ using System.Runtime.CompilerServices; [assembly: InternalsVisibleTo("KafkaFlow.UnitTests")] +[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs index ff754b45a..665bb4af1 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs @@ -4,9 +4,9 @@ using Google.Protobuf; using Google.Protobuf.Reflection; -namespace KafkaFlow +namespace KafkaFlow.Serializer.SchemaRegistry { - internal class ConfluentProtobufTypeNameResolver : ISchemaRegistryTypeNameResolver + internal class ConfluentProtobufTypeNameResolver : IConfluentProtobufTypeNameResolver { private readonly ISchemaRegistryClient _client; @@ -21,7 +21,18 @@ public async Task ResolveAsync(int id) var protoFields = FileDescriptorProto.Parser.ParseFrom(ByteString.FromBase64(schemaString)); - return $"{protoFields.Package}.{protoFields.MessageType.FirstOrDefault()?.Name}"; + return BuildTypeName(protoFields); + } + + private static string BuildTypeName(FileDescriptorProto protoFields) + { + var package = protoFields.Package; + if (string.IsNullOrEmpty(package)) + { + package = protoFields.Options.CsharpNamespace; + } + + return $"{package}.{protoFields.MessageType.FirstOrDefault()?.Name}"; } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs index 558eb8056..b60d20812 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs @@ -1,5 +1,4 @@ -using Confluent.SchemaRegistry; -using KafkaFlow.Configuration; +using KafkaFlow.Configuration; using KafkaFlow.Middlewares.Serializer; using KafkaFlow.Serializer.SchemaRegistry; @@ -18,10 +17,12 @@ public static class ConsumerConfigurationBuilderExtensions public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryProtobufDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares) { + middlewares.DependencyConfigurator.TryAddTransient(); + return middlewares.Add( resolver => new DeserializerConsumerMiddleware( new ConfluentProtobufDeserializer(), - new SchemaRegistryTypeResolver(new ConfluentProtobufTypeNameResolver(resolver.Resolve())))); + new SchemaRegistryTypeResolver(resolver.Resolve()))); } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/IConfluentProtobufTypeNameResolver.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/IConfluentProtobufTypeNameResolver.cs new file mode 100644 index 000000000..1f17fa35f --- /dev/null +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/IConfluentProtobufTypeNameResolver.cs @@ -0,0 +1,7 @@ +namespace KafkaFlow.Serializer.SchemaRegistry +{ + /// + public interface IConfluentProtobufTypeNameResolver : ISchemaRegistryTypeNameResolver + { + } +} diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ProducerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ProducerConfigurationBuilderExtensions.cs index 180073f59..9c7e0b977 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ProducerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ProducerConfigurationBuilderExtensions.cs @@ -1,5 +1,4 @@ -using Confluent.SchemaRegistry; -using Confluent.SchemaRegistry.Serdes; +using Confluent.SchemaRegistry.Serdes; using KafkaFlow.Configuration; using KafkaFlow.Middlewares.Serializer; using KafkaFlow.Serializer.SchemaRegistry; @@ -21,10 +20,12 @@ public static IProducerMiddlewareConfigurationBuilder AddSchemaRegistryProtobufS this IProducerMiddlewareConfigurationBuilder middlewares, ProtobufSerializerConfig config = null) { + middlewares.DependencyConfigurator.TryAddTransient(); + return middlewares.Add( resolver => new SerializerProducerMiddleware( new ConfluentProtobufSerializer(resolver, config), - new SchemaRegistryTypeResolver(new ConfluentProtobufTypeNameResolver(resolver.Resolve())))); + new SchemaRegistryTypeResolver(resolver.Resolve()))); } } } diff --git a/src/KafkaFlow.Unity/UnityDependencyConfigurator.cs b/src/KafkaFlow.Unity/UnityDependencyConfigurator.cs index 452e0d19e..7e24b35ac 100644 --- a/src/KafkaFlow.Unity/UnityDependencyConfigurator.cs +++ b/src/KafkaFlow.Unity/UnityDependencyConfigurator.cs @@ -1,7 +1,7 @@ using System; using System.Linq; -using global::Unity; -using global::Unity.Lifetime; +using Unity; +using Unity.Lifetime; namespace KafkaFlow.Unity { @@ -67,7 +67,7 @@ public IDependencyConfigurator Add( { string name = null; - if (this.AlreadyRegistered(serviceType)) + if (AlreadyRegistered(serviceType)) { name = Guid.NewGuid().ToString(); } @@ -81,6 +81,12 @@ public IDependencyConfigurator Add( return this; } + /// + public bool AlreadyRegistered(Type registeredType) + { + return _container.Registrations.Any(x => x.RegisteredType == registeredType); + } + private static object ParseLifetime(InstanceLifetime lifetime) => lifetime switch { @@ -89,10 +95,5 @@ private static object ParseLifetime(InstanceLifetime lifetime) => InstanceLifetime.Transient => new TransientLifetimeManager(), _ => throw new InvalidCastException($"There is not mapping defined to {lifetime}") }; - - private bool AlreadyRegistered(Type registeredType) - { - return _container.Registrations.Any(x => x.RegisteredType == registeredType); - } } } diff --git a/tests/KafkaFlow.UnitTests/ConfigurationBuilders/DependencyConfiguratorExtensionsTests.cs b/tests/KafkaFlow.UnitTests/ConfigurationBuilders/DependencyConfiguratorExtensionsTests.cs new file mode 100644 index 000000000..71b87e606 --- /dev/null +++ b/tests/KafkaFlow.UnitTests/ConfigurationBuilders/DependencyConfiguratorExtensionsTests.cs @@ -0,0 +1,143 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq.Expressions; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Moq; + +namespace KafkaFlow.UnitTests.ConfigurationBuilders +{ + [TestClass] + public class DependencyConfiguratorExtensionsTests + { + private static IEnumerable _methodsForAdd + { + get + { + yield return new object[] + { + (Action)(configurator => configurator.AddSingleton()), + (Expression>)(configurator => configurator.Add(InstanceLifetime.Singleton)), + }; + yield return new object[] + { + (Action)(configurator => configurator.AddSingleton()), + (Expression>)(configurator => configurator.Add(InstanceLifetime.Singleton)), + }; + yield return new object[] + { + (Action)(configurator => configurator.AddSingleton(new ArrayList())), + (Expression>)(configurator => configurator.Add(It.IsAny())), + }; + yield return new object[] + { + (Action)(configurator => configurator.AddSingleton(_ => new ArrayList())), + (Expression>)(configurator => configurator.Add(typeof(ArrayList), It.IsAny>(), InstanceLifetime.Singleton)), + }; + yield return new object[] + { + (Action)(configurator => configurator.AddTransient()), + (Expression>)(configurator => configurator.Add(InstanceLifetime.Transient)), + }; + yield return new object[] + { + (Action)(configurator => configurator.AddTransient()), + (Expression>)(configurator => configurator.Add(InstanceLifetime.Transient)), + }; + yield return new object[] + { + (Action)(configurator => configurator.AddTransient(_ => new ArrayList())), + (Expression>)(configurator => configurator.Add(typeof(ArrayList), It.IsAny>(), InstanceLifetime.Transient)), + }; + } + } + + private static IEnumerable _methodsForTryAdd + { + get + { + yield return new object[] + { + (Action)(configurator => configurator.TryAddSingleton()), + (Expression>)(configurator => configurator.Add(InstanceLifetime.Singleton)), + }; + yield return new object[] + { + (Action)(configurator => configurator.TryAddSingleton()), + (Expression>)(configurator => configurator.Add(InstanceLifetime.Singleton)), + }; + yield return new object[] + { + (Action)(configurator => configurator.TryAddSingleton(new ArrayList())), + (Expression>)(configurator => configurator.Add(It.IsAny())), + }; + yield return new object[] + { + (Action)(configurator => configurator.TryAddSingleton(_ => new ArrayList())), + (Expression>)(configurator => configurator.Add(typeof(ArrayList), It.IsAny>(), InstanceLifetime.Singleton)), + }; + yield return new object[] + { + (Action)(configurator => configurator.TryAddTransient()), + (Expression>)(configurator => configurator.Add(InstanceLifetime.Transient)), + }; + yield return new object[] + { + (Action)(configurator => configurator.TryAddTransient()), + (Expression>)(configurator => configurator.Add(InstanceLifetime.Transient)), + }; + yield return new object[] + { + (Action)(configurator => configurator.TryAddTransient(_ => new ArrayList())), + (Expression>)(configurator => configurator.Add(typeof(ArrayList), It.IsAny>(), InstanceLifetime.Transient)), + }; + } + } + + [TestMethod] + [DynamicData(nameof(_methodsForAdd))] + public void Add(Action addMethod, Expression> underlyingAddMethod) + { + // Arrange + var configurator = new Mock(); + + // Act + addMethod(configurator.Object); + + // Assert + configurator.Verify(underlyingAddMethod, Times.Once); + } + + [TestMethod] + [DynamicData(nameof(_methodsForTryAdd))] + public void TryAdd_AddWhenNotExists(Action tryAddMethod, Expression> underlyingAddMethod) + { + // Arrange + var configurator = new Mock(); + configurator.Setup(c => c.AlreadyRegistered(typeof(IList))).Returns(false); + configurator.Setup(c => c.AlreadyRegistered(typeof(ArrayList))).Returns(false); + + // Act + tryAddMethod(configurator.Object); + + // Assert + configurator.Verify(underlyingAddMethod, Times.Once); + } + + [TestMethod] + [DynamicData(nameof(_methodsForTryAdd))] + public void TryAdd_IgnoreWhenExists(Action tryAddMethod, Expression> underlyingAddMethod) + { + // Arrange + var configurator = new Mock(); + configurator.Setup(c => c.AlreadyRegistered(typeof(IList))).Returns(true); + configurator.Setup(c => c.AlreadyRegistered(typeof(ArrayList))).Returns(true); + + // Act + tryAddMethod(configurator.Object); + + // Assert + configurator.Verify(underlyingAddMethod, Times.Never); + } + } +} diff --git a/tests/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj b/tests/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj index 4819c54ce..62f4b7482 100644 --- a/tests/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj +++ b/tests/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj @@ -26,6 +26,8 @@ + + diff --git a/tests/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs b/tests/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs index 4185222eb..5252be581 100644 --- a/tests/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs +++ b/tests/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs @@ -1,10 +1,10 @@ using System; using System.Threading.Tasks; using Confluent.SchemaRegistry; - using FluentAssertions; - using Google.Protobuf; +using Google.Protobuf.Reflection; +using KafkaFlow.Serializer.SchemaRegistry; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -13,36 +13,63 @@ namespace KafkaFlow.UnitTests.Middlewares.Serialization [TestClass] public class ConfluentProtobufTypeNameResolverTests { - private readonly Mock _schemaRegistryClient; - private readonly ConfluentProtobufTypeNameResolver _schemaRegistryTypeResolver; + private const string MessageTypeName = "TestMessage"; - public ConfluentProtobufTypeNameResolverTests() + [TestMethod] + public async Task ResolveAsync_WithPackage_ReturnTypeName() { - _schemaRegistryClient = new Mock(); - _schemaRegistryTypeResolver = new ConfluentProtobufTypeNameResolver(_schemaRegistryClient.Object); + // Arrange + var schemaRegistryClientMock = CreateSchemaRegistryClientMock(p => p.Package = "TestPackage"); + var resolver = new ConfluentProtobufTypeNameResolver(schemaRegistryClientMock.Object); + + // Act + var typeName = await resolver.ResolveAsync(1); + + // Assert + typeName.Should().Be($"TestPackage.{MessageTypeName}"); } [TestMethod] - public async Task ResolveAsync_ValidProtobufObject_ReturnsProtoFields() + public async Task ResolveAsync_NoPackageWithCsharpNamespace_ReturnTypeName() { // Arrange - var schemaId = 420; + var schemaRegistryClientMock = CreateSchemaRegistryClientMock(p => + { + p.Package = string.Empty; + p.Options.CsharpNamespace = "TestCsharpNamespace"; + }); + var resolver = new ConfluentProtobufTypeNameResolver(schemaRegistryClientMock.Object); - var dummyProtobufObj = new DummyProtobufObject + // Act + var typeName = await resolver.ResolveAsync(1); + + // Assert + typeName.Should().Be($"TestCsharpNamespace.{MessageTypeName}"); + } + + private static Mock CreateSchemaRegistryClientMock(Action configure) + { + var protoFields = new FileDescriptorProto { - Field1 = "Field1", - Field2 = 8, + MessageType = + { + new DescriptorProto + { + Name = MessageTypeName, + }, + }, + Options = new FileOptions(), }; - var base64Encoded = Convert.ToBase64String(dummyProtobufObj.ToByteArray()); + configure(protoFields); - _schemaRegistryClient.Setup(client => client.GetSchemaAsync(schemaId, "serialized")) - .ReturnsAsync(new Schema(base64Encoded, SchemaType.Protobuf)); + var schema = new Schema(protoFields.ToByteString().ToBase64(), SchemaType.Protobuf); - // Act - var protoFields = await _schemaRegistryTypeResolver.ResolveAsync(schemaId); + var schemaRegistryClientMock = new Mock(); + schemaRegistryClientMock + .Setup(o => o.GetSchemaAsync(1, "serialized")) + .ReturnsAsync(schema); - // Assert - protoFields.Should().NotBeNull(); + return schemaRegistryClientMock; } } }