From 4380901f33ad689908337b0efe11d5d732179de5 Mon Sep 17 00:00:00 2001 From: clumsy9 Date: Mon, 11 Mar 2024 08:35:19 +0100 Subject: [PATCH] Fix `FatalOutputError` caused by serializing `KafkaError` (#536) * Embed string representation of KafkaError into CriticalInputError --- logprep/connector/confluent_kafka/input.py | 2 +- .../connector/test_confluent_kafka_input.py | 17 +++++++++++++---- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 9565c6a8f..df31044b8 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -383,7 +383,7 @@ def _get_raw_event(self, timeout: float) -> bytearray: kafka_error = message.error() if kafka_error: raise CriticalInputError( - self, "A confluent-kafka record contains an error code", kafka_error + self, "A confluent-kafka record contains an error code", str(kafka_error) ) self._last_valid_records[message.partition()] = message labels = {"description": f"topic: {self._config.topic} - partition: {message.partition()}"} diff --git a/tests/unit/connector/test_confluent_kafka_input.py b/tests/unit/connector/test_confluent_kafka_input.py index f777c7bbf..b7ba6dda0 100644 --- a/tests/unit/connector/test_confluent_kafka_input.py +++ b/tests/unit/connector/test_confluent_kafka_input.py @@ -8,7 +8,7 @@ from unittest import mock import pytest -from confluent_kafka import OFFSET_BEGINNING, KafkaException +from confluent_kafka import OFFSET_BEGINNING, KafkaError, KafkaException from logprep.abc.input import ( CriticalInputError, @@ -67,7 +67,16 @@ def test_get_next_returns_none_if_no_records(self, _): @mock.patch("logprep.connector.confluent_kafka.input.Consumer") def test_get_next_raises_critical_input_exception_for_invalid_confluent_kafka_record(self, _): mock_record = mock.MagicMock() - mock_record.error = mock.MagicMock(return_value="An arbitrary confluent-kafka error") + mock_record.error = mock.MagicMock( + return_value=KafkaError( + error=3, + reason="Subscribed topic not available: (Test Instance Name) : Broker: Unknown topic or partition", + fatal=False, + retriable=False, + txn_requires_abort=False, + ) + ) + mock_record.value = mock.MagicMock(return_value=None) self.object._consumer.poll = mock.MagicMock(return_value=mock_record) with pytest.raises( @@ -76,7 +85,7 @@ def test_get_next_raises_critical_input_exception_for_invalid_confluent_kafka_re r"CriticalInputError in ConfluentKafkaInput \(Test Instance Name\) - " r"Kafka Input: testserver:9092: " r"A confluent-kafka record contains an error code -> " - r"An arbitrary confluent-kafka error" + r"KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Subscribed topic not available: \(Test Instance Name\) : Broker: Unknown topic or partition\"}" ), ): _, _ = self.object.get_next(1) @@ -156,7 +165,7 @@ def test_get_next_raises_critical_input_error_if_not_a_dict(self, _): self.object.get_next(1) @mock.patch("logprep.connector.confluent_kafka.input.Consumer") - def test_get_next_raises_critical_input_error_if_unvalid_json(self, _): + def test_get_next_raises_critical_input_error_if_invalid_json(self, _): mock_record = mock.MagicMock() mock_record.error = mock.MagicMock() mock_record.error.return_value = None