Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avro references poc implementation using avro lib #987

Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions src/karapace/schema_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from __future__ import annotations

from avro.errors import SchemaParseException
from avro.schema import parse as avro_parse, Schema as AvroSchema
from avro.name import Names as AvroNames
from avro.schema import make_avsc_object, parse as avro_parse, Schema as AvroSchema
from collections.abc import Collection, Mapping, Sequence
from dataclasses import dataclass
from jsonschema import Draft7Validator
Expand All @@ -29,8 +30,8 @@
from karapace.utils import assert_never, json_decode, json_encode, JSONDecodeError
from typing import Any, cast, Final, final

import avro.schema
import hashlib
import json
import logging
import re

Expand Down Expand Up @@ -254,22 +255,31 @@ def parse(
if schema_type is SchemaType.AVRO:
try:
if dependencies:
wrapped_schema_str = AvroMerge(schema_str, dependencies).wrap()
names = AvroNames(validate_names=True)
deps = list(dependencies.values())

merged_schema = None
for dep in deps:
# Merge dep with all previously merged ones
json_dep = dep.get_schema().to_dict()
merged_schema = make_avsc_object(json_dep, names)

# TODO: recursively add the dependencies of this dependency, so that indirect dependencies are
# working (schema1 --> schema2 --> schema3)

# Merge main schema with all dependencies
schema_json = json.loads(schema_str)
merged_schema = make_avsc_object(schema_json, names)

merged_schema_str = str(merged_schema)
else:
wrapped_schema_str = schema_str
merged_schema_str = schema_str
parsed_schema = parse_avro_schema_definition(
wrapped_schema_str,
merged_schema_str,
validate_enum_symbols=validate_avro_enum_symbols,
validate_names=validate_avro_names,
)
if dependencies:
if isinstance(parsed_schema, avro.schema.UnionSchema):
parsed_schema_result = parsed_schema.schemas[-1].fields[0].type.schemas[-1]

else:
raise InvalidSchema
else:
parsed_schema_result = parsed_schema
parsed_schema_result = parsed_schema
return ParsedTypedSchema(
schema_type=schema_type,
schema_str=schema_str,
Expand Down
81 changes: 72 additions & 9 deletions tests/integration/test_schema_avro_references.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,43 @@
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "address", "type": "Address"},
# {"name": "address", "type": "Address"},
{"name": "job", "type": "Job"},
],
}

# Avro record with a direct self-reference: the "father" field uses the
# record's own type name ("PersonRecursive"). The "job" field uses the type
# name "Job", which is not defined inside this record; test_recursive_reference
# supplies it through the references returned by person_references().
SCHEMA_PERSON_RECURSIVE = {
"type": "record",
"name": "PersonRecursive",
"namespace": "com.netapp",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "job", "type": "Job"},
{"name": "father", "type": "PersonRecursive"},
],
}

# Avro record whose "consultant" field uses the type name "Person", which is
# not defined inside this record. test_indirect_recursive_reference registers
# it with a single reference to the person subject
# (see job_indirect_recursive_references()).
# NOTE(review): the Person schema itself appears to depend on "Job", so "Job"
# would only be reachable indirectly (this -> Person -> Job) -- confirm.
SCHEMA_JOB_INDIRECT_RECURSIVE = {
"type": "record",
"name": "JobIndirectRecursive",
"namespace": "com.netapp",
"fields": [
{"name": "title", "type": "string"},
{"name": "salary", "type": "double"},
{"name": "consultant", "type": "Person"},
],
}


SCHEMA_PERSON_AGE_INT_LONG = {
"type": "record",
"name": "Person",
"namespace": "com.netapp",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "long"},
{"name": "address", "type": "Address"},
# {"name": "address", "type": "Address"},
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented out the indirect references because they do not work yet, so that the other tests can still be executed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we don't need to support indirect references; it's unclear whether Schema Registry supports them

{"name": "job", "type": "Job"},
],
}
Expand All @@ -73,7 +97,7 @@
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "string"},
{"name": "address", "type": "Address"},
# {"name": "address", "type": "Address"},
{"name": "job", "type": "Job"},
],
}
Expand All @@ -85,7 +109,7 @@
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "address", "type": "Address"},
# {"name": "address", "type": "Address"},
{"name": "job", "type": "Job"},
{
"name": "children",
Expand All @@ -109,7 +133,7 @@
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "address", "type": "Address"},
# {"name": "address", "type": "Address"},
{"name": "job", "type": "Job"},
],
},
Expand All @@ -120,7 +144,7 @@
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "address", "type": "Address"},
# {"name": "address", "type": "Address"},
],
},
]
Expand All @@ -144,24 +168,30 @@ def address_references(subject_prefix: str) -> list:

