Skip to content

Commit

Permalink
feat: add support for csharp namespace in protobuf schemas
Browse files Browse the repository at this point in the history
  • Loading branch information
esskar committed Apr 17, 2024
1 parent 16f7851 commit 9ac6830
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ public async Task<string> ResolveAsync(int id)
var schemaString = (await _client.GetSchemaAsync(id, "serialized")).SchemaString;

var protoFields = FileDescriptorProto.Parser.ParseFrom(ByteString.FromBase64(schemaString));

return $"{protoFields.Package}.{protoFields.MessageType.FirstOrDefault()?.Name}";
var messageType = protoFields.MessageType.FirstOrDefault()?.Name;
var ns = protoFields.Options?.HasCsharpNamespace == true
? protoFields.Options.CsharpNamespace
: protoFields.Package;
return BuildTypeName(messageType, ns);
}
}

private static string BuildTypeName(string messageType, string ns)
=> string.IsNullOrEmpty(ns) ? messageType ?? string.Empty : $"{ns}.{messageType}";
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.SchemaRegistry;

Expand Down Expand Up @@ -44,4 +45,58 @@ public async Task ResolveAsync_ValidProtobufObject_ReturnsProtoFields()
// Assert
protoFields.Should().NotBeNull();
}

[TestMethod]
public async Task ResolveAsync_SchemaWithPackageOnly_ReturnsTypeNameWithPackageNamespace()
{
// Arrange
// below schema-string is base64 encoded protobuf schema of 'syntax = \"proto3\";\npackage kafkaflow.test;\n\nmessage Person {\n string name = 1;\n}\n'
var schemaString = "CgdkZWZhdWx0Eg5rYWZrYWZsb3cudGVzdCIUCgZQZXJzb24SCgoEbmFtZRgBKAliBnByb3RvMw==";
var schemaId = 420;

_schemaRegistryClient.Setup(client => client.GetSchemaAsync(schemaId, "serialized"))
.ReturnsAsync(new RegisteredSchema("test", 1, schemaId, schemaString, SchemaType.Protobuf, new List<SchemaReference>()));

// Act
var actual = await _schemaRegistryTypeResolver.ResolveAsync(schemaId);

// Assert
actual.Should().Be("kafkaflow.test.Person");
}

[TestMethod]
public async Task ResolveAsync_SchemaWithCsharpNamespace_ReturnsTypeNameWithCsharpNamespace()
{
// Arrange
// below schema-string is base64 encoded protobuf schema of 'syntax = \"proto3\";\npackage kafkaflow.test;\n\nmessage Person {\n string name = 1;\n}\n'
var schemaString = "CgdkZWZhdWx0Eg5rYWZrYWZsb3cudGVzdCIUCgZQZXJzb24SCgoEbmFtZRgBKAlCEaoCDkthZmthRmxvdy5UZXN0YgZwcm90bzM=";
var schemaId = 420;

_schemaRegistryClient.Setup(client => client.GetSchemaAsync(schemaId, "serialized"))
.ReturnsAsync(new RegisteredSchema("test", 1, schemaId, schemaString, SchemaType.Protobuf, new List<SchemaReference>()));

// Act
var actual = await _schemaRegistryTypeResolver.ResolveAsync(schemaId);

// Assert
Assert.AreEqual("KafkaFlow.Test.Person", actual);
}

[TestMethod]
public async Task ResolveAsync_SchemaWithoutPackageOrNamespace_ReturnsTypeNameWithoutNamespace()
{
// Arrange
// below schema-string is base64 encoded protobuf schema of 'syntax = \"proto3\";\n\nmessage Person {\n string name = 1;\n}\n'
var schemaString = "CgdkZWZhdWx0IhQKBlBlcnNvbhIKCgRuYW1lGAEoCWIGcHJvdG8z";
var schemaId = 420;

_schemaRegistryClient.Setup(client => client.GetSchemaAsync(schemaId, "serialized"))
.ReturnsAsync(new RegisteredSchema("test", 1, schemaId, schemaString, SchemaType.Protobuf, new List<SchemaReference>()));

// Act
var actual = await _schemaRegistryTypeResolver.ResolveAsync(schemaId);

// Assert
Assert.AreEqual("Person", actual);
}
}

0 comments on commit 9ac6830

Please sign in to comment.