Skip to content

Commit

Permalink
fix: align transitive compatibility checks
Browse files Browse the repository at this point in the history
Make it so that the transitive compatibility checks that are done in the
schema creation endpoint are also done in the schema validation
endpoint.

In the creation endpoint (`/subjects/<subject-key>/versions`), if the
compatibility mode is transitive then the new schema is checked against
all schemas.

In the validation endpoint (`/compatibility/subjects/<subject-key>/versions/<schema-version>`):
 - Before this fix, only the latest schema is checked against (even in
   case of transitive mode)
 - After this fix, in case of transitive mode all schemas are
   checked against.
   Note that in this case the version provided in the POST request
   (`<schema-version>`) is ignored.
  • Loading branch information
davide-armand committed Oct 3, 2024
1 parent 42b6f5f commit 65c2920
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 8 deletions.
4 changes: 4 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ Test the compatibility of a schema with the latest schema under subject "test-ke
http://localhost:8081/compatibility/subjects/test-key/versions/latest
{"is_compatible":true}

NOTE: if the subject's compatibility mode is transitive (BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE or FULL_TRANSITIVE) then the
compatibility is checked not only against the latest schema, but also against all previous schemas, as it would be done
when trying to register the new schema through the `subjects/<subject-key>/versions` endpoint.

Get current global backwards compatibility setting value::

$ curl -X GET http://localhost:8081/config
Expand Down
4 changes: 2 additions & 2 deletions src/karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ def check_schema_compatibility(
if not live_versions:
old_versions = []
elif compatibility_mode.is_transitive():
# Only check against all versions
# Check against all versions
old_versions = live_versions
else:
# Only check against latest version
Expand All @@ -479,7 +479,7 @@ def check_schema_compatibility(
)

if is_incompatible(result):
break
return result

return result

Expand Down
13 changes: 9 additions & 4 deletions src/karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,14 @@ async def compatibility_check(
)

new_schema = self.get_new_schema(request.json, content_type)
old_schema = self._get_old_schema(subject, Versioner.V(version), content_type)

result = SchemaCompatibility.check_compatibility(old_schema, new_schema, compatibility_mode)
old_schema = self.get_old_schema(subject, Versioner.V(version), content_type)
if compatibility_mode.is_transitive():
# Ignore the schema version provided in the rest api call (`version`)
# Instead check against all previous versions (including `version` if existing)
result = self.schema_registry.check_schema_compatibility(new_schema, subject)
else:
# Check against the schema version provided in the rest api call (`version`)
result = SchemaCompatibility.check_compatibility(old_schema, new_schema, compatibility_mode)

if is_incompatible(result):
self.r({"is_compatible": False}, content_type)
Expand Down Expand Up @@ -1338,7 +1343,7 @@ def get_new_schema(self, body: JsonObject, content_type: str) -> ValidatedTypedS
status=HTTPStatus.UNPROCESSABLE_ENTITY,
)

