Skip to content

Commit

Permalink
tests, avro: test refactoring WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
davide-armand committed Sep 27, 2024
1 parent b7b29d0 commit 54c3f42
Showing 1 changed file with 85 additions and 0 deletions.
85 changes: 85 additions & 0 deletions tests/unit/test_schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,3 +717,88 @@ def test_message_error_handling_with_invalid_reference_schema(
for ow, ew in zip(occurred_warnings, expected_warnings):
assert ow.name == "karapace.schema_reader"
assert ow.message == ew


def _test_message_handling(
caplog: LogCaptureFixture,
test_case: SingleMessageHandlingErrorTestCase,
schema_reader_with_consumer_messages_factory: Callable[[Tuple[List[Message]]], KafkaSchemaReader],
message_factory: Callable[[bytes, bytes, int], Message],
) -> None:
message = message_factory(key=test_case.key, value=test_case.value)

with caplog.at_level(logging.WARN, logger="karapace.schema_reader"):
caplog.clear()
schema_reader = schema_reader_with_consumer_messages_factory(([],))

schema_reader.consumer.consume.side_effect = ([message],)

LOG.info("Handling message: %s", test_case.value)
LOG.info("Expected exception: %s", test_case.expected_exception)
if test_case.expected_exception:
with pytest.raises(test_case.expected_exception):
schema_reader.handle_messages()
else:
schema_reader.handle_messages()

assert schema_reader.offset == 1
assert not schema_reader.ready

for log in caplog.records:
assert log.name == "karapace.schema_reader"
assert log.levelname == "WARNING"
LOG.info("Got warning: %s", log.message)
LOG.info("Expected warning: %s", test_case.expected_warn_message)
assert log.message == test_case.expected_warn_message


def test_invalid_protobuf_schema_because_referencing_a_corrupted_schema(
caplog: LogCaptureFixture,
schema_reader_with_consumer_messages_factory: Callable[[Tuple[List[Message]]], KafkaSchemaReader],
message_factory: Callable[[bytes, bytes, int], Message],
) -> None:
# Given an invalid PROTOBUF schema (corrupted)
# When handling the corrupted schema
# Then the schema is recognised as invalid because corrupted
_test_message_handling(
caplog,
SingleMessageHandlingErrorTestCase(
test_name="PROTOBUF corrupted",
key=b'{"keytype":"SCHEMA","subject":"testref","version":1,"magic":1}',
value=(
b'{"schemaType": "PROTOBUF", "subject": "testref", "version": 1, "id": 1, "deleted": false'
+ b', "schema": '
+ json.dumps(schema_protobuf_invalid_because_corrupted).encode()
+ b"}"
),
schema_type=SchemaType.PROTOBUF,
message_type=MessageType.schema,
expected_exception=CorruptKafkaRecordException,
expected_warn_message="Schema is not valid ProtoBuf definition",
),
schema_reader_with_consumer_messages_factory,
message_factory,
)
# And given a PROTOBUF schema referencing that corrupted schema (valid otherwise)
# When handling the schema
# Then the schema is recognised as invalid because of the corrupted referenced schema
_test_message_handling(
caplog,
SingleMessageHandlingErrorTestCase(
test_name="PROTOBUF referencing corrupted",
key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}',
value=(
b'{"schemaType": "PROTOBUF", "subject": "test", "version": 1, "id": 1, "deleted": false'
+ b', "schema": '
+ json.dumps(schema_protobuf_with_invalid_ref).encode()
+ b', "references": [{"name": "testref.proto", "subject": "testref", "version": 1}]'
+ b"}"
),
schema_type=SchemaType.PROTOBUF,
message_type=MessageType.schema,
expected_exception=CorruptKafkaRecordException,
expected_warn_message="Invalid Protobuf references",
),
schema_reader_with_consumer_messages_factory,
message_factory,
)

0 comments on commit 54c3f42

Please sign in to comment.