From ee021b4c66d7bec97003f93863b1e53030555994 Mon Sep 17 00:00:00 2001
From: "C. Weaver"
Date: Fri, 24 May 2024 15:31:22 -0400
Subject: [PATCH 1/5] Support the target topic being specified when write() is
 called, allowing a single Producer object to be used to write to multiple
 topics.

---
 adc/producer.py                 | 18 ++++++++----
 tests/test_kafka_integration.py | 51 +++++++++++++++++++++++++++++++--
 2 files changed, 62 insertions(+), 7 deletions(-)

diff --git a/adc/producer.py b/adc/producer.py
index ff3488a..eaf3122 100644
--- a/adc/producer.py
+++ b/adc/producer.py
@@ -34,15 +34,23 @@ def __init__(self, conf: 'ProducerConfig') -> None:
 
     def write(self,
               msg: Union[bytes, 'Serializable'],
               headers: Optional[Union[dict, list]] = None,
-              delivery_callback: Optional[DeliveryCallback] = log_delivery_errors) -> None:
+              delivery_callback: Optional[DeliveryCallback] = log_delivery_errors,
+              topic: Optional[str] = None) -> None:
         if isinstance(msg, Serializable):
             msg = msg.serialize()
-        self.logger.debug("writing message to %s", self.conf.topic)
+        if topic is None:
+            if self.conf.topic is not None:
+                topic = self.conf.topic
+            else:
+                raise Exception("No topic specified for write: "
+                                "Either configure a topic when consturcting the Producer, "
+                                "or specify the topic argument to write()")
+        self.logger.debug("writing message to %s", topic)
         if delivery_callback is not None:
-            self._producer.produce(self.conf.topic, msg, headers=headers,
+            self._producer.produce(topic, msg, headers=headers,
                                    on_delivery=delivery_callback)
         else:
-            self._producer.produce(self.conf.topic, msg, headers=headers)
+            self._producer.produce(topic, msg, headers=headers)
 
     def flush(self, timeout: timedelta = timedelta(seconds=10)) -> int:
         """Attempt to flush enqueued messages. Return the number of messages still
@@ -78,7 +86,7 @@ def __exit__(self, type, value, traceback) -> bool:
 
 @dataclasses.dataclass
 class ProducerConfig:
     broker_urls: List[str]
-    topic: str
+    topic: Optional[str]
     auth: Optional[SASLAuth] = None
     error_callback: Optional[ErrorCallback] = log_client_errors
 
diff --git a/tests/test_kafka_integration.py b/tests/test_kafka_integration.py
index b0fd578..b4f751b 100644
--- a/tests/test_kafka_integration.py
+++ b/tests/test_kafka_integration.py
@@ -377,6 +377,43 @@ def test_contextmanager_support(self):
         self.assertEqual(messages[1].value(), b"message 2")
         self.assertEqual(messages[2].value(), b"message 3")
 
+    def test_multi_topic_handling(self):
+        """Use a single producer object to write messages to multiple topics,
+        and check that a consumer can receive them all.
+        """
+        topics = ["test_multi_1", "test_multi_2"]
+
+        # Push some messages in
+        producer = adc.producer.Producer(adc.producer.ProducerConfig(
+            broker_urls=[self.kafka.address],
+            topic=None,
+            auth=self.kafka.auth,
+        ))
+        for i in range(0, 8):
+            producer.write(str(i), topic=topics[i % 2])
+        producer.flush()
+        logger.info("messages sent")
+
+        # check that we receive the messages from the right topics
+        consumer = adc.consumer.Consumer(adc.consumer.ConsumerConfig(
+            broker_urls=[self.kafka.address],
+            group_id="test_consumer",
+            auth=self.kafka.auth,
+        ))
+        consumer.subscribe(topics)
+        stream = consumer.stream()
+        total_messages = 0
+        for msg in stream:
+            if msg.error() is not None:
+                raise Exception(msg.error())
+            idx = int(msg.value())
+            self.assertEqual(msg.topic(), topics[idx % 2])
+            total_messages += 1
+            if total_messages == 8:
+                break
+        self.assertEqual(total_messages, 8)
+
 
 class KafkaDockerConnection:
     """Holds connection information for communicating with a Kafka broker running
@@ -437,6 +474,8 @@ def query_kafka_broker_address(self):
         if not addrs:
             return None
         ip = addrs[0]['HostIp']
+        if len(ip) == 0:
+            ip = "localhost"
         port = addrs[0]['HostPort']
         return f"{ip}:{port}"
 
@@ -502,8 +541,16 @@ def get_or_create_container(self):
             detach=True,
             auto_remove=True,
             network=self.net.name,
-            # Setting None below the OS pick an ephemeral port.
-            ports={"9092/tcp": None},
+            # Kafka insists on redirecting consumers to one of its advertised listeners,
+            # which it will get wrong if it is running in a private container network.
+            # To fix this, we need to tell it what to advertise, which means we must
+            # know what port will be visible from the host system, and we cannot use an
+            # ephemeral port, which would be known to us only after the container is
+            # started. Since we have to pick something, pick 9092, which means that
+            # these tests cannot run if there is already an instance of Kafka running on
+            # the same host.
+            ports={"9092/tcp": 9092},
+            command=["/root/runServer", "--advertisedListener", "SASL_SSL://localhost:9092"],
         )
 
     def get_or_create_docker_network(self):
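
For reference, a minimal sketch of the calling pattern this patch enables. The
broker address and topic names here are illustrative assumptions, not taken
from the patch:

    import adc.producer

    # One Producer with no default topic; each write() names its destination.
    # (broker address and topic names below are hypothetical)
    producer = adc.producer.Producer(adc.producer.ProducerConfig(
        broker_urls=["localhost:9092"],
        topic=None,
    ))
    producer.write(b"first message", topic="topic_a")
    producer.write(b"second message", topic="topic_b")
    producer.flush()

If neither a default topic is configured nor a topic argument given, write()
raises the "No topic specified for write" exception shown in the diff above.
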
+ + """ + topics = ["test_multi_1", "test_multi_2"] + + # Push some messages in + producer = adc.producer.Producer(adc.producer.ProducerConfig( + broker_urls=[self.kafka.address], + topic=None, + auth=self.kafka.auth, + )) + for i in range(0,8): + producer.write(str(i), topic=topics[i%2]) + producer.flush() + logger.info("messages sent") + + # check that we receive the messages from the right topics + consumer = adc.consumer.Consumer(adc.consumer.ConsumerConfig( + broker_urls=[self.kafka.address], + group_id="test_consumer", + auth=self.kafka.auth, + )) + consumer.subscribe(topics) + stream = consumer.stream() + total_messages = 0; + for msg in stream: + if msg.error() is not None: + raise Exception(msg.error()) + idx = int(msg.value()) + self.assertEqual(msg.topic(), topics[idx%2]) + total_messages += 1 + if total_messages == 8: + break + self.assertEqual(total_messages, 8) + class KafkaDockerConnection: """Holds connection information for communicating with a Kafka broker running @@ -437,6 +474,8 @@ def query_kafka_broker_address(self): if not addrs: return None ip = addrs[0]['HostIp'] + if len(ip) == 0: + ip = "localhost" port = addrs[0]['HostPort'] return f"{ip}:{port}" @@ -502,8 +541,16 @@ def get_or_create_container(self): detach=True, auto_remove=True, network=self.net.name, - # Setting None below the OS pick an ephemeral port. - ports={"9092/tcp": None}, + # Kafka insists on redirecting consumers to one of its advertised listeners, + # which it will get wrong if it is running in a private container network. + # To fix this, we need to tell it what to advertise, which means we must + # know what port will be visible from the host system, and we cannot use an + # ephemeral port, which would be known to us only after the container is + # started. Since we have to pick something, pick 9092, which means that + # these tests cannot run if there is already an instance of Kafka running on + # the same host. + ports={"9092/tcp": 9092}, + command=["/root/runServer","--advertisedListener","SASL_SSL://localhost:9092"], ) def get_or_create_docker_network(self): From 897645bcbf3c72af5d448e6c626d27ba9d7ea153 Mon Sep 17 00:00:00 2001 From: "C. Weaver" Date: Fri, 24 May 2024 15:35:45 -0400 Subject: [PATCH 2/5] Run only our own tests --- setup.cfg | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.cfg b/setup.cfg index d05e61c..abea8ab 100644 --- a/setup.cfg +++ b/setup.cfg @@ -13,3 +13,4 @@ exclude = setup.py, [tool:pytest] log_cli = True log_cli_level = INFO +testpaths = tests From 518838e30dae6358951c24c40699a7a33e42ad58 Mon Sep 17 00:00:00 2001 From: "C. 
Weaver" Date: Fri, 24 May 2024 15:51:09 -0400 Subject: [PATCH 3/5] Resolve (most) linter complaints --- adc/__init__.py | 4 ++-- adc/consumer.py | 17 ++++++++++++----- adc/oidc.py | 2 +- adc/producer.py | 5 +++-- 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/adc/__init__.py b/adc/__init__.py index 84fd891..725a2de 100644 --- a/adc/__init__.py +++ b/adc/__init__.py @@ -1,8 +1,8 @@ try: - from importlib.metadata import version, PackageNotFoundError + from importlib.metadata import PackageNotFoundError, version except ImportError: # NOTE: remove after dropping support for Python < 3.8 - from importlib_metadata import version, PackageNotFoundError + from importlib_metadata import PackageNotFoundError, version try: __version__ = version("adc-streaming") diff --git a/adc/consumer.py b/adc/consumer.py index ffa8076..b0ac9fa 100644 --- a/adc/consumer.py +++ b/adc/consumer.py @@ -1,12 +1,13 @@ import dataclasses import enum import logging -from datetime import datetime, timedelta import threading +from collections import defaultdict +from datetime import datetime, timedelta # Imports from typing are deprecated as of Python 3.9 but required for # compatibility with earlier versions -from typing import Dict, Iterable, Iterator, List, Optional, Set, Union, Collection -from collections import defaultdict +from typing import (Collection, Dict, Iterable, Iterator, List, Optional, Set, + Union) import confluent_kafka # type: ignore import confluent_kafka.admin # type: ignore @@ -15,6 +16,7 @@ from .errors import ErrorCallback, log_client_errors from .oidc import set_oauth_cb + class LogicalOffset(enum.IntEnum): BEGINNING = confluent_kafka.OFFSET_BEGINNING EARLIEST = confluent_kafka.OFFSET_BEGINNING @@ -26,6 +28,7 @@ class LogicalOffset(enum.IntEnum): INVALID = confluent_kafka.OFFSET_INVALID + class Consumer: conf: 'ConsumerConfig' _consumer: confluent_kafka.Consumer @@ -99,14 +102,15 @@ def mark_done(self, msg: confluent_kafka.Message, asynchronous: bool = True): self._consumer.commit(msg, asynchronous=False) def _offsets_for_position(self, partitions: Collection[confluent_kafka.TopicPartition], - position: Union[datetime, LogicalOffset]) -> List[confluent_kafka.TopicPartition]: + position: Union[datetime, LogicalOffset]) \ + -> List[confluent_kafka.TopicPartition]: if isinstance(position, datetime): offset = int(position.timestamp() * 1000) elif isinstance(position, LogicalOffset): offset = position else: raise TypeError("Only datetime objects and logical offsets supported") - + _partitions = [ confluent_kafka.TopicPartition(topic=tp.topic, partition=tp.partition, offset=offset) for tp in partitions @@ -248,6 +252,7 @@ def close(self): """ Close the consumer, ending its subscriptions. """ self._consumer.close() + # Used to be called ConsumerStartPosition, though this was confusing because # it only affects "auto.offset.reset" not the start position for a call to # consume. 
From 055c5db7c421de67473fcba2f5a38b5c29aefd8a Mon Sep 17 00:00:00 2001
From: "C. Weaver"
Date: Mon, 27 May 2024 11:33:33 -0400
Subject: [PATCH 4/5] Attach the message associated with an error to
 KafkaException.

Append the name of the target topic to error messages when it may be
relevant. This latter feature uses a hard-coded list of error codes, which is
not ideal, but seems tolerable: it only affects the exception's error message,
it is never technically wrong (since the error arose from a message being sent
to the referenced topic), the full information is always available through the
`error` and `message` subobjects, and the set of relevant errors changes
infrequently.

---
 adc/errors.py | 62 +++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 55 insertions(+), 7 deletions(-)

diff --git a/adc/errors.py b/adc/errors.py
index adb34c9..7dc2a4b 100644
--- a/adc/errors.py
+++ b/adc/errors.py
@@ -34,21 +34,69 @@ def log_delivery_errors(
 
 def raise_delivery_errors(kafka_error: confluent_kafka.KafkaError,
                           msg: confluent_kafka.Message) -> None:
     if kafka_error is not None:
-        raise KafkaException.from_kafka_error(kafka_error)
+        raise KafkaException.from_kafka_error(kafka_error, msg)
     elif msg.error() is not None:
-        raise KafkaException.from_kafka_error(msg.error())
+        raise KafkaException.from_kafka_error(msg.error(), msg)
+
+
+def _get_topic_related_errors():
+    """Build a set of all Kafka error codes which seem to relate to a specific topic.
+
+    This uses a list extracted from all documented error codes up to confluent_kafka
+    v2.4, but some of these errors did not exist or were not exposed in earlier
+    versions. To maintain backward compatibility, this function checks whether each
+    error code exists before referring to it.
+    """
+    err_names = [
+        "_UNKNOWN_TOPIC",
+        "_NO_OFFSET",
+        "_LOG_TRUNCATION",
+        "OFFSET_OUT_OF_RANGE",
+        "UNKNOWN_TOPIC_OR_PART",
+        "NOT_LEADER_FOR_PARTITION",
+        "TOPIC_EXCEPTION",
+        "NOT_ENOUGH_REPLICAS",
+        "NOT_ENOUGH_REPLICAS_AFTER_APPEND",
+        "INVALID_COMMIT_OFFSET_SIZE",
+        "TOPIC_AUTHORIZATION_FAILED",
+        "TOPIC_ALREADY_EXISTS",
+        "INVALID_PARTITIONS",
+        "INVALID_REPLICATION_FACTOR",
+        "INVALID_REPLICA_ASSIGNMENT",
+        "REASSIGNMENT_IN_PROGRESS",
+        "TOPIC_DELETION_DISABLED",
+        "OFFSET_NOT_AVAILABLE",
+        "PREFERRED_LEADER_NOT_AVAILABLE",
+        "NO_REASSIGNMENT_IN_PROGRESS",
+        "GROUP_SUBSCRIBED_TO_TOPIC",
+        "UNSTABLE_OFFSET_COMMIT",
+        "UNKNOWN_TOPIC_ID",
+    ]
+    errors = set()
+    for name in err_names:
+        if hasattr(confluent_kafka.KafkaError, name):
+            errors.add(getattr(confluent_kafka.KafkaError, name))
+        else:
+            logger.debug(f"{name} does not exist in confluent_kafka version "
+                         f"{confluent_kafka.__version__} ({confluent_kafka.libversion()})")
+    return errors
 
 
 class KafkaException(Exception):
     @classmethod
-    def from_kafka_error(cls, error):
-        return cls(error)
+    def from_kafka_error(cls, error, msg=None):
+        return cls(error, msg)
+
+    topic_related_errors = _get_topic_related_errors()
 
-    def __init__(self, error):
+    def __init__(self, error, msg=None):
         self.error = error
         self.name = error.name()
         self.reason = error.str()
         self.retriable = error.retriable()
         self.fatal = error.fatal()
-        msg = f"Error communicating with Kafka: code={self.name} {self.reason}"
-        super(KafkaException, self).__init__(msg)
+        self.message = msg
+        ex_msg = f"Error communicating with Kafka: code={self.name} {self.reason}"
+        if msg and error.code() in KafkaException.topic_related_errors:
+            ex_msg += f" on topic {msg.topic()}"
+        super(KafkaException, self).__init__(ex_msg)
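
As a rough usage sketch (not part of the patch): with raise_delivery_errors
installed as the delivery callback, a failed delivery surfaces as a
KafkaException carrying the failed message, and str() of the exception names
the topic for topic-related error codes. The broker address and topic name
below are hypothetical, and this assumes confluent_kafka's behavior of
propagating callback exceptions out of flush()/poll():

    import adc.errors
    import adc.producer

    # a sketch; broker address and topic name are illustrative assumptions
    producer = adc.producer.Producer(adc.producer.ProducerConfig(
        broker_urls=["localhost:9092"],
        topic=None,
    ))
    try:
        producer.write(b"payload", topic="some_topic",
                       delivery_callback=adc.errors.raise_delivery_errors)
        producer.flush()  # delivery callbacks run here; failures raise
    except adc.errors.KafkaException as e:
        # the failed message is now attached to the exception ...
        print(e.message.topic(), e.message.value())
        # ... and, for topic-related error codes, str(e) ends with
        # "on topic some_topic"
        print(e)
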
+ """ + err_names = [ + "_UNKNOWN_TOPIC", + "_NO_OFFSET", + "_LOG_TRUNCATION", + "OFFSET_OUT_OF_RANGE", + "UNKNOWN_TOPIC_OR_PART", + "NOT_LEADER_FOR_PARTITION", + "TOPIC_EXCEPTION", + "NOT_ENOUGH_REPLICAS", + "NOT_ENOUGH_REPLICAS_AFTER_APPEND", + "INVALID_COMMIT_OFFSET_SIZE", + "TOPIC_AUTHORIZATION_FAILED", + "TOPIC_ALREADY_EXISTS", + "INVALID_PARTITIONS", + "INVALID_REPLICATION_FACTOR", + "INVALID_REPLICA_ASSIGNMENT", + "REASSIGNMENT_IN_PROGRESS", + "TOPIC_DELETION_DISABLED", + "OFFSET_NOT_AVAILABLE", + "PREFERRED_LEADER_NOT_AVAILABLE", + "NO_REASSIGNMENT_IN_PROGRESS", + "GROUP_SUBSCRIBED_TO_TOPIC", + "UNSTABLE_OFFSET_COMMIT", + "UNKNOWN_TOPIC_ID", + ] + errors = set() + for name in err_names: + if hasattr(confluent_kafka.KafkaError, name): + errors.add(getattr(confluent_kafka.KafkaError, name)) + else: + logger.debug(f"{name} does not exist in confluent_kafka version " + f"{confluent_kafka.__version__} ({confluent_kafka.libversion()})") + return errors class KafkaException(Exception): @classmethod - def from_kafka_error(cls, error): - return cls(error) + def from_kafka_error(cls, error, msg=None): + return cls(error, msg) + + topic_related_errors = _get_topic_related_errors() - def __init__(self, error): + def __init__(self, error, msg=None): self.error = error self.name = error.name() self.reason = error.str() self.retriable = error.retriable() self.fatal = error.fatal() - msg = f"Error communicating with Kafka: code={self.name} {self.reason}" - super(KafkaException, self).__init__(msg) + self.message = msg + ex_msg = f"Error communicating with Kafka: code={self.name} {self.reason}" + if msg and error.code() in KafkaException.topic_related_errors: + ex_msg += f" on topic {msg.topic()}" + super(KafkaException, self).__init__(ex_msg) From 5b6e5a7865ecad9d389fcf02b947a14969bd8343 Mon Sep 17 00:00:00 2001 From: "C. Weaver" Date: Mon, 10 Jun 2024 16:21:17 -0400 Subject: [PATCH 5/5] Correct typo --- adc/producer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adc/producer.py b/adc/producer.py index 81c4d35..7510fb7 100644 --- a/adc/producer.py +++ b/adc/producer.py @@ -43,7 +43,7 @@ def write(self, topic = self.conf.topic else: raise Exception("No topic specified for write: " - "Either configure a topic when consturcting the Producer, " + "Either configure a topic when constructing the Producer, " "or specify the topic argument to write()") self.logger.debug("writing message to %s", topic) if delivery_callback is not None: