Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quick refact moving from Newtonsoft to System.text #2321

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions src/Confluent.SchemaRegistry.Serdes.Json/JsonDeserializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@
using System.Linq;
using System.Net;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NJsonSchema;
using NJsonSchema.Generation;
using NJsonSchema.Validation;
Expand Down Expand Up @@ -55,6 +54,8 @@ public class JsonDeserializer<T> : AsyncDeserializer<T, JsonSchema> where T : cl
{
private readonly JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings;

private readonly JsonSerializerOptions jsonSerializerOptions;

private JsonSchemaValidator validator = new JsonSchemaValidator();

private JsonSchema schema = null;
Expand Down Expand Up @@ -84,6 +85,7 @@ public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, JsonDeserial
: base(schemaRegistryClient, config, ruleRegistry)
{
this.jsonSchemaGeneratorSettings = jsonSchemaGeneratorSettings;
this.jsonSerializerOptions = JsonUtils.SettingsMapping(jsonSchemaGeneratorSettings);

if (config == null) { return; }

Expand Down Expand Up @@ -127,6 +129,7 @@ public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, Schema schem
schemaRegistryClient, schema, this.jsonSchemaGeneratorSettings);
JsonSchema jsonSchema = utils.GetResolvedSchema().Result;
this.schema = jsonSchema;
this.jsonSerializerOptions = JsonUtils.SettingsMapping(jsonSchemaGeneratorSettings);
}

/// <summary>
Expand Down Expand Up @@ -196,7 +199,7 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
}
}
}

if (latestSchema != null)
{
migrations = await GetMigrations(subject, writerSchema, latestSchema)
Expand All @@ -208,14 +211,16 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
using (var jsonStream = new MemoryStream(array, headerSize, array.Length - headerSize))
using (var jsonReader = new StreamReader(jsonStream, Encoding.UTF8))
{
JToken json = Newtonsoft.Json.JsonConvert.DeserializeObject<JToken>(jsonReader.ReadToEnd(), this.jsonSchemaGeneratorSettings?.ActualSerializerSettings);
json = await ExecuteMigrations(migrations, isKey, subject, topic, context.Headers, json)
.ContinueWith(t => (JToken)t.Result)
string jsonString = await jsonReader.ReadToEndAsync();
using JsonDocument jsonDocument = JsonDocument.Parse(jsonString);
JsonElement jsonElement = jsonDocument.RootElement;
jsonElement = (JsonElement)await ExecuteMigrations(migrations, isKey, subject, topic, context.Headers, jsonElement)
.ContinueWith(t => t.Result)
.ConfigureAwait(continueOnCapturedContext: false);

if (schema != null)
{
var validationResult = validator.Validate(json, schema);
var validationResult = validator.Validate(jsonElement.GetRawText(), schema);

if (validationResult.Count > 0)
{
Expand All @@ -224,7 +229,7 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
}
}

value = json.ToObject<T>(JsonSerializer.Create());
value = jsonElement.Deserialize<T>(this.jsonSerializerOptions); // TODO not sure why we need this here
}
}
else
Expand All @@ -233,7 +238,7 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
using (var jsonReader = new StreamReader(jsonStream, Encoding.UTF8))
{
string serializedString = jsonReader.ReadToEnd();

if (schema != null)
{
var validationResult = validator.Validate(serializedString, schema);
Expand All @@ -245,7 +250,7 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
}
}

value = Newtonsoft.Json.JsonConvert.DeserializeObject<T>(serializedString, this.jsonSchemaGeneratorSettings?.ActualSerializerSettings);
value = System.Text.Json.JsonSerializer.Deserialize<T>(serializedString, this.jsonSerializerOptions);
}
}

Expand All @@ -263,8 +268,8 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
writerSchema, value, fieldTransformer)
.ContinueWith(t => (T)t.Result)
.ConfigureAwait(continueOnCapturedContext: false);
}
}

