From 2bba5020bc466895030e05c5aa709062a9fdb2e3 Mon Sep 17 00:00:00 2001 From: Davide Armand Date: Wed, 30 Oct 2024 20:37:11 +0100 Subject: [PATCH] avro: merge references using avro lib (poc) Instead of using avro unions, use avro lib (function `make_avsc_object()`) to merge together reference schemas and main schema. This is just a proof of concept, it does not support indirect references for example (schema1 --> schema2 --> schema3). --- src/karapace/schema_models.py | 36 ++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/src/karapace/schema_models.py b/src/karapace/schema_models.py index b56fcc123..7f57dc6fd 100644 --- a/src/karapace/schema_models.py +++ b/src/karapace/schema_models.py @@ -5,7 +5,8 @@ from __future__ import annotations from avro.errors import SchemaParseException -from avro.schema import parse as avro_parse, Schema as AvroSchema +from avro.name import Names as AvroNames +from avro.schema import make_avsc_object, parse as avro_parse, Schema as AvroSchema from collections.abc import Collection, Mapping, Sequence from dataclasses import dataclass from jsonschema import Draft7Validator @@ -29,8 +30,8 @@ from karapace.utils import assert_never, json_decode, json_encode, JSONDecodeError from typing import Any, cast, Final, final -import avro.schema import hashlib +import json import logging import re @@ -254,22 +255,31 @@ def parse( if schema_type is SchemaType.AVRO: try: if dependencies: - wrapped_schema_str = AvroMerge(schema_str, dependencies).wrap() + names = AvroNames(validate_names=True) + deps = list(dependencies.values()) + + merged_schema = None + for dep in deps: + # Merge dep with all previously merged ones + json_dep = dep.get_schema().to_dict() + merged_schema = make_avsc_object(json_dep, names) + + # TODO: recursively add the dependencies of this dependency, so that indirect dependencies are + # working (schema1 --> schema2 --> schema3) + + # Merge main schema with all dependencies + schema_json = json.loads(schema_str) + merged_schema = make_avsc_object(schema_json, names) + + merged_schema_str = str(merged_schema) else: - wrapped_schema_str = schema_str + merged_schema_str = schema_str parsed_schema = parse_avro_schema_definition( - wrapped_schema_str, + merged_schema_str, validate_enum_symbols=validate_avro_enum_symbols, validate_names=validate_avro_names, ) - if dependencies: - if isinstance(parsed_schema, avro.schema.UnionSchema): - parsed_schema_result = parsed_schema.schemas[-1].fields[0].type.schemas[-1] - - else: - raise InvalidSchema - else: - parsed_schema_result = parsed_schema + parsed_schema_result = parsed_schema return ParsedTypedSchema( schema_type=schema_type, schema_str=schema_str,