Skip to content

Commit

Permalink
Merge pull request #963 from Aiven-Open/nosahama/log-msg-key-on-schem…
Browse files Browse the repository at this point in the history
…a-reader-error

schema-reader: Log the erroring kafka message key
  • Loading branch information
eliax1996 authored Sep 25, 2024
2 parents 0574efb + 25889f0 commit 349ae92
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 3 deletions.
8 changes: 7 additions & 1 deletion karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,14 @@ def consume_messages(self, msgs: list[Message], watch_offsets: bool) -> None:

assert message_key is not None
key = json_decode(message_key)
except AssertionError as exc:
LOG.warning("Empty msg.key() at offset %s", msg.offset())
self.offset = msg.offset() # Invalid entry shall also move the offset so Karapace makes progress.
self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
continue # [non-strict mode]
except JSONDecodeError as exc:
LOG.warning("Invalid JSON in msg.key() at offset %s", msg.offset())
non_bytes_key = msg.key().decode() # type: ignore[union-attr]
LOG.warning("Invalid JSON in msg.key(): %s at offset %s", non_bytes_key, msg.offset())
self.offset = msg.offset() # Invalid entry shall also move the offset so Karapace makes progress.
self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
continue # [non-strict mode]
Expand Down
13 changes: 11 additions & 2 deletions tests/unit/test_schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche
ok1_message.value.return_value = schema_str
ok1_message.offset.return_value = 1
invalid_key_message = Mock(spec=Message)
invalid_key_message.key.return_value = "invalid-key"
invalid_key_message.key.return_value = b"invalid-key"
invalid_key_message.error.return_value = None
invalid_key_message.value.return_value = schema_str
invalid_key_message.offset.return_value = 2
Expand Down Expand Up @@ -388,7 +388,16 @@ def factory(key: bytes, value: bytes, offset: int = 1) -> Message:
schema_type=None,
message_type=MessageType.schema,
expected_error=CorruptKafkaRecordException,
expected_log_message="Invalid JSON in msg.key() at offset 1",
expected_log_message='Invalid JSON in msg.key(): {subject1::::"test""version":1"magic":1} at offset 1',
),
KafkaMessageHandlingErrorTestCase(
test_name="Message key is empty, i.e. `null/None`",
key=None,
value=b'{"value": "value does not matter at this stage, just correct JSON"}',
schema_type=None,
message_type=MessageType.schema,
expected_error=CorruptKafkaRecordException,
expected_log_message="Empty msg.key() at offset 1",
),
KafkaMessageHandlingErrorTestCase(
test_name="Keytype is missing from message key",
Expand Down

0 comments on commit 349ae92

Please sign in to comment.