def _get_old_schema(self, subject: Subject, version: Version, content_type: str) -> ParsedTypedSchema:
def get_old_schema(self, subject: Subject, version: Version, content_type: str) -> ParsedTypedSchema:
try:
old = self.schema_registry.subject_version_get(subject=subject, version=version)
except InvalidVersion:
Expand Down
218 changes: 218 additions & 0 deletions tests/integration/test_schema_compatibility.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
"""
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""
from dataclasses import dataclass
from karapace.client import Client
from karapace.typing import Subject
from tests.base_testcase import BaseTestCase
from typing import Any, Callable, Coroutine, Union

import json
import logging
import pytest

LOG = logging.getLogger(__name__)


def _single_field_record(field_type):
    """Return an Avro record schema ``schema_name`` with one field ``field_name`` of ``field_type``."""
    return {
        "type": "record",
        "name": "schema_name",
        "fields": [{"type": field_type, "name": "field_name"}],
    }


# Baseline Avro schemas used by the test cases; they differ only in the type of their single field.
schema_int = _single_field_record("int")
schema_long = _single_field_record("long")
schema_string = _single_field_record("string")
schema_double = _single_field_record("double")


# One parametrised scenario for test_schema_compatibility.
# The test also reads `test_case.test_name`; presumably that field is inherited
# from BaseTestCase -- confirm against tests/base_testcase.py.
@dataclass
class SchemaCompatibilityTestCase(BaseTestCase):
# Schema sent to the compatibility endpoint as the "schema" value of the request body
# (the cases below build it with json.dumps).
new_schema: str
# Compatibility mode applied to the subject before validating, e.g. "BACKWARD" or "BACKWARD_TRANSITIVE".
compatibility_mode: str
# Coroutine function awaited with (client, subject) to register the baseline schemas
# before the compatibility mode under test is set.
register_baseline_schemas: Callable[[Client, Subject], Coroutine[Any, Any, None]]
# Expected value of "is_compatible" in the response body; None when the key is expected to be absent.
expected_is_compatible: Union[bool, None]
# Expected HTTP status code of the compatibility request.
expected_status_code: int
# Expected value of "incompatibilities" in the response body; None when the key is expected to be absent.
expected_incompatibilities: Union[str, None]


async def _register_baseline_schemas_no_incompatibilities(registry_async_client: Client, subject: Subject) -> None:
    """Register two baseline schemas under ``subject`` that are compatible with each other.

    ``schema_int`` is registered first and ``schema_long`` second; changing the
    field type from int to long is a compatible change, so both registrations
    are expected to answer with HTTP 200.
    """
    for baseline in (schema_int, schema_long):
        response = await registry_async_client.post(
            f"subjects/{subject}/versions",
            json={"schemaType": "AVRO", "schema": json.dumps(baseline)},
        )
        assert response.status_code == 200


async def _register_baseline_schemas_with_incompatibilities(registry_async_client: Client, subject: Subject) -> None:
    """Register two baseline schemas under ``subject`` that are incompatible with each other.

    The subject is first switched to compatibility mode ``NONE`` so that the
    registry accepts the second schema even though changing the field type from
    string to double is an incompatible change.
    """
    await _set_compatibility_mode(registry_async_client, subject, "NONE")

    for baseline in (schema_string, schema_double):
        response = await registry_async_client.post(
            f"subjects/{subject}/versions",
            json={"schemaType": "AVRO", "schema": json.dumps(baseline)},
        )
        assert response.status_code == 200


async def _register_baseline_schemas_with_incompatibilities_and_a_deleted_schema(
    registry_async_client: Client, subject: Subject
) -> None:
    """Register the incompatible baselines, then add a third schema and delete it again.

    The third schema (``schema_long``) is incompatible with the preceding
    ``schema_double``. Deleting ``latest`` is expected to report version 3.
    """
    await _register_baseline_schemas_with_incompatibilities(registry_async_client, subject)

    # Third version: double --> long is an incompatible change.
    register_response = await registry_async_client.post(
        f"subjects/{subject}/versions",
        json={"schemaType": "AVRO", "schema": json.dumps(schema_long)},
    )
    assert register_response.status_code == 200

    # Remove the version that was just registered.
    delete_response = await registry_async_client.delete(f"subjects/{subject}/versions/latest")
    assert delete_response.status_code == 200
    assert delete_response.json() == 3


async def _register_no_baseline_schemas(
    registry_async_client: Client, subject: Subject  # pylint: disable=unused-argument
) -> None:
    """Register nothing, leaving the subject without any baseline schema."""


async def _set_compatibility_mode(registry_async_client: Client, subject: Subject, compatibility_mode: str) -> None:
    """PUT ``compatibility_mode`` as the subject-level config of ``subject`` and require an HTTP 200 answer."""
    config_path = f"config/{subject}"
    config_body = {"compatibility": compatibility_mode}
    response = await registry_async_client.put(config_path, json=config_body)
    assert response.status_code == 200


# Scenarios for the compatibility endpoint. Modes ending in `_TRANSITIVE` are the
# transitive ones; plain `BACKWARD` is non-transitive.
@pytest.mark.parametrize(
"test_case",
[
# Case 0
# New schema compatible with all baseline ones (int --> long, long --> long)
# Non-transitive mode
# --> No incompatibilities are found
SchemaCompatibilityTestCase(
test_name="case0",
compatibility_mode="BACKWARD",
register_baseline_schemas=_register_baseline_schemas_no_incompatibilities,
new_schema=json.dumps(schema_long),
expected_is_compatible=True,
expected_status_code=200,
expected_incompatibilities=None,
),
# Case 1
# Same as previous case, but in transitive mode
# --> No incompatibilities are found
SchemaCompatibilityTestCase(
test_name="case1",
compatibility_mode="BACKWARD_TRANSITIVE",
register_baseline_schemas=_register_baseline_schemas_no_incompatibilities,
new_schema=json.dumps(schema_long),
expected_is_compatible=True,
expected_status_code=200,
expected_incompatibilities=None,
),
# Case 2
# New schema incompatible with both baseline schemas (string --> int, double --> int)
# Non-transitive mode
# --> Incompatibilities are found only against last baseline schema (double --> int)
SchemaCompatibilityTestCase(
test_name="case2",
compatibility_mode="BACKWARD",
register_baseline_schemas=_register_baseline_schemas_with_incompatibilities,
new_schema=json.dumps(schema_int),
expected_is_compatible=False,
expected_status_code=200,
expected_incompatibilities="reader type: int not compatible with writer type: double",
),
# Case 3
# Same as previous case, but in transitive mode
# --> Incompatibilities are found in the first baseline schema (string --> int)
SchemaCompatibilityTestCase(
test_name="case3",
compatibility_mode="BACKWARD_TRANSITIVE",
register_baseline_schemas=_register_baseline_schemas_with_incompatibilities,
new_schema=json.dumps(schema_int),
expected_is_compatible=False,
expected_status_code=200,
expected_incompatibilities="reader type: int not compatible with writer type: string",
),
# Case 4
# Same as case 2, but with a deleted schema among baseline ones
# Non-transitive mode
# --> The deleted schema is ignored
# --> Incompatibilities are found only against last baseline schema (double --> int)
SchemaCompatibilityTestCase(
test_name="case4",
compatibility_mode="BACKWARD",
register_baseline_schemas=_register_baseline_schemas_with_incompatibilities_and_a_deleted_schema,
new_schema=json.dumps(schema_int),
expected_is_compatible=False,
expected_status_code=200,
expected_incompatibilities="reader type: int not compatible with writer type: double",
),
# Case 5
# Same as case 3 (transitive mode), but with a deleted schema among baseline ones
# --> The deleted schema is ignored
# --> Incompatibilities are found in the first baseline schema (string --> int)
SchemaCompatibilityTestCase(
test_name="case5",
compatibility_mode="BACKWARD_TRANSITIVE",
register_baseline_schemas=_register_baseline_schemas_with_incompatibilities_and_a_deleted_schema,
new_schema=json.dumps(schema_int),
expected_is_compatible=False,
expected_status_code=200,
expected_incompatibilities="reader type: int not compatible with writer type: string",
),
# Case 6
# A new schema and no baseline schemas
# Non-transitive mode
# --> No incompatibilities are found
# --> Status code is 404 because the `latest` version to check against does not exist
SchemaCompatibilityTestCase(
test_name="case6",
compatibility_mode="BACKWARD",
register_baseline_schemas=_register_no_baseline_schemas,
new_schema=json.dumps(schema_int),
expected_is_compatible=None,
expected_status_code=404,
expected_incompatibilities=None,
),
# Case 7
# Same as previous case, but in transitive mode
# --> No incompatibilities are found
# --> Status code is 404 because the `latest` version to check against does not exist
SchemaCompatibilityTestCase(
test_name="case7",
compatibility_mode="BACKWARD_TRANSITIVE",
register_baseline_schemas=_register_no_baseline_schemas,
new_schema=json.dumps(schema_int),
expected_is_compatible=None,
expected_status_code=404,
expected_incompatibilities=None,
),
],
)
async def test_schema_compatibility(test_case: SchemaCompatibilityTestCase, registry_async_client: Client) -> None:
# The subject name is derived from the case name, so each case works on its own subject.
subject = Subject(f"subject_{test_case.test_name}")

# Baselines are registered first; the mode under test is applied afterwards and
# replaces any mode a baseline helper may have set (e.g. "NONE").
await test_case.register_baseline_schemas(registry_async_client, subject)
await _set_compatibility_mode(registry_async_client, subject, test_case.compatibility_mode)

LOG.info("Validating new schema: %s", test_case.new_schema)
# The request always targets the `latest` version of the subject.
res = await registry_async_client.post(
f"compatibility/subjects/{subject}/versions/latest", json={"schema": test_case.new_schema}
)

assert res.status_code == test_case.expected_status_code
# `.get` returns None for a missing key, which is what the cases expecting None rely on.
assert res.json().get("is_compatible") == test_case.expected_is_compatible
assert res.json().get("incompatibilities", None) == test_case.expected_incompatibilities
24 changes: 24 additions & 0 deletions tests/unit/compatibility/test_compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,27 @@ def test_schema_type_can_change_when_mode_none() -> None:
old_schema=avro_schema, new_schema=json_schema, compatibility_mode=CompatibilityModes.NONE
)
assert result.compatibility is SchemaCompatibilityType.compatible


def test_schema_compatible_in_transitive_mode() -> None:
    """Two JSON Schema array definitions differing only in ``name`` are compatible under FULL_TRANSITIVE."""
    old_schema, new_schema = (
        ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, raw_schema)
        for raw_schema in (
            '{"type": "array", "name": "name_old"}',
            '{"type": "array", "name": "name_new"}',
        )
    )

    outcome = SchemaCompatibility.check_compatibility(
        old_schema=old_schema, new_schema=new_schema, compatibility_mode=CompatibilityModes.FULL_TRANSITIVE
    )
    assert outcome.compatibility is SchemaCompatibilityType.compatible


def test_schema_incompatible_in_transitive_mode() -> None:
    """Changing a JSON Schema type from array to integer is incompatible under FULL_TRANSITIVE."""
    old_schema, new_schema = (
        ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, raw_schema)
        for raw_schema in (
            '{"type": "array"}',
            '{"type": "integer"}',
        )
    )

    outcome = SchemaCompatibility.check_compatibility(
        old_schema=old_schema, new_schema=new_schema, compatibility_mode=CompatibilityModes.FULL_TRANSITIVE
    )
    assert outcome.compatibility is SchemaCompatibilityType.incompatible
4 changes: 2 additions & 2 deletions tests/unit/test_schema_registry_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import pytest


async def test_validate_schema_request_body():
async def test_validate_schema_request_body() -> None:
controller = KarapaceSchemaRegistryController(config=set_config_defaults(DEFAULTS))

controller._validate_schema_request_body( # pylint: disable=W0212
Expand All @@ -30,7 +30,7 @@ async def test_validate_schema_request_body():
assert str(exc_info.value) == "HTTPResponse 422"


async def test_forward_when_not_ready():
async def test_forward_when_not_ready() -> None:
with patch("karapace.schema_registry_apis.KarapaceSchemaRegistry") as schema_registry_class:
schema_reader_mock = Mock(spec=KafkaSchemaReader)
ready_property_mock = PropertyMock(return_value=False)
Expand Down
4 changes: 4 additions & 0 deletions website/source/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ Test the compatibility of a schema with the latest schema under subject "test-ke
$KARAPACE_REGISTRY_URI/compatibility/subjects/test-key/versions/latest
{"is_compatible":true}

NOTE: if the subject's compatibility mode is transitive (BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE or FULL_TRANSITIVE) then the
compatibility is checked not only against the latest schema, but also against all previous schemas, as it would be done
when trying to register the new schema through the `subjects/<subject-key>/versions` endpoint.

Get current global backwards compatibility setting value::

$ curl -X GET $KARAPACE_REGISTRY_URI/config
Expand Down

0 comments on commit 65c2920

Please sign in to comment.