Skip to content

Commit

Permalink
avro: merge references using avro lib (poc)
Browse files Browse the repository at this point in the history
Instead of using avro unions, use avro lib (function `make_avsc_object()`) to
merge together reference schemas and main schema.
This is just a proof of concept, it does not support indirect references
for example (schema1 --> schema2 --> schema3).
  • Loading branch information
davide-armand committed Oct 31, 2024
1 parent 445519e commit 2bba502
Showing 1 changed file with 23 additions and 13 deletions.
36 changes: 23 additions & 13 deletions src/karapace/schema_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from __future__ import annotations

from avro.errors import SchemaParseException
from avro.schema import parse as avro_parse, Schema as AvroSchema
from avro.name import Names as AvroNames
from avro.schema import make_avsc_object, parse as avro_parse, Schema as AvroSchema
from collections.abc import Collection, Mapping, Sequence
from dataclasses import dataclass
from jsonschema import Draft7Validator
Expand All @@ -29,8 +30,8 @@
from karapace.utils import assert_never, json_decode, json_encode, JSONDecodeError
from typing import Any, cast, Final, final

import avro.schema
import hashlib
import json
import logging
import re

Expand Down Expand Up @@ -254,22 +255,31 @@ def parse(
if schema_type is SchemaType.AVRO:
try:
if dependencies:
wrapped_schema_str = AvroMerge(schema_str, dependencies).wrap()
names = AvroNames(validate_names=True)
deps = list(dependencies.values())

merged_schema = None
for dep in deps:
# Merge dep with all previously merged ones
json_dep = dep.get_schema().to_dict()
merged_schema = make_avsc_object(json_dep, names)

# TODO: recursively add the dependencies of this dependency, so that indirect dependencies are
# working (schema1 --> schema2 --> schema3)

# Merge main schema with all dependencies
schema_json = json.loads(schema_str)
merged_schema = make_avsc_object(schema_json, names)

merged_schema_str = str(merged_schema)
else:
wrapped_schema_str = schema_str
merged_schema_str = schema_str
parsed_schema = parse_avro_schema_definition(
wrapped_schema_str,
merged_schema_str,
validate_enum_symbols=validate_avro_enum_symbols,
validate_names=validate_avro_names,
)
if dependencies:
if isinstance(parsed_schema, avro.schema.UnionSchema):
parsed_schema_result = parsed_schema.schemas[-1].fields[0].type.schemas[-1]

else:
raise InvalidSchema
else:
parsed_schema_result = parsed_schema
parsed_schema_result = parsed_schema
return ParsedTypedSchema(
schema_type=schema_type,
schema_str=schema_str,
Expand Down

0 comments on commit 2bba502

Please sign in to comment.