diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 8f548d134..813bbd00d 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -416,7 +416,11 @@ def _get_event(self, timeout: float) -> Union[Tuple[None, None], Tuple[dict, dic if raw_event is None: return None, None try: - event_dict = self._decoder.decode(raw_event) + event_dict = self._decoder.decode(raw_event.decode("utf-8")) + except UnicodeDecodeError as error: + raise CriticalInputParsingError( + self, "Input record value is not 'utf-8' encoded", str(raw_event) + ) from error except msgspec.DecodeError as error: raise CriticalInputParsingError( self, "Input record value is not a valid json string", raw_event diff --git a/tests/unit/connector/test_confluent_kafka_input.py b/tests/unit/connector/test_confluent_kafka_input.py index 77fd85f0f..9e5effb4b 100644 --- a/tests/unit/connector/test_confluent_kafka_input.py +++ b/tests/unit/connector/test_confluent_kafka_input.py @@ -200,6 +200,34 @@ def test_get_raw_event_is_callable(self, _): # pylint: disable=arguments-differ result = self.object._get_raw_event(0.001) assert result + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_get_event_raises_exception_if_input_invalid_json(self, _): + mock_record = mock.MagicMock() + mock_record.error = mock.MagicMock() + mock_record.error.return_value = None + self.object._consumer.poll = mock.MagicMock(return_value=mock_record) + mock_record.value = mock.MagicMock() + mock_record.value.return_value = '{"invalid_json"}'.encode("utf8") + with pytest.raises( + CriticalInputParsingError, + match="Input record value is not a valid json string -> b'{\"invalid_json\"}'", + ): + self.object._get_event(0.001) + + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_get_event_raises_exception_if_input_not_utf(self, _): + mock_record = mock.MagicMock() + mock_record.error = mock.MagicMock() + mock_record.error.return_value = None + self.object._consumer.poll = mock.MagicMock(return_value=mock_record) + mock_record.value = mock.MagicMock() + mock_record.value.return_value = '{"not_utf-8": \xfc}'.encode("cp1252") + with pytest.raises( + CriticalInputParsingError, + match=r"Input record value is not \'utf-8\' encoded -> b\'\{\"not_utf-8\": \\xfc\}\'", + ): + self.object._get_event(0.001) + def test_setup_raises_fatal_input_error_on_invalid_config(self): config = { "bootstrap.servers": "testinstance:9092",