From 65c292093dfbbd0f7532cccebf3250f93ddadb25 Mon Sep 17 00:00:00 2001 From: Davide Armand Date: Wed, 18 Sep 2024 11:12:16 +0200 Subject: [PATCH] fix: align transitive compatibility checks Make so that the transitive compatibility checks that are done in the schema creation endpoint are also done in the schema validation endpoint. In the creation endpoint (`/subjects//versions`), if the compatibility mode is transient then the new schema is checked against all schemas. In the validation endpoint (`/compatibility/subjects//versions/`): - Before this fix, only the latest schema is checked against (even in case of transitive mode) - After this fix, in case of transitive mode then all schema are checked against. Note that in this case the version provided in the POST request (``) is ignored. --- README.rst | 4 + src/karapace/schema_registry.py | 4 +- src/karapace/schema_registry_apis.py | 13 +- .../integration/test_schema_compatibility.py | 218 ++++++++++++++++++ .../unit/compatibility/test_compatibility.py | 24 ++ tests/unit/test_schema_registry_api.py | 4 +- website/source/quickstart.rst | 4 + 7 files changed, 263 insertions(+), 8 deletions(-) create mode 100644 tests/integration/test_schema_compatibility.py diff --git a/README.rst b/README.rst index d1bcbd28f..f513c26f2 100644 --- a/README.rst +++ b/README.rst @@ -149,6 +149,10 @@ Test the compatibility of a schema with the latest schema under subject "test-ke http://localhost:8081/compatibility/subjects/test-key/versions/latest {"is_compatible":true} +NOTE: if the subject's compatibility mode is transitive (BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE or FULL_TRANSITIVE) then the +compatibility is checked not only against the latest schema, but also against all previous schemas, as it would be done +when trying to register the new schema through the `subjects//versions` endpoint. + Get current global backwards compatibility setting value:: $ curl -X GET http://localhost:8081/config diff --git a/src/karapace/schema_registry.py b/src/karapace/schema_registry.py index 06a0d91ae..d61cc058a 100644 --- a/src/karapace/schema_registry.py +++ b/src/karapace/schema_registry.py @@ -463,7 +463,7 @@ def check_schema_compatibility( if not live_versions: old_versions = [] elif compatibility_mode.is_transitive(): - # Only check against all versions + # Check against all versions old_versions = live_versions else: # Only check against latest version @@ -479,7 +479,7 @@ def check_schema_compatibility( ) if is_incompatible(result): - break + return result return result diff --git a/src/karapace/schema_registry_apis.py b/src/karapace/schema_registry_apis.py index 6b7453d03..8800d6734 100644 --- a/src/karapace/schema_registry_apis.py +++ b/src/karapace/schema_registry_apis.py @@ -397,9 +397,14 @@ async def compatibility_check( ) new_schema = self.get_new_schema(request.json, content_type) - old_schema = self._get_old_schema(subject, Versioner.V(version), content_type) - - result = SchemaCompatibility.check_compatibility(old_schema, new_schema, compatibility_mode) + old_schema = self.get_old_schema(subject, Versioner.V(version), content_type) + if compatibility_mode.is_transitive(): + # Ignore the schema version provided in the rest api call (`version`) + # Instead check against all previous versions (including `version` if existing) + result = self.schema_registry.check_schema_compatibility(new_schema, subject) + else: + # Check against the schema version provided in the rest api call (`version`) + result = SchemaCompatibility.check_compatibility(old_schema, new_schema, compatibility_mode) if is_incompatible(result): self.r({"is_compatible": False}, content_type) @@ -1338,7 +1343,7 @@ def get_new_schema(self, body: JsonObject, content_type: str) -> ValidatedTypedS status=HTTPStatus.UNPROCESSABLE_ENTITY, ) - def _get_old_schema(self, subject: Subject, version: Version, content_type: str) -> ParsedTypedSchema: + def get_old_schema(self, subject: Subject, version: Version, content_type: str) -> ParsedTypedSchema: try: old = self.schema_registry.subject_version_get(subject=subject, version=version) except InvalidVersion: diff --git a/tests/integration/test_schema_compatibility.py b/tests/integration/test_schema_compatibility.py new file mode 100644 index 000000000..7d804e989 --- /dev/null +++ b/tests/integration/test_schema_compatibility.py @@ -0,0 +1,218 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" +from dataclasses import dataclass +from karapace.client import Client +from karapace.typing import Subject +from tests.base_testcase import BaseTestCase +from typing import Any, Callable, Coroutine, Union + +import json +import logging +import pytest + +LOG = logging.getLogger(__name__) + +schema_int = {"type": "record", "name": "schema_name", "fields": [{"type": "int", "name": "field_name"}]} +schema_long = {"type": "record", "name": "schema_name", "fields": [{"type": "long", "name": "field_name"}]} +schema_string = {"type": "record", "name": "schema_name", "fields": [{"type": "string", "name": "field_name"}]} +schema_double = {"type": "record", "name": "schema_name", "fields": [{"type": "double", "name": "field_name"}]} + + +@dataclass +class SchemaCompatibilityTestCase(BaseTestCase): + new_schema: str + compatibility_mode: str + register_baseline_schemas: Callable[[Client, Subject], Coroutine[Any, Any, None]] + expected_is_compatible: Union[bool, None] + expected_status_code: int + expected_incompatibilities: Union[str, None] + + +async def _register_baseline_schemas_no_incompatibilities(registry_async_client: Client, subject: Subject) -> None: + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_int)}, + ) + assert res.status_code == 200 + + # Changing type from int to long is compatible + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_long)}, + ) + assert res.status_code == 200 + + +async def _register_baseline_schemas_with_incompatibilities(registry_async_client: Client, subject: Subject) -> None: + # Allow registering non backward compatible schemas + await _set_compatibility_mode(registry_async_client, subject, "NONE") + + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_string)}, + ) + assert res.status_code == 200 + + # Changing type from string to double is incompatible + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_double)}, + ) + assert res.status_code == 200 + + +async def _register_baseline_schemas_with_incompatibilities_and_a_deleted_schema( + registry_async_client: Client, subject: Subject +) -> None: + await _register_baseline_schemas_with_incompatibilities(registry_async_client, subject) + + # Register schema + # Changing type from double to long is incompatible + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_long)}, + ) + assert res.status_code == 200 + + # And delete it + res = await registry_async_client.delete(f"subjects/{subject}/versions/latest") + assert res.status_code == 200 + assert res.json() == 3 + + +async def _register_no_baseline_schemas( + registry_async_client: Client, subject: Subject # pylint: disable=unused-argument +) -> None: + pass + + +async def _set_compatibility_mode(registry_async_client: Client, subject: Subject, compatibility_mode: str) -> None: + res = await registry_async_client.put(f"config/{subject}", json={"compatibility": compatibility_mode}) + assert res.status_code == 200 + + +@pytest.mark.parametrize( + "test_case", + [ + # Case 0 + # New schema compatible with all baseline ones (int --> long, long --> long) + # Transitive mode + # --> No incompatibilities are found + SchemaCompatibilityTestCase( + test_name="case0", + compatibility_mode="BACKWARD", + register_baseline_schemas=_register_baseline_schemas_no_incompatibilities, + new_schema=json.dumps(schema_long), + expected_is_compatible=True, + expected_status_code=200, + expected_incompatibilities=None, + ), + # Case 1 + # Same as previous case, but in non-transitive mode + # --> No incompatibilities are found + SchemaCompatibilityTestCase( + test_name="case1", + compatibility_mode="BACKWARD_TRANSITIVE", + register_baseline_schemas=_register_baseline_schemas_no_incompatibilities, + new_schema=json.dumps(schema_long), + expected_is_compatible=True, + expected_status_code=200, + expected_incompatibilities=None, + ), + # Case 2 + # New schema incompatible with both baseline schemas (string --> int, double --> int) + # Non-transitive mode + # --> Incompatibilies are found only against last baseline schema (double --> int) + SchemaCompatibilityTestCase( + test_name="case2", + compatibility_mode="BACKWARD", + register_baseline_schemas=_register_baseline_schemas_with_incompatibilities, + new_schema=json.dumps(schema_int), + expected_is_compatible=False, + expected_status_code=200, + expected_incompatibilities="reader type: int not compatible with writer type: double", + ), + # Case 3 + # Same as previous case, but in non-transitive mode + # --> Incompatibilies are found in the first baseline schema (string --> int) + SchemaCompatibilityTestCase( + test_name="case3", + compatibility_mode="BACKWARD_TRANSITIVE", + register_baseline_schemas=_register_baseline_schemas_with_incompatibilities, + new_schema=json.dumps(schema_int), + expected_is_compatible=False, + expected_status_code=200, + expected_incompatibilities="reader type: int not compatible with writer type: string", + ), + # Case 4 + # Same as case 2, but with a deleted schema among baseline ones + # Non-transitive mode + # --> The delete schema is ignored + # --> Incompatibilies are found only against last baseline schema (double --> int) + SchemaCompatibilityTestCase( + test_name="case4", + compatibility_mode="BACKWARD", + register_baseline_schemas=_register_baseline_schemas_with_incompatibilities_and_a_deleted_schema, + new_schema=json.dumps(schema_int), + expected_is_compatible=False, + expected_status_code=200, + expected_incompatibilities="reader type: int not compatible with writer type: double", + ), + # Case 5 + # Same as case 3, but with a deleted schema among baseline ones + # --> The delete schema is ignored + # --> Incompatibilies are found in the first baseline schema (string --> int) + SchemaCompatibilityTestCase( + test_name="case5", + compatibility_mode="BACKWARD_TRANSITIVE", + register_baseline_schemas=_register_baseline_schemas_with_incompatibilities_and_a_deleted_schema, + new_schema=json.dumps(schema_int), + expected_is_compatible=False, + expected_status_code=200, + expected_incompatibilities="reader type: int not compatible with writer type: string", + ), + # Case 6 + # A new schema and no baseline schemas + # Non-transitive mode + # --> No incompatibilities are found + # --> Status code is 404 because `latest` version to check against does not exists + SchemaCompatibilityTestCase( + test_name="case6", + compatibility_mode="BACKWARD", + register_baseline_schemas=_register_no_baseline_schemas, + new_schema=json.dumps(schema_int), + expected_is_compatible=None, + expected_status_code=404, + expected_incompatibilities=None, + ), + # Case 7 + # Same as previous case, but in non-transitive mode + # --> No incompatibilities are found + # --> Status code is 404 because `latest` version to check against does not exists + SchemaCompatibilityTestCase( + test_name="case7", + compatibility_mode="BACKWARD_TRANSITIVE", + register_baseline_schemas=_register_no_baseline_schemas, + new_schema=json.dumps(schema_int), + expected_is_compatible=None, + expected_status_code=404, + expected_incompatibilities=None, + ), + ], +) +async def test_schema_compatibility(test_case: SchemaCompatibilityTestCase, registry_async_client: Client) -> None: + subject = Subject(f"subject_{test_case.test_name}") + + await test_case.register_baseline_schemas(registry_async_client, subject) + await _set_compatibility_mode(registry_async_client, subject, test_case.compatibility_mode) + + LOG.info("Validating new schema: %s", test_case.new_schema) + res = await registry_async_client.post( + f"compatibility/subjects/{subject}/versions/latest", json={"schema": test_case.new_schema} + ) + + assert res.status_code == test_case.expected_status_code + assert res.json().get("is_compatible") == test_case.expected_is_compatible + assert res.json().get("incompatibilities", None) == test_case.expected_incompatibilities diff --git a/tests/unit/compatibility/test_compatibility.py b/tests/unit/compatibility/test_compatibility.py index 76f0e22b9..af41aae99 100644 --- a/tests/unit/compatibility/test_compatibility.py +++ b/tests/unit/compatibility/test_compatibility.py @@ -20,3 +20,27 @@ def test_schema_type_can_change_when_mode_none() -> None: old_schema=avro_schema, new_schema=json_schema, compatibility_mode=CompatibilityModes.NONE ) assert result.compatibility is SchemaCompatibilityType.compatible + + +def test_schema_compatible_in_transitive_mode() -> None: + old_json = '{"type": "array", "name": "name_old"}' + new_json = '{"type": "array", "name": "name_new"}' + old_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, old_json) + new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, new_json) + + result = SchemaCompatibility.check_compatibility( + old_schema=old_schema, new_schema=new_schema, compatibility_mode=CompatibilityModes.FULL_TRANSITIVE + ) + assert result.compatibility is SchemaCompatibilityType.compatible + + +def test_schema_incompatible_in_transitive_mode() -> None: + old_json = '{"type": "array"}' + new_json = '{"type": "integer"}' + old_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, old_json) + new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, new_json) + + result = SchemaCompatibility.check_compatibility( + old_schema=old_schema, new_schema=new_schema, compatibility_mode=CompatibilityModes.FULL_TRANSITIVE + ) + assert result.compatibility is SchemaCompatibilityType.incompatible diff --git a/tests/unit/test_schema_registry_api.py b/tests/unit/test_schema_registry_api.py index 6d850f5fc..7fcecd47e 100644 --- a/tests/unit/test_schema_registry_api.py +++ b/tests/unit/test_schema_registry_api.py @@ -14,7 +14,7 @@ import pytest -async def test_validate_schema_request_body(): +async def test_validate_schema_request_body() -> None: controller = KarapaceSchemaRegistryController(config=set_config_defaults(DEFAULTS)) controller._validate_schema_request_body( # pylint: disable=W0212 @@ -30,7 +30,7 @@ async def test_validate_schema_request_body(): assert str(exc_info.value) == "HTTPResponse 422" -async def test_forward_when_not_ready(): +async def test_forward_when_not_ready() -> None: with patch("karapace.schema_registry_apis.KarapaceSchemaRegistry") as schema_registry_class: schema_reader_mock = Mock(spec=KafkaSchemaReader) ready_property_mock = PropertyMock(return_value=False) diff --git a/website/source/quickstart.rst b/website/source/quickstart.rst index 6e6ecdba6..f640e68d2 100644 --- a/website/source/quickstart.rst +++ b/website/source/quickstart.rst @@ -60,6 +60,10 @@ Test the compatibility of a schema with the latest schema under subject "test-ke $KARAPACE_REGISTRY_URI/compatibility/subjects/test-key/versions/latest {"is_compatible":true} +NOTE: if the subject's compatibility mode is transitive (BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE or FULL_TRANSITIVE) then the +compatibility is checked not only against the latest schema, but also against all previous schemas, as it would be done +when trying to register the new schema through the `subjects//versions` endpoint. + Get current global backwards compatibility setting value:: $ curl -X GET $KARAPACE_REGISTRY_URI/config