Skip to content

Commit

Permalink
Handle UnicodeDecodeError in kafka input
Browse files: browse the repository at this point in the history
  • Loading branch information
ppcad committed Sep 10, 2024
1 parent 00fd35d commit 03f02d6
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
6 changes: 5 additions & 1 deletion logprep/connector/confluent_kafka/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,11 @@ def _get_event(self, timeout: float) -> Union[Tuple[None, None], Tuple[dict, dic
if raw_event is None:
return None, None
try:
event_dict = self._decoder.decode(raw_event)
event_dict = self._decoder.decode(raw_event.decode("utf-8"))
except UnicodeDecodeError as error:
raise CriticalInputParsingError(
self, "Input record value is not 'utf-8' encoded", str(raw_event)
) from error
except msgspec.DecodeError as error:
raise CriticalInputParsingError(
self, "Input record value is not a valid json string", raw_event
Expand Down
28 changes: 28 additions & 0 deletions tests/unit/connector/test_confluent_kafka_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,34 @@ def test_get_raw_event_is_callable(self, _): # pylint: disable=arguments-differ
result = self.object._get_raw_event(0.001)
assert result

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_get_event_raises_exception_if_input_invalid_json(self, _):
    """A UTF-8 record value that is not valid JSON raises CriticalInputParsingError."""
    # Record handed out by the consumer's poll(): no error, UTF-8 bytes, not JSON.
    mock_record = mock.MagicMock()
    mock_record.error = mock.MagicMock()
    mock_record.error.return_value = None
    self.object._consumer.poll = mock.MagicMock(return_value=mock_record)
    mock_record.value = mock.MagicMock()
    mock_record.value.return_value = '{"invalid_json"}'.encode("utf8")
    # ``match`` is applied as a regular expression, so the braces are escaped
    # explicitly instead of relying on ``re`` treating a bare ``{`` as a literal.
    with pytest.raises(
        CriticalInputParsingError,
        match=r"Input record value is not a valid json string -> b'\{\"invalid_json\"\}'",
    ):
        self.object._get_event(0.001)

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_get_event_raises_exception_if_input_not_utf(self, _):
    """A cp1252-encoded record value is reported as not 'utf-8' encoded."""
    # The 0xFC byte produced by the cp1252 encoding is not valid UTF-8.
    cp1252_payload = '{"not_utf-8": \xfc}'.encode("cp1252")

    # Record returned by the consumer's poll(): no error, undecodable value.
    record = mock.MagicMock()
    record.error = mock.MagicMock(return_value=None)
    record.value = mock.MagicMock(return_value=cp1252_payload)
    self.object._consumer.poll = mock.MagicMock(return_value=record)

    expected_message = (
        r"Input record value is not \'utf-8\' encoded -> b\'\{\"not_utf-8\": \\xfc\}\'"
    )
    with pytest.raises(CriticalInputParsingError, match=expected_message):
        self.object._get_event(0.001)

def test_setup_raises_fatal_input_error_on_invalid_config(self):
config = {
"bootstrap.servers": "testinstance:9092",
Expand Down

0 comments on commit 03f02d6

Please sign in to comment.