def person_references(subject_prefix: str) -> list:
return [
{"name": "address.avsc", "subject": f"{subject_prefix}address", "version": 1},
# {"name": "address.avsc", "subject": f"{subject_prefix}address", "version": 1},
{"name": "job.avsc", "subject": f"{subject_prefix}job", "version": 1},
]


def job_indirect_recursive_references(subject_prefix: str) -> list:
    """Return the reference list used when registering the indirect-recursive job schema.

    The list holds a single entry that points at version 1 of the
    ``<subject_prefix>person`` subject under the name ``person.avsc``.

    Args:
        subject_prefix: Prefix prepended to the ``person`` subject name.

    Returns:
        A one-element list containing the reference dictionary.
    """
    person_reference = {
        "name": "person.avsc",
        "subject": f"{subject_prefix}person",
        "version": 1,
    }
    return [person_reference]


def stored_person_subject(subject_prefix: str, subject_id: int) -> dict:
return {
"id": subject_id,
"references": [
{"name": "address.avsc", "subject": f"{subject_prefix}address", "version": 1},
# {"name": "address.avsc", "subject": f"{subject_prefix}address", "version": 1},
{"name": "job.avsc", "subject": f"{subject_prefix}job", "version": 1},
],
"schema": json.dumps(
{
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "address", "type": "Address"},
# {"name": "address", "type": "Address"},
{"name": "job", "type": "Job"},
],
"name": "Person",
Expand Down Expand Up @@ -206,12 +236,14 @@ async def basic_avro_references_fill_test(registry_async_client: Client, subject
res = await registry_async_client.post(f"subjects/{subject_prefix}job/versions", json={"schema": json.dumps(SCHEMA_JOB)})
assert res.status_code == 200
assert "id" in res.json()

res = await registry_async_client.post(
f"subjects/{subject_prefix}person/versions",
json={"schemaType": "AVRO", "schema": json.dumps(SCHEMA_PERSON), "references": person_references(subject_prefix)},
)
assert res.status_code == 200
assert "id" in res.json()

return res


Expand Down Expand Up @@ -293,3 +325,34 @@ async def test_avro_incompatible_name_references(registry_async_client: Client)
assert res.status_code == 409
msg = "Incompatible schema, compatibility_mode=BACKWARD. Incompatibilities: expected: com.netapp.Address"
assert res.json()["message"] == msg


async def test_recursive_reference(registry_async_client: Client) -> None:
    """Register a self-referencing record schema together with its references.

    After the basic fixtures are registered via
    ``basic_avro_references_fill_test``, ``SCHEMA_PERSON_RECURSIVE`` is posted
    under the ``person-recursive`` subject with the references from
    ``person_references``. The registry must answer 200 and return an id.
    """
    prefix = create_subject_name_factory("avro-recursive-reference")()
    await basic_avro_references_fill_test(registry_async_client, prefix)

    payload = {
        "schemaType": "AVRO",
        "schema": json.dumps(SCHEMA_PERSON_RECURSIVE),
        "references": person_references(prefix),
    }
    response = await registry_async_client.post(
        f"subjects/{prefix}person-recursive/versions",
        json=payload,
    )

    assert response.status_code == 200
    assert "id" in response.json()


# This test fails because indirect references are not implemented
async def test_indirect_recursive_reference(registry_async_client: Client) -> None:
    """Register a schema whose only reference is the person subject.

    After the basic fixtures are registered via
    ``basic_avro_references_fill_test``, ``SCHEMA_JOB_INDIRECT_RECURSIVE`` is
    posted under the ``person-indirect-recursive`` subject with the references
    from ``job_indirect_recursive_references``. The test expects a 200
    response that carries an id.
    """
    prefix = create_subject_name_factory("avro-indirect-recursive-reference")()
    await basic_avro_references_fill_test(registry_async_client, prefix)

    payload = {
        "schemaType": "AVRO",
        "schema": json.dumps(SCHEMA_JOB_INDIRECT_RECURSIVE),
        "references": job_indirect_recursive_references(prefix),
    }
    response = await registry_async_client.post(
        f"subjects/{prefix}person-indirect-recursive/versions",
        json=payload,
    )

    assert response.status_code == 200
    assert "id" in response.json()
Loading