diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client.py b/kafka_consumer/datadog_checks/kafka_consumer/client.py
index 404d5e7440627..9c5886e1691be 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/client.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/client.py
@@ -15,6 +15,7 @@ def __init__(self, config, tls_context, log) -> None:
         self.log = log
         self._kafka_client = None
         self._tls_context = tls_context
+        self.topics_cache = {}

     @property
     def kafka_client(self):
@@ -35,6 +36,7 @@ def __create_consumer(self, consumer_group):
             "bootstrap.servers": self.config._kafka_connect_str,
             "group.id": consumer_group,
             "enable.auto.commit": False,  # To avoid offset commit to broker during close
+            "queued.max.messages.kbytes": 1024,  # https://github.com/confluentinc/confluent-kafka-python/issues/759
         }
         config.update(self.__get_authentication_config())

@@ -152,6 +154,9 @@ def get_highwater_offsets(self, consumer_offsets):
         return highwater_offsets

     def get_partitions_for_topic(self, topic):
+        if self.topics_cache.get(topic):
+            return self.topics_cache.get(topic)
+
         try:
             cluster_metadata = self.kafka_client.list_topics(topic, timeout=self.config._request_timeout)
         except KafkaException as e:
@@ -160,6 +165,7 @@ def get_partitions_for_topic(self, topic):
         else:
             topic_metadata = cluster_metadata.topics[topic]
             partitions = list(topic_metadata.partitions.keys())
+            self.topics_cache[topic] = partitions
             return partitions

     def request_metadata_update(self):
@@ -240,6 +246,9 @@ def _get_consumer_groups(self):
     def _list_consumer_group_offsets(self, cg_tp):
         return self.kafka_client.list_consumer_group_offsets([cg_tp])

+    def close_admin_client(self):
+        self._kafka_client = None
+
     def _get_consumer_offset_futures(self, consumer_groups):
         futures = []

@@ -267,11 +276,8 @@ def _get_consumer_offset_futures(self, consumer_groups):
             # If partitions are not defined
             else:
                 # get all the partitions for this topic
-                partitions = (
-                    self.kafka_client.list_topics(topic=topic, timeout=self.config._request_timeout)
-                    .topics[topic]
-                    .partitions
-                )
+                partitions = self.get_partitions_for_topic(topic)
+
                 topic_partitions = [TopicPartition(topic, partition) for partition in partitions]

                 futures.append(
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
index 08ebf03d6a8a4..719994fd06928 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
@@ -157,6 +157,7 @@ def report_consumer_offsets_and_lag(self, consumer_offsets, highwater_offsets, c
             self.log.warning(msg, consumer_group, topic, partition)
             self.client.request_metadata_update()  # force metadata update on next poll()
         self.log.debug('%s consumer offsets reported', reported_contexts)
+        self.client.close_admin_client()

     def send_event(self, title, text, tags, event_type, aggregation_key, severity='info'):
         """Emit an event to the Datadog Event Stream."""
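
The hunks above implement a per-client memoization of topic partition lookups: get_partitions_for_topic now consults self.topics_cache before issuing a list_topics metadata request, and _get_consumer_offset_futures reuses that method instead of calling list_topics directly. Below is a minimal standalone sketch of that pattern, not the integration's actual code; PartitionCache and fake_list_topics are hypothetical stand-ins for the check's client class and AdminClient.list_topics.

class PartitionCache:
    def __init__(self, lookup):
        self._lookup = lookup  # stand-in for a metadata request to the broker
        self.topics_cache = {}

    def get_partitions_for_topic(self, topic):
        # Serve repeated lookups from the cache, so each topic costs at most
        # one metadata round trip (mirrors the diff's early return).
        if self.topics_cache.get(topic):
            return self.topics_cache.get(topic)
        partitions = self._lookup(topic)
        self.topics_cache[topic] = partitions
        return partitions

calls = []

def fake_list_topics(topic):
    calls.append(topic)
    return [0, 1, 2]  # pretend the topic has three partitions

cache = PartitionCache(fake_list_topics)
assert cache.get_partitions_for_topic("orders") == [0, 1, 2]
assert cache.get_partitions_for_topic("orders") == [0, 1, 2]
assert calls == ["orders"]  # the second lookup was served from the cache

Note that close_admin_client only drops the underlying admin client (self._kafka_client = None), so the lazily built kafka_client property presumably recreates it on the next check run, while topics_cache persists for the lifetime of the client object. One caveat of the truthiness check `if self.topics_cache.get(topic):` is that a cached empty partition list is treated as a miss and re-fetched; an `if topic in self.topics_cache:` test would cache empty results as well.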