From 055c5db7c421de67473fcba2f5a38b5c29aefd8a Mon Sep 17 00:00:00 2001 From: "C. Weaver" Date: Mon, 27 May 2024 11:33:33 -0400 Subject: [PATCH] Attach the message associated with an error to KafkaException. Append the name of the target topic to error messages when it may be relevant. This latter feature uses a hard-coded list of error codes, which is not ideal, but seems tolerable, as this only affects the exception error message, is never technically wrong (since the error arose from a message being sent to the referenced topic), the full information is always available though the `error` and `message` subobjects, and the set of relevant errors changes infrequently. --- adc/errors.py | 62 +++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 55 insertions(+), 7 deletions(-) diff --git a/adc/errors.py b/adc/errors.py index adb34c9..7dc2a4b 100644 --- a/adc/errors.py +++ b/adc/errors.py @@ -34,21 +34,69 @@ def log_delivery_errors( def raise_delivery_errors(kafka_error: confluent_kafka.KafkaError, msg: confluent_kafka.Message) -> None: if kafka_error is not None: - raise KafkaException.from_kafka_error(kafka_error) + raise KafkaException.from_kafka_error(kafka_error, msg) elif msg.error() is not None: - raise KafkaException.from_kafka_error(msg.error()) + raise KafkaException.from_kafka_error(msg.error(), msg) + + +def _get_topic_related_errors(): + """Build a set of all Kafka error codes which seem to relate to a specific topic. + + This uses a list extracted from all documented error codes up to confluent_kafka v2.4, + but some of these errors did not exist or were not exposed in earlier versions. + To maintain backward compatibility, this function checks whether each error exists before + attempting to otherwise refer to it. + """ + err_names = [ + "_UNKNOWN_TOPIC", + "_NO_OFFSET", + "_LOG_TRUNCATION", + "OFFSET_OUT_OF_RANGE", + "UNKNOWN_TOPIC_OR_PART", + "NOT_LEADER_FOR_PARTITION", + "TOPIC_EXCEPTION", + "NOT_ENOUGH_REPLICAS", + "NOT_ENOUGH_REPLICAS_AFTER_APPEND", + "INVALID_COMMIT_OFFSET_SIZE", + "TOPIC_AUTHORIZATION_FAILED", + "TOPIC_ALREADY_EXISTS", + "INVALID_PARTITIONS", + "INVALID_REPLICATION_FACTOR", + "INVALID_REPLICA_ASSIGNMENT", + "REASSIGNMENT_IN_PROGRESS", + "TOPIC_DELETION_DISABLED", + "OFFSET_NOT_AVAILABLE", + "PREFERRED_LEADER_NOT_AVAILABLE", + "NO_REASSIGNMENT_IN_PROGRESS", + "GROUP_SUBSCRIBED_TO_TOPIC", + "UNSTABLE_OFFSET_COMMIT", + "UNKNOWN_TOPIC_ID", + ] + errors = set() + for name in err_names: + if hasattr(confluent_kafka.KafkaError, name): + errors.add(getattr(confluent_kafka.KafkaError, name)) + else: + logger.debug(f"{name} does not exist in confluent_kafka version " + f"{confluent_kafka.__version__} ({confluent_kafka.libversion()})") + return errors class KafkaException(Exception): @classmethod - def from_kafka_error(cls, error): - return cls(error) + def from_kafka_error(cls, error, msg=None): + return cls(error, msg) + + topic_related_errors = _get_topic_related_errors() - def __init__(self, error): + def __init__(self, error, msg=None): self.error = error self.name = error.name() self.reason = error.str() self.retriable = error.retriable() self.fatal = error.fatal() - msg = f"Error communicating with Kafka: code={self.name} {self.reason}" - super(KafkaException, self).__init__(msg) + self.message = msg + ex_msg = f"Error communicating with Kafka: code={self.name} {self.reason}" + if msg and error.code() in KafkaException.topic_related_errors: + ex_msg += f" on topic {msg.topic()}" + super(KafkaException, self).__init__(ex_msg)