return value;
}
catch (AggregateException e)
Expand All @@ -277,7 +282,7 @@ protected override async Task<JsonSchema> ParseSchema(Schema schema)
{
JsonSchemaResolver utils = new JsonSchemaResolver(
schemaRegistryClient, schema, jsonSchemaGeneratorSettings);

return await utils.GetResolvedSchema();
}
}
Expand Down
19 changes: 12 additions & 7 deletions src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
using System.IO;
using System.Linq;
using System.Net;
using System.Text.Json;
using System.Threading.Tasks;
using NJsonSchema;
using NJsonSchema.Generation;
Expand Down Expand Up @@ -55,8 +56,10 @@ namespace Confluent.SchemaRegistry.Serdes
public class JsonSerializer<T> : AsyncSerializer<T, JsonSchema> where T : class
{
private readonly JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings;

private readonly JsonSerializerOptions jsonSerializerOptions;
private readonly List<SchemaReference> ReferenceList = new List<SchemaReference>();

private JsonSchemaValidator validator = new JsonSchemaValidator();

/// <remarks>
Expand All @@ -81,18 +84,19 @@ public class JsonSerializer<T> : AsyncSerializer<T, JsonSchema> where T : class
/// <param name="jsonSchemaGeneratorSettings">
/// JSON schema generator settings.
/// </param>
public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, JsonSerializerConfig config = null,
public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, JsonSerializerConfig config = null,
JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings = null, RuleRegistry ruleRegistry = null)
: base(schemaRegistryClient, config, ruleRegistry)
{
this.jsonSchemaGeneratorSettings = jsonSchemaGeneratorSettings;
this.jsonSerializerOptions = JsonUtils.SettingsMapping(jsonSchemaGeneratorSettings);

this.schema = this.jsonSchemaGeneratorSettings == null
? JsonSchema.FromType<T>()
: JsonSchema.FromType<T>(this.jsonSchemaGeneratorSettings);
this.schemaText = schema.ToJson();
this.schemaFullname = schema.Title;

if (config == null) { return; }

var nonJsonConfig = config
Expand Down Expand Up @@ -135,7 +139,7 @@ public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, JsonSerializer
/// <param name="jsonSchemaGeneratorSettings">
/// JSON schema generator settings.
/// </param>
public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, Schema schema, JsonSerializerConfig config = null,
public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, Schema schema, JsonSerializerConfig config = null,
JsonSchemaGeneratorSettings jsonSchemaGeneratorSettings = null, RuleRegistry ruleRegistry = null)
: this(schemaRegistryClient, config, jsonSchemaGeneratorSettings, ruleRegistry)
{
Expand All @@ -150,6 +154,7 @@ public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, Schema schema,
this.schema = jsonSchema;
this.schemaText = schema.SchemaString;
this.schemaFullname = jsonSchema.Title;
this.jsonSerializerOptions = JsonUtils.SettingsMapping(jsonSchemaGeneratorSettings);
}

