Skip to content

Commit

Permalink
Fix FatalOutputError caused by serializing KafkaError (#536)
Browse files Browse the repository at this point in the history
* Embed string representation of KafkaError into CriticalInputError
  • Loading branch information
clumsy9 authored Mar 11, 2024
1 parent 2009df1 commit 4380901
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
2 changes: 1 addition & 1 deletion logprep/connector/confluent_kafka/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ def _get_raw_event(self, timeout: float) -> bytearray:
kafka_error = message.error()
if kafka_error:
raise CriticalInputError(
self, "A confluent-kafka record contains an error code", kafka_error
self, "A confluent-kafka record contains an error code", str(kafka_error)
)
self._last_valid_records[message.partition()] = message
labels = {"description": f"topic: {self._config.topic} - partition: {message.partition()}"}
Expand Down
17 changes: 13 additions & 4 deletions tests/unit/connector/test_confluent_kafka_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from unittest import mock

import pytest
from confluent_kafka import OFFSET_BEGINNING, KafkaException
from confluent_kafka import OFFSET_BEGINNING, KafkaError, KafkaException

from logprep.abc.input import (
CriticalInputError,
Expand Down Expand Up @@ -67,7 +67,16 @@ def test_get_next_returns_none_if_no_records(self, _):
@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_get_next_raises_critical_input_exception_for_invalid_confluent_kafka_record(self, _):
mock_record = mock.MagicMock()
mock_record.error = mock.MagicMock(return_value="An arbitrary confluent-kafka error")
mock_record.error = mock.MagicMock(
return_value=KafkaError(
error=3,
reason="Subscribed topic not available: (Test Instance Name) : Broker: Unknown topic or partition",
fatal=False,
retriable=False,
txn_requires_abort=False,
)
)

mock_record.value = mock.MagicMock(return_value=None)
self.object._consumer.poll = mock.MagicMock(return_value=mock_record)
with pytest.raises(
Expand All @@ -76,7 +85,7 @@ def test_get_next_raises_critical_input_exception_for_invalid_confluent_kafka_re
r"CriticalInputError in ConfluentKafkaInput \(Test Instance Name\) - "
r"Kafka Input: testserver:9092: "
r"A confluent-kafka record contains an error code -> "
r"An arbitrary confluent-kafka error"
r"KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str=\"Subscribed topic not available: \(Test Instance Name\) : Broker: Unknown topic or partition\"}"
),
):
_, _ = self.object.get_next(1)
Expand Down Expand Up @@ -156,7 +165,7 @@ def test_get_next_raises_critical_input_error_if_not_a_dict(self, _):
self.object.get_next(1)

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_get_next_raises_critical_input_error_if_unvalid_json(self, _):
def test_get_next_raises_critical_input_error_if_invalid_json(self, _):
mock_record = mock.MagicMock()
mock_record.error = mock.MagicMock()
mock_record.error.return_value = None
Expand Down

0 comments on commit 4380901

Please sign in to comment.