Skip to content

Commit

Permalink
Attach the message associated with an error to KafkaException.
Browse files Browse the repository at this point in the history
Append the name of the target topic to error messages when it may be relevant.
This latter feature uses a hard-coded list of error codes, which is not ideal, but seems tolerable, as this only affects the exception error message, is never technically wrong (since the error arose from a message being sent to the referenced topic), the full information is always available though the `error` and `message` subobjects, and the set of relevant errors changes infrequently.
  • Loading branch information
cnweaver committed May 27, 2024
1 parent 518838e commit 055c5db
Showing 1 changed file with 55 additions and 7 deletions.
62 changes: 55 additions & 7 deletions adc/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,69 @@ def log_delivery_errors(
def raise_delivery_errors(kafka_error: confluent_kafka.KafkaError,
msg: confluent_kafka.Message) -> None:
if kafka_error is not None:
raise KafkaException.from_kafka_error(kafka_error)
raise KafkaException.from_kafka_error(kafka_error, msg)
elif msg.error() is not None:
raise KafkaException.from_kafka_error(msg.error())
raise KafkaException.from_kafka_error(msg.error(), msg)


def _get_topic_related_errors():
"""Build a set of all Kafka error codes which seem to relate to a specific topic.
This uses a list extracted from all documented error codes up to confluent_kafka v2.4,
but some of these errors did not exist or were not exposed in earlier versions.
To maintain backward compatibility, this function checks whether each error exists before
attempting to otherwise refer to it.
"""
err_names = [
"_UNKNOWN_TOPIC",
"_NO_OFFSET",
"_LOG_TRUNCATION",
"OFFSET_OUT_OF_RANGE",
"UNKNOWN_TOPIC_OR_PART",
"NOT_LEADER_FOR_PARTITION",
"TOPIC_EXCEPTION",
"NOT_ENOUGH_REPLICAS",
"NOT_ENOUGH_REPLICAS_AFTER_APPEND",
"INVALID_COMMIT_OFFSET_SIZE",
"TOPIC_AUTHORIZATION_FAILED",
"TOPIC_ALREADY_EXISTS",
"INVALID_PARTITIONS",
"INVALID_REPLICATION_FACTOR",
"INVALID_REPLICA_ASSIGNMENT",
"REASSIGNMENT_IN_PROGRESS",
"TOPIC_DELETION_DISABLED",
"OFFSET_NOT_AVAILABLE",
"PREFERRED_LEADER_NOT_AVAILABLE",
"NO_REASSIGNMENT_IN_PROGRESS",
"GROUP_SUBSCRIBED_TO_TOPIC",
"UNSTABLE_OFFSET_COMMIT",
"UNKNOWN_TOPIC_ID",
]
errors = set()
for name in err_names:
if hasattr(confluent_kafka.KafkaError, name):
errors.add(getattr(confluent_kafka.KafkaError, name))
else:
logger.debug(f"{name} does not exist in confluent_kafka version "
f"{confluent_kafka.__version__} ({confluent_kafka.libversion()})")
return errors


class KafkaException(Exception):
@classmethod
def from_kafka_error(cls, error):
return cls(error)
def from_kafka_error(cls, error, msg=None):
return cls(error, msg)

topic_related_errors = _get_topic_related_errors()

def __init__(self, error):
def __init__(self, error, msg=None):
self.error = error
self.name = error.name()
self.reason = error.str()
self.retriable = error.retriable()
self.fatal = error.fatal()
msg = f"Error communicating with Kafka: code={self.name} {self.reason}"
super(KafkaException, self).__init__(msg)
self.message = msg
ex_msg = f"Error communicating with Kafka: code={self.name} {self.reason}"
if msg and error.code() in KafkaException.topic_related_errors:
ex_msg += f" on topic {msg.topic()}"
super(KafkaException, self).__init__(ex_msg)

0 comments on commit 055c5db

Please sign in to comment.