diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/AssemblyInfo.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/AssemblyInfo.cs
new file mode 100644
index 000000000..17459bd5a
--- /dev/null
+++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/AssemblyInfo.cs
@@ -0,0 +1,3 @@
+using System.Runtime.CompilerServices;
+
+[assembly: InternalsVisibleTo("KafkaFlow.UnitTests")]
diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/AssemblyInfo.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/AssemblyInfo.cs
new file mode 100644
index 000000000..17459bd5a
--- /dev/null
+++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/AssemblyInfo.cs
@@ -0,0 +1,3 @@
+using System.Runtime.CompilerServices;
+
+[assembly: InternalsVisibleTo("KafkaFlow.UnitTests")]
diff --git a/src/KafkaFlow.UnitTests/DummyObjects/DummyProtobufObject.cs b/src/KafkaFlow.UnitTests/DummyObjects/DummyProtobufObject.cs
new file mode 100644
index 000000000..1bc60a9b1
--- /dev/null
+++ b/src/KafkaFlow.UnitTests/DummyObjects/DummyProtobufObject.cs
@@ -0,0 +1,201 @@
+//
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: DummyProtobufObject.proto
+//
+#pragma warning disable 1591, 0612, 3021
+#region Designer generated code
+
+using pb = global::Google.Protobuf;
+using pbc = global::Google.Protobuf.Collections;
+using pbr = global::Google.Protobuf.Reflection;
+using scg = global::System.Collections.Generic;
+namespace KafkaFlow.UnitTests {
+
+ /// Holder for reflection information generated from DummyProtobufObject.proto
+ public static partial class DummyProtobufObjectReflection {
+
+ #region Descriptor
+ /// File descriptor for DummyProtobufObject.proto
+ public static pbr::FileDescriptor Descriptor {
+ get { return descriptor; }
+ }
+ private static pbr::FileDescriptor descriptor;
+
+ static DummyProtobufObjectReflection() {
+ byte[] descriptorData = global::System.Convert.FromBase64String(
+ string.Concat(
+ "ChlEdW1teVByb3RvYnVmT2JqZWN0LnByb3RvEhNLYWZrYUZsb3cuVW5pdFRl",
+ "c3RzIjUKE0R1bW15UHJvdG9idWZPYmplY3QSDgoGZmllbGQxGAEgASgJEg4K",
+ "BmZpZWxkMhgCIAEoBWIGcHJvdG8z"));
+ descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
+ new pbr::FileDescriptor[] { },
+ new pbr::GeneratedClrTypeInfo(null, null, new pbr::GeneratedClrTypeInfo[] {
+ new pbr::GeneratedClrTypeInfo(typeof(global::KafkaFlow.UnitTests.DummyProtobufObject), global::KafkaFlow.UnitTests.DummyProtobufObject.Parser, new[]{ "Field1", "Field2" }, null, null, null, null)
+ }));
+ }
+ #endregion
+
+ }
+ #region Messages
+  public sealed partial class DummyProtobufObject : pb::IMessage<DummyProtobufObject> {
+    private static readonly pb::MessageParser<DummyProtobufObject> _parser = new pb::MessageParser<DummyProtobufObject>(() => new DummyProtobufObject());
+ private pb::UnknownFieldSet _unknownFields;
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+    public static pb::MessageParser<DummyProtobufObject> Parser { get { return _parser; } }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public static pbr::MessageDescriptor Descriptor {
+ get { return global::KafkaFlow.UnitTests.DummyProtobufObjectReflection.Descriptor.MessageTypes[0]; }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ pbr::MessageDescriptor pb::IMessage.Descriptor {
+ get { return Descriptor; }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public DummyProtobufObject() {
+ OnConstruction();
+ }
+
+ partial void OnConstruction();
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public DummyProtobufObject(DummyProtobufObject other) : this() {
+ field1_ = other.field1_;
+ field2_ = other.field2_;
+ _unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public DummyProtobufObject Clone() {
+ return new DummyProtobufObject(this);
+ }
+
+ /// Field number for the "field1" field.
+ public const int Field1FieldNumber = 1;
+ private string field1_ = "";
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public string Field1 {
+ get { return field1_; }
+ set {
+ field1_ = pb::ProtoPreconditions.CheckNotNull(value, "value");
+ }
+ }
+
+ /// Field number for the "field2" field.
+ public const int Field2FieldNumber = 2;
+ private int field2_;
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public int Field2 {
+ get { return field2_; }
+ set {
+ field2_ = value;
+ }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public override bool Equals(object other) {
+ return Equals(other as DummyProtobufObject);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public bool Equals(DummyProtobufObject other) {
+ if (ReferenceEquals(other, null)) {
+ return false;
+ }
+ if (ReferenceEquals(other, this)) {
+ return true;
+ }
+ if (Field1 != other.Field1) return false;
+ if (Field2 != other.Field2) return false;
+ return Equals(_unknownFields, other._unknownFields);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public override int GetHashCode() {
+ int hash = 1;
+ if (Field1.Length != 0) hash ^= Field1.GetHashCode();
+ if (Field2 != 0) hash ^= Field2.GetHashCode();
+ if (_unknownFields != null) {
+ hash ^= _unknownFields.GetHashCode();
+ }
+ return hash;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public override string ToString() {
+ return pb::JsonFormatter.ToDiagnosticString(this);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public void WriteTo(pb::CodedOutputStream output) {
+ if (Field1.Length != 0) {
+ output.WriteRawTag(10);
+ output.WriteString(Field1);
+ }
+ if (Field2 != 0) {
+ output.WriteRawTag(16);
+ output.WriteInt32(Field2);
+ }
+ if (_unknownFields != null) {
+ _unknownFields.WriteTo(output);
+ }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public int CalculateSize() {
+ int size = 0;
+ if (Field1.Length != 0) {
+ size += 1 + pb::CodedOutputStream.ComputeStringSize(Field1);
+ }
+ if (Field2 != 0) {
+ size += 1 + pb::CodedOutputStream.ComputeInt32Size(Field2);
+ }
+ if (_unknownFields != null) {
+ size += _unknownFields.CalculateSize();
+ }
+ return size;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public void MergeFrom(DummyProtobufObject other) {
+ if (other == null) {
+ return;
+ }
+ if (other.Field1.Length != 0) {
+ Field1 = other.Field1;
+ }
+ if (other.Field2 != 0) {
+ Field2 = other.Field2;
+ }
+ _unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ public void MergeFrom(pb::CodedInputStream input) {
+ uint tag;
+ while ((tag = input.ReadTag()) != 0) {
+ switch(tag) {
+ default:
+ _unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input);
+ break;
+ case 10: {
+ Field1 = input.ReadString();
+ break;
+ }
+ case 16: {
+ Field2 = input.ReadInt32();
+ break;
+ }
+ }
+ }
+ }
+
+ }
+
+ #endregion
+
+}
+
+#endregion Designer generated code
diff --git a/src/KafkaFlow.UnitTests/DummyObjects/DummyProtobufObject.proto b/src/KafkaFlow.UnitTests/DummyObjects/DummyProtobufObject.proto
new file mode 100644
index 000000000..7acea2bdb
--- /dev/null
+++ b/src/KafkaFlow.UnitTests/DummyObjects/DummyProtobufObject.proto
@@ -0,0 +1,11 @@
+// can be generated using
+// protoc --csharp_out=. DummyProtobufObject.proto
+
+syntax = "proto3";
+
+package KafkaFlow.UnitTests;
+
+message DummyProtobufObject {
+ string field1 = 1;
+ int32 field2 = 2;
+}
diff --git a/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj b/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj
index cfba1c2c7..9b1c45f82 100644
--- a/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj
+++ b/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj
@@ -32,7 +32,10 @@
+
+
+
diff --git a/src/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentAvroTypeNameResolverTests.cs b/src/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentAvroTypeNameResolverTests.cs
new file mode 100644
index 000000000..5fdeb0921
--- /dev/null
+++ b/src/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentAvroTypeNameResolverTests.cs
@@ -0,0 +1,47 @@
+namespace KafkaFlow.UnitTests.Middlewares.Serialization
+{
+ using System.Threading.Tasks;
+ using Confluent.SchemaRegistry;
+ using FluentAssertions;
+ using KafkaFlow.Serializer.SchemaRegistry;
+ using Microsoft.VisualStudio.TestTools.UnitTesting;
+ using Moq;
+ using Newtonsoft.Json;
+
+ [TestClass]
+ public class ConfluentAvroTypeNameResolverTests
+ {
+        private readonly Mock<ISchemaRegistryClient> schemaRegistryClient;
+ private readonly ConfluentAvroTypeNameResolver schemaRegistryTypeResolver;
+
+ public ConfluentAvroTypeNameResolverTests()
+ {
+            this.schemaRegistryClient = new Mock<ISchemaRegistryClient>();
+ this.schemaRegistryTypeResolver = new ConfluentAvroTypeNameResolver(this.schemaRegistryClient.Object);
+ }
+
+ [TestMethod]
+ public async Task ResolveAsync_ValidSchemaObject_ReturnsAvroFieldsInCorrectFormat()
+ {
+ // Arrange
+ var schemaId = 420;
+ var type = typeof(ConfluentAvroTypeNameResolverTests);
+ var schemaObj = new
+ {
+ Name = type.Name,
+ NameSpace = type.Namespace,
+ };
+
+ var schema = new Schema(JsonConvert.SerializeObject(schemaObj), SchemaType.Avro);
+
+ this.schemaRegistryClient.Setup(client => client.GetSchemaAsync(schemaId, null))
+ .ReturnsAsync(schema);
+
+ // Act
+ var avroFields = await this.schemaRegistryTypeResolver.ResolveAsync(schemaId);
+
+ // Assert
+ avroFields.Should().Be($"{schemaObj.NameSpace}.{schemaObj.Name}");
+ }
+ }
+}
diff --git a/src/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs b/src/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs
new file mode 100644
index 000000000..dc23010c9
--- /dev/null
+++ b/src/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs
@@ -0,0 +1,46 @@
+namespace KafkaFlow.UnitTests.Middlewares.Serialization
+{
+ using System;
+ using System.Threading.Tasks;
+ using Confluent.SchemaRegistry;
+ using Google.Protobuf;
+ using KafkaFlow.Serializer.SchemaRegistry;
+ using Microsoft.VisualStudio.TestTools.UnitTesting;
+ using Moq;
+
+ [TestClass]
+ public class ConfluentProtobufTypeNameResolverTests
+ {
+        private readonly Mock<ISchemaRegistryClient> schemaRegistryClient;
+ private readonly ConfluentProtobufTypeNameResolver schemaRegistryTypeResolver;
+
+ public ConfluentProtobufTypeNameResolverTests()
+ {
+            this.schemaRegistryClient = new Mock<ISchemaRegistryClient>();
+ this.schemaRegistryTypeResolver = new ConfluentProtobufTypeNameResolver(this.schemaRegistryClient.Object);
+ }
+
+ [TestMethod]
+ public async Task ResolveAsync_ValidProtobufObject_ReturnsProtoFields()
+ {
+ // Arrange
+ var schemaId = 420;
+
+ var dummyProtobufObj = new DummyProtobufObject
+ {
+ Field1 = "Field1",
+ Field2 = 8,
+ };
+ var base64Encoded = Convert.ToBase64String(dummyProtobufObj.ToByteArray());
+
+ this.schemaRegistryClient.Setup(client => client.GetSchemaAsync(schemaId, "serialized"))
+ .ReturnsAsync(new Schema(base64Encoded, SchemaType.Protobuf));
+
+ // Act
+ var protoFields = await this.schemaRegistryTypeResolver.ResolveAsync(schemaId);
+
+ // Assert
+            Assert.IsNotNull(protoFields);
+ }
+ }
+}
diff --git a/src/KafkaFlow.UnitTests/Middlewares/Serialization/SchemaRegistryTypeResolverTests.cs b/src/KafkaFlow.UnitTests/Middlewares/Serialization/SchemaRegistryTypeResolverTests.cs
new file mode 100644
index 000000000..dc079e896
--- /dev/null
+++ b/src/KafkaFlow.UnitTests/Middlewares/Serialization/SchemaRegistryTypeResolverTests.cs
@@ -0,0 +1,47 @@
+namespace KafkaFlow.UnitTests.Middlewares.Serialization
+{
+ using System;
+ using System.Buffers.Binary;
+ using System.Threading.Tasks;
+ using FluentAssertions;
+ using Microsoft.VisualStudio.TestTools.UnitTesting;
+ using Moq;
+
+ [TestClass]
+ public class SchemaRegistryTypeResolverTests
+ {
+        private readonly Mock<IMessageContext> messageContextMock;
+        private readonly Mock<ISchemaRegistryTypeNameResolver> schemaRegistryTypeNameResolverMock;
+ private readonly SchemaRegistryTypeResolver schemaRegistryTypeResolver;
+ private readonly byte[] messageKey = new byte[] { 0x18, 0x19 };
+ private readonly byte[] messageValue = new byte[] { 0x20, 0x21, 0x22, 0x23, 0x24, 0x25 };
+
+ public SchemaRegistryTypeResolverTests()
+ {
+            this.messageContextMock = new Mock<IMessageContext>();
+ this.messageContextMock.Setup(context => context.Message).Returns(new Message(messageKey, messageValue));
+            this.schemaRegistryTypeNameResolverMock = new Mock<ISchemaRegistryTypeNameResolver>();
+ this.schemaRegistryTypeResolver = new SchemaRegistryTypeResolver(this.schemaRegistryTypeNameResolverMock.Object);
+ }
+
+ [TestMethod]
+ public async Task OnConsumeAsync_WhenCalledTwice_TypeIsResolvedOnceThenTypeIsLoadedFromCache()
+ {
+ // Arrange
+ var expectedSchemaId = BinaryPrimitives.ReadInt32BigEndian(
+ ((byte[]) this.messageValue).AsSpan().Slice(1, 4));
+
+ this.schemaRegistryTypeNameResolverMock.Setup(
+ resolver => resolver.ResolveAsync(expectedSchemaId)).ReturnsAsync(typeof(SchemaRegistryTypeResolverTests).FullName);
+
+ // Act
+ await this.schemaRegistryTypeResolver.OnConsumeAsync(messageContextMock.Object);
+ var type = await this.schemaRegistryTypeResolver.OnConsumeAsync(messageContextMock.Object);
+
+ // Assert
+ this.schemaRegistryTypeNameResolverMock.Verify(resolver => resolver.ResolveAsync(expectedSchemaId), Times.Once);
+ var expectedObject = (SchemaRegistryTypeResolverTests)Activator.CreateInstance(type);
+ expectedObject.Should().NotBeNull();
+ }
+ }
+}