/// <summary>
Expand Down Expand Up @@ -186,7 +191,7 @@ public override async Task<byte[]> SerializeAsync(T value, SerializationContext
subject = GetSubjectName(context.Topic, context.Component == MessageComponentType.Key, this.schemaFullname);
latestSchema = await GetReaderSchema(subject, new Schema(schemaText, ReferenceList, SchemaType.Json))
.ConfigureAwait(continueOnCapturedContext: false);

if (!subjectsRegistered.Contains(subject))
{
if (latestSchema != null)
Expand All @@ -211,7 +216,7 @@ public override async Task<byte[]> SerializeAsync(T value, SerializationContext
{
serdeMutex.Release();
}

if (latestSchema != null)
{
var latestSchemaJson = await GetParsedSchema(latestSchema).ConfigureAwait(false);
Expand All @@ -226,7 +231,7 @@ public override async Task<byte[]> SerializeAsync(T value, SerializationContext
.ConfigureAwait(continueOnCapturedContext: false);
}

var serializedString = Newtonsoft.Json.JsonConvert.SerializeObject(value, this.jsonSchemaGeneratorSettings?.ActualSerializerSettings);
var serializedString = JsonSerializer.Serialize(value, this.jsonSerializerOptions);
var validationResult = validator.Validate(serializedString, this.schema);
if (validationResult.Count > 0)
{
Expand Down
43 changes: 33 additions & 10 deletions src/Confluent.SchemaRegistry.Serdes.Json/JsonUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text.Json;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Serialization;
using NJsonSchema;
using NJsonSchema.Validation;
using NJsonSchema.Generation;
using Microsoft.Extensions.Options;
using System.Text.Json.Serialization;


namespace Confluent.SchemaRegistry.Serdes
Expand All @@ -41,13 +45,13 @@ public static async Task<object> Transform(RuleContext ctx, JsonSchema schema, s
{
return message;
}

RuleContext.FieldContext fieldContext = ctx.CurrentField();
if (fieldContext != null)
{
fieldContext.Type = GetType(schema);
}

if (schema.AllOf.Count > 0 || schema.AnyOf.Count > 0 || schema.OneOf.Count > 0)
{
JToken jsonObject = JToken.FromObject(message);
Expand All @@ -65,9 +69,9 @@ public static async Task<object> Transform(RuleContext ctx, JsonSchema schema, s
}
else if (schema.IsArray)
{
bool isList = typeof(IList).IsAssignableFrom(message.GetType())
|| (message.GetType().IsGenericType
&& (message.GetType().GetGenericTypeDefinition() == typeof(List<>)
bool isList = typeof(IList).IsAssignableFrom(message.GetType())
|| (message.GetType().IsGenericType
&& (message.GetType().GetGenericTypeDefinition() == typeof(List<>)
|| message.GetType().GetGenericTypeDefinition() == typeof(IList<>)));
if (!isList)
{
Expand Down Expand Up @@ -125,7 +129,7 @@ public static async Task<object> Transform(RuleContext ctx, JsonSchema schema, s
ISet<string> ruleTags = ctx.Rule.Tags ?? new HashSet<string>();
ISet<string> intersect = new HashSet<string>(fieldContext.Tags);
intersect.IntersectWith(ruleTags);

if (ruleTags.Count == 0 || intersect.Count != 0)
{
return await fieldTransform.Transform(ctx, fieldContext, message)
Expand Down Expand Up @@ -191,7 +195,7 @@ public FieldAccessor(Type type, string fieldName)
SetValue = (instance, value) => propertyInfo.SetValue(instance, value);
return;
}

var fieldInfo = type.GetField(fieldName,
BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance);
if (fieldInfo != null)
Expand All @@ -200,7 +204,7 @@ public FieldAccessor(Type type, string fieldName)
SetValue = (instance, value) => fieldInfo.SetValue(instance, value);
return;
}

foreach (PropertyInfo prop in type.GetProperties())
{
if (prop.IsDefined(typeof(JsonPropertyAttribute)))
Expand All @@ -217,7 +221,7 @@ public FieldAccessor(Type type, string fieldName)
}
}
}

foreach (FieldInfo field in type.GetFields())
{
if (field.IsDefined(typeof(JsonPropertyAttribute)))
Expand All @@ -234,7 +238,7 @@ public FieldAccessor(Type type, string fieldName)
}
}
}

throw new ArgumentException("Could not find field " + fieldName);
}

Expand All @@ -248,5 +252,24 @@ public void SetFieldValue(object message, object value)
SetValue(message, value);
}
}

public static JsonSerializerOptions SettingsMapping(JsonSchemaGeneratorSettings settings)
{ //TODO mapping settings using the jsonSchemaGeneratorSettings object

var options = new JsonSerializerOptions();

// if (settings.SerializerSettings.ContractResolver is DefaultContractResolver contractResolver)
// {
// if (contractResolver.NamingStrategy is CamelCaseNamingStrategy)
// {
// options.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
// }
// }

options.PropertyNameCaseInsensitive = true;
options.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull;

return options;
}
}
}