-
Notifications
You must be signed in to change notification settings - Fork 119
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
9ed6bd9
commit 684338e
Showing
8 changed files
with
361 additions
and
0 deletions.
There are no files selected for viewing
3 changes: 3 additions & 0 deletions
3
src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/AssemblyInfo.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
using System.Runtime.CompilerServices; | ||
|
||
[assembly: InternalsVisibleTo("KafkaFlow.UnitTests")] |
3 changes: 3 additions & 0 deletions
3
src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/AssemblyInfo.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
using System.Runtime.CompilerServices; | ||
|
||
[assembly: InternalsVisibleTo("KafkaFlow.UnitTests")] |
201 changes: 201 additions & 0 deletions
201
src/KafkaFlow.UnitTests/DummyObjects/DummyProtobufObject.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,201 @@ | ||
// <auto-generated> | ||
// Generated by the protocol buffer compiler. DO NOT EDIT! | ||
// source: DummyProtobufObject.proto | ||
// </auto-generated> | ||
#pragma warning disable 1591, 0612, 3021 | ||
#region Designer generated code | ||
|
||
using pb = global::Google.Protobuf; | ||
using pbc = global::Google.Protobuf.Collections; | ||
using pbr = global::Google.Protobuf.Reflection; | ||
using scg = global::System.Collections.Generic; | ||
namespace KafkaFlow.UnitTests { | ||
|
||
/// <summary>Holder for reflection information generated from DummyProtobufObject.proto</summary> | ||
public static partial class DummyProtobufObjectReflection { | ||
|
||
#region Descriptor | ||
/// <summary>File descriptor for DummyProtobufObject.proto</summary> | ||
public static pbr::FileDescriptor Descriptor { | ||
get { return descriptor; } | ||
} | ||
private static pbr::FileDescriptor descriptor; | ||
|
||
static DummyProtobufObjectReflection() { | ||
byte[] descriptorData = global::System.Convert.FromBase64String( | ||
string.Concat( | ||
"ChlEdW1teVByb3RvYnVmT2JqZWN0LnByb3RvEhNLYWZrYUZsb3cuVW5pdFRl", | ||
"c3RzIjUKE0R1bW15UHJvdG9idWZPYmplY3QSDgoGZmllbGQxGAEgASgJEg4K", | ||
"BmZpZWxkMhgCIAEoBWIGcHJvdG8z")); | ||
descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, | ||
new pbr::FileDescriptor[] { }, | ||
new pbr::GeneratedClrTypeInfo(null, null, new pbr::GeneratedClrTypeInfo[] { | ||
new pbr::GeneratedClrTypeInfo(typeof(global::KafkaFlow.UnitTests.DummyProtobufObject), global::KafkaFlow.UnitTests.DummyProtobufObject.Parser, new[]{ "Field1", "Field2" }, null, null, null, null) | ||
})); | ||
} | ||
#endregion | ||
|
||
} | ||
#region Messages | ||
public sealed partial class DummyProtobufObject : pb::IMessage<DummyProtobufObject> { | ||
private static readonly pb::MessageParser<DummyProtobufObject> _parser = new pb::MessageParser<DummyProtobufObject>(() => new DummyProtobufObject()); | ||
private pb::UnknownFieldSet _unknownFields; | ||
[global::System.Diagnostics.DebuggerNonUserCodeAttribute] | ||
public static pb::MessageParser<DummyProtobufObject> Parser { get { return _parser; } } | ||
|
||
[global::System.Diagnostics.DebuggerNonUserCodeAttribute] | ||
public static pbr::MessageDescriptor Descriptor { | ||
get { return global::KafkaFlow.UnitTests.DummyProtobufObjectReflection.Descriptor.MessageTypes[0]; } | ||
} | ||
|
||
[global::System.Diagnostics.DebuggerNonUserCodeAttribute] | ||
pbr::MessageDescriptor pb::IMessage.Descriptor { | ||
get { return Descriptor; } | ||
} | ||
|
||
[global::System.Diagnostics.DebuggerNonUserCodeAttribute] | ||
public DummyProtobufObject() { | ||
OnConstruction(); | ||
} | ||
|
||
partial void OnConstruction(); | ||
|
||
[global::System.Diagnostics.DebuggerNonUserCodeAttribute] | ||
public DummyProtobufObject(DummyProtobufObject other) : this() { | ||
field1_ = other.field1_; | ||
field2_ = other.field2_; | ||
_unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields); | ||
} | ||
|
||
[global::System.Diagnostics.DebuggerNonUserCodeAttribute] | ||
public DummyProtobufObject Clone() { | ||
return new DummyProtobufObject(this); | ||
} | ||
|
||
/// <summary>Field number for the "field1" field.</summary> | ||
public const int Field1FieldNumber = 1; | ||
private string field1_ = ""; | ||
[global::System.Diagnostics.DebuggerNonUserCodeAttribute] | ||
public string Field1 { | ||
get { return field1_; } | ||
set { | ||
field1_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); | ||
} | ||
} | ||
|
||
/// <summary>Field number for the "field2" field.</summary> | ||
public const int Field2FieldNumber = 2; | ||
private int field2_; | ||
[global::System.Diagnostics.DebuggerNonUserCodeAttribute] | ||
public int Field2 { | ||
get { return field2_; } | ||
set { | ||
field2_ = value; | ||
} | ||
} | ||
|
||
[global::System.Diagnostics.DebuggerNonUserCodeAttribute] | ||
public override bool Equals(object other) { | ||
return Equals(other as DummyProtobufObject); | ||
} | ||
|
||
[global::System.Diagnostics.DebuggerNonUserCodeAttribute] | ||
public bool Equals(DummyProtobufObject other) { | ||
if (ReferenceEquals(other, null)) { | ||
return false; | ||
} | ||
if (ReferenceEquals(other, this)) { | ||
return true; | ||
} | ||
if (Field1 != other.Field1) return false; | ||
if (Field2 != other.Field2) return false; | ||
return Equals(_unknownFields, other._unknownFields); | ||
} | ||
|
||
[global::System.Diagnostics.DebuggerNonUserCodeAttribute] | ||
public override int GetHashCode() { | ||
int hash = 1; | ||
if (Field1.Length != 0) hash ^= Field1.GetHashCode(); | ||
if (Field2 != 0) hash ^= Field2.GetHashCode(); | ||
if (_unknownFields != null) { | ||
hash ^= _unknownFields.GetHashCode(); | ||
} | ||
return hash; | ||
} | ||
|
||
[global::System.Diagnostics.DebuggerNonUserCodeAttribute] | ||
public override string ToString() { | ||
return pb::JsonFormatter.ToDiagnosticString(this); | ||
} | ||
|
||
[global::System.Diagnostics.DebuggerNonUserCodeAttribute] | ||
public void WriteTo(pb::CodedOutputStream output) { | ||
if (Field1.Length != 0) { | ||
output.WriteRawTag(10); | ||
output.WriteString(Field1); | ||
} | ||
if (Field2 != 0) { | ||
output.WriteRawTag(16); | ||
output.WriteInt32(Field2); | ||
} | ||
if (_unknownFields != null) { | ||
_unknownFields.WriteTo(output); | ||
} | ||
} | ||
|
||
[global::System.Diagnostics.DebuggerNonUserCodeAttribute] | ||
public int CalculateSize() { | ||
int size = 0; | ||
if (Field1.Length != 0) { | ||
size += 1 + pb::CodedOutputStream.ComputeStringSize(Field1); | ||
} | ||
if (Field2 != 0) { | ||
size += 1 + pb::CodedOutputStream.ComputeInt32Size(Field2); | ||
} | ||
if (_unknownFields != null) { | ||
size += _unknownFields.CalculateSize(); | ||
} | ||
return size; | ||
} | ||
|
||
[global::System.Diagnostics.DebuggerNonUserCodeAttribute] | ||
public void MergeFrom(DummyProtobufObject other) { | ||
if (other == null) { | ||
return; | ||
} | ||
if (other.Field1.Length != 0) { | ||
Field1 = other.Field1; | ||
} | ||
if (other.Field2 != 0) { | ||
Field2 = other.Field2; | ||
} | ||
_unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields); | ||
} | ||
|
||
[global::System.Diagnostics.DebuggerNonUserCodeAttribute] | ||
public void MergeFrom(pb::CodedInputStream input) { | ||
uint tag; | ||
while ((tag = input.ReadTag()) != 0) { | ||
switch(tag) { | ||
default: | ||
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input); | ||
break; | ||
case 10: { | ||
Field1 = input.ReadString(); | ||
break; | ||
} | ||
case 16: { | ||
Field2 = input.ReadInt32(); | ||
break; | ||
} | ||
} | ||
} | ||
} | ||
|
||
} | ||
|
||
#endregion | ||
|
||
} | ||
|
||
#endregion Designer generated code |
11 changes: 11 additions & 0 deletions
11
src/KafkaFlow.UnitTests/DummyObjects/DummyProtobufObject.proto
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
// can be generated using | ||
// protoc --csharp_out=. DummyProtobufObject.proto | ||
|
||
syntax = "proto3"; | ||
|
||
package KafkaFlow.UnitTests; | ||
|
||
message DummyProtobufObject { | ||
string field1 = 1; | ||
int32 field2 = 2; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
47 changes: 47 additions & 0 deletions
47
src/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentAvroTypeNameResolverTests.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
namespace KafkaFlow.UnitTests.Middlewares.Serialization | ||
{ | ||
using System.Threading.Tasks; | ||
using Confluent.SchemaRegistry; | ||
using FluentAssertions; | ||
using KafkaFlow.Serializer.SchemaRegistry; | ||
using Microsoft.VisualStudio.TestTools.UnitTesting; | ||
using Moq; | ||
using Newtonsoft.Json; | ||
|
||
[TestClass] | ||
public class ConfluentAvroTypeNameResolverTests | ||
{ | ||
private readonly Mock<ISchemaRegistryClient> schemaRegistryClient; | ||
private readonly ConfluentAvroTypeNameResolver schemaRegistryTypeResolver; | ||
|
||
public ConfluentAvroTypeNameResolverTests() | ||
{ | ||
this.schemaRegistryClient = new Mock<ISchemaRegistryClient>(); | ||
this.schemaRegistryTypeResolver = new ConfluentAvroTypeNameResolver(this.schemaRegistryClient.Object); | ||
} | ||
|
||
[TestMethod] | ||
public async Task ResolveAsync_ValidSchemaObject_ReturnsAvroFieldsInCorrectFormat() | ||
{ | ||
// Arrange | ||
var schemaId = 420; | ||
var type = typeof(ConfluentAvroTypeNameResolverTests); | ||
var schemaObj = new | ||
{ | ||
Name = type.Name, | ||
NameSpace = type.Namespace, | ||
}; | ||
|
||
var schema = new Schema(JsonConvert.SerializeObject(schemaObj), SchemaType.Avro); | ||
|
||
this.schemaRegistryClient.Setup(client => client.GetSchemaAsync(schemaId, null)) | ||
.ReturnsAsync(schema); | ||
|
||
// Act | ||
var avroFields = await this.schemaRegistryTypeResolver.ResolveAsync(schemaId); | ||
|
||
// Assert | ||
avroFields.Should().Be($"{schemaObj.NameSpace}.{schemaObj.Name}"); | ||
} | ||
} | ||
} |
46 changes: 46 additions & 0 deletions
46
src/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
namespace KafkaFlow.UnitTests.Middlewares.Serialization | ||
{ | ||
using System; | ||
using System.Threading.Tasks; | ||
using Confluent.SchemaRegistry; | ||
using Google.Protobuf; | ||
using KafkaFlow.Serializer.SchemaRegistry; | ||
using Microsoft.VisualStudio.TestTools.UnitTesting; | ||
using Moq; | ||
|
||
[TestClass] | ||
public class ConfluentProtobufTypeNameResolverTests | ||
{ | ||
private readonly Mock<ISchemaRegistryClient> schemaRegistryClient; | ||
private readonly ConfluentProtobufTypeNameResolver schemaRegistryTypeResolver; | ||
|
||
public ConfluentProtobufTypeNameResolverTests() | ||
{ | ||
this.schemaRegistryClient = new Mock<ISchemaRegistryClient>(); | ||
this.schemaRegistryTypeResolver = new ConfluentProtobufTypeNameResolver(this.schemaRegistryClient.Object); | ||
} | ||
|
||
[TestMethod] | ||
public async Task ResolveAsync_ValidProtobufObject_ReturnsProtoFields() | ||
{ | ||
// Arrange | ||
var schemaId = 420; | ||
|
||
var dummyProtobufObj = new DummyProtobufObject | ||
{ | ||
Field1 = "Field1", | ||
Field2 = 8, | ||
}; | ||
var base64Encoded = Convert.ToBase64String(dummyProtobufObj.ToByteArray()); | ||
|
||
this.schemaRegistryClient.Setup(client => client.GetSchemaAsync(schemaId, "serialized")) | ||
.ReturnsAsync(new Schema(base64Encoded, SchemaType.Protobuf)); | ||
|
||
// Act | ||
var protoFields = await this.schemaRegistryTypeResolver.ResolveAsync(schemaId); | ||
Check notice on line 40 in src/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs Codacy Production / Codacy Static Code Analysissrc/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs#L40
|
||
|
||
// Assert | ||
// TODO fix returning empty | ||
} | ||
} | ||
} |
47 changes: 47 additions & 0 deletions
47
src/KafkaFlow.UnitTests/Middlewares/Serialization/SchemaRegistryTypeResolverTests.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
namespace KafkaFlow.UnitTests.Middlewares.Serialization | ||
{ | ||
using System; | ||
using System.Buffers.Binary; | ||
using System.Threading.Tasks; | ||
using FluentAssertions; | ||
using Microsoft.VisualStudio.TestTools.UnitTesting; | ||
using Moq; | ||
|
||
[TestClass] | ||
public class SchemaRegistryTypeResolverTests | ||
{ | ||
private readonly Mock<IMessageContext> messageContextMock; | ||
private readonly Mock<ISchemaRegistryTypeNameResolver> schemaRegistryTypeNameResolverMock; | ||
private readonly SchemaRegistryTypeResolver schemaRegistryTypeResolver; | ||
private readonly byte[] messageKey = new byte[] { 0x18, 0x19 }; | ||
private readonly byte[] messageValue = new byte[] { 0x20, 0x21, 0x22, 0x23, 0x24, 0x25 }; | ||
|
||
public SchemaRegistryTypeResolverTests() | ||
{ | ||
this.messageContextMock = new Mock<IMessageContext>(); | ||
this.messageContextMock.Setup(context => context.Message).Returns(new Message(messageKey, messageValue)); | ||
this.schemaRegistryTypeNameResolverMock = new Mock<ISchemaRegistryTypeNameResolver>(); | ||
this.schemaRegistryTypeResolver = new SchemaRegistryTypeResolver(this.schemaRegistryTypeNameResolverMock.Object); | ||
} | ||
|
||
[TestMethod] | ||
public async Task OnConsumeAsync_WhenCalledTwice_TypeIsResolvedOnceThenTypeIsLoadedFromCache() | ||
{ | ||
// Arrange | ||
var expectedSchemaId = BinaryPrimitives.ReadInt32BigEndian( | ||
((byte[]) this.messageValue).AsSpan().Slice(1, 4)); | ||
Check notice on line 32 in src/KafkaFlow.UnitTests/Middlewares/Serialization/SchemaRegistryTypeResolverTests.cs Codacy Production / Codacy Static Code Analysissrc/KafkaFlow.UnitTests/Middlewares/Serialization/SchemaRegistryTypeResolverTests.cs#L32
|
||
|
||
this.schemaRegistryTypeNameResolverMock.Setup( | ||
resolver => resolver.ResolveAsync(expectedSchemaId)).ReturnsAsync(typeof(SchemaRegistryTypeResolverTests).FullName); | ||
|
||
// Act | ||
await this.schemaRegistryTypeResolver.OnConsumeAsync(messageContextMock.Object); | ||
var type = await this.schemaRegistryTypeResolver.OnConsumeAsync(messageContextMock.Object); | ||
|
||
// Assert | ||
this.schemaRegistryTypeNameResolverMock.Verify(resolver => resolver.ResolveAsync(expectedSchemaId), Times.Once); | ||
var expectedObject = (SchemaRegistryTypeResolverTests)Activator.CreateInstance(type); | ||
expectedObject.Should().NotBeNull(); | ||
} | ||
} | ||
} |