diff --git a/kafka_consumer/CHANGELOG.md b/kafka_consumer/CHANGELOG.md
index 237c23cf55953..aaa19da79349b 100644
--- a/kafka_consumer/CHANGELOG.md
+++ b/kafka_consumer/CHANGELOG.md
@@ -2,6 +2,12 @@

 ## Unreleased

+## 4.1.3 / 2023-10-11
+
+***Fixed***:
+
+* Add ability to cache offsets and close admin client ([#15988](https://github.com/DataDog/integrations-core/pull/15988))
+
 ## 4.1.2 / 2023-09-04 / Agent 7.48.0

 ***Fixed***:
diff --git a/kafka_consumer/assets/configuration/spec.yaml b/kafka_consumer/assets/configuration/spec.yaml
index 8b253d5f786dc..491b019b7005d 100644
--- a/kafka_consumer/assets/configuration/spec.yaml
+++ b/kafka_consumer/assets/configuration/spec.yaml
@@ -104,6 +104,24 @@ files:
         value:
           type: boolean
           example: false
+      - name: consumer_queued_max_messages_kbytes
+        description: |
+          The consumer aggressively caches messages in the background (tuned for very high throughput).
+          To reduce memory usage, tune down `queued.max.messages.kbytes` (the maximum cache size per partition).
+          The integration check overrides the Kafka client default (1GB per client) down to 1MB
+          to optimize memory consumption and avoid a potential out-of-memory (OOM) kill.
+        value:
+          type: integer
+          example: 1024
+      - name: close_admin_client
+        description: |
+          Release the AdminClient at the end of execution so it can be garbage collected. Originally, the same
+          AdminClient was kept alive for the entire life of the check. Deallocating the AdminClient after each
+          check run frees any memory held by the client, although the check run is slower since a new client
+          must reconnect each time. Set this option to `false` to improve performance.
+        value:
+          type: boolean
+          example: true
       - name: monitor_all_broker_highwatermarks
         description: |
           Setting monitor_all_broker_highwatermarks to `true` tells the check to
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/__about__.py b/kafka_consumer/datadog_checks/kafka_consumer/__about__.py
index a01c729f04f9f..d5a6a0685213c 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/__about__.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/__about__.py
@@ -2,4 +2,4 @@
 # All rights reserved
 # Licensed under a 3-clause BSD style license (see LICENSE)

-__version__ = "4.1.2"
+__version__ = "4.1.3"
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client.py b/kafka_consumer/datadog_checks/kafka_consumer/client.py
index 68ca6b231727f..7d436617fdfe0 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/client.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/client.py
@@ -14,6 +14,7 @@ def __init__(self, config, log) -> None:
         self.config = config
         self.log = log
         self._kafka_client = None
+        self.topic_partition_cache = {}

     @property
     def kafka_client(self):
@@ -34,6 +35,7 @@ def __create_consumer(self, consumer_group):
             "bootstrap.servers": self.config._kafka_connect_str,
             "group.id": consumer_group,
             "enable.auto.commit": False,  # To avoid offset commit to broker during close
+            "queued.max.messages.kbytes": self.config._consumer_queued_max_messages_kbytes,
         }
         config.update(self.__get_authentication_config())

@@ -152,6 +154,9 @@ def get_highwater_offsets(self, consumer_offsets):
         return highwater_offsets

     def get_partitions_for_topic(self, topic):
+        if partitions := self.topic_partition_cache.get(topic):
+            return partitions
+
         try:
             cluster_metadata = self.kafka_client.list_topics(topic, timeout=self.config._request_timeout)
         except KafkaException as e:
@@ -160,6 +165,7 @@ def get_partitions_for_topic(self, topic):
         else:
             topic_metadata = cluster_metadata.topics[topic]
             partitions = list(topic_metadata.partitions.keys())
+            self.topic_partition_cache[topic] = partitions
             return partitions

     def request_metadata_update(self):
@@ -244,6 +250,9 @@ def _get_consumer_groups(self):
     def _list_consumer_group_offsets(self, cg_tp):
         return self.kafka_client.list_consumer_group_offsets([cg_tp])

+    def close_admin_client(self):
+        self._kafka_client = None
+
     def _get_consumer_offset_futures(self, consumer_groups):
         futures = []

@@ -271,11 +280,8 @@ def _get_consumer_offset_futures(self, consumer_groups):
             # If partitions are not defined
             else:
                 # get all the partitions for this topic
-                partitions = (
-                    self.kafka_client.list_topics(topic=topic, timeout=self.config._request_timeout)
-                    .topics[topic]
-                    .partitions
-                )
+                partitions = self.get_partitions_for_topic(topic)
+
                 topic_partitions = [TopicPartition(topic, partition) for partition in partitions]

                 futures.append(
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/config.py b/kafka_consumer/datadog_checks/kafka_consumer/config.py
index 7041125e3256e..4616f02310496 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/config.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/config.py
@@ -27,6 +27,10 @@ def __init__(self, init_config, instance, log) -> None:
             if self._consumer_groups_regex
             else ""
         )
+        # Optimization to avoid OOM kill:
+        # https://github.com/confluentinc/confluent-kafka-python/issues/759
+        self._consumer_queued_max_messages_kbytes = instance.get('consumer_queued_max_messages_kbytes', 1024)
+        self._close_admin_client = instance.get('close_admin_client', True)

         self._kafka_connect_str = instance.get('kafka_connect_str')
         self._kafka_version = instance.get('kafka_client_api_version')
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py b/kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py
index 5ee10d396c6b3..c2afdebfd6450 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py
@@ -12,6 +12,14 @@ def shared_kafka_timeout():
     return 5


+def instance_close_admin_client():
+    return True
+
+
+def instance_consumer_queued_max_messages_kbytes():
+    return 1024
+
+
 def instance_disable_generic_tags():
     return False

diff --git a/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py b/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py
index 793d1dcb4eaa0..413aedbbd6240 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py
@@ -45,8 +45,10 @@ class InstanceConfig(BaseModel):
         arbitrary_types_allowed=True,
         frozen=True,
     )
+    close_admin_client: Optional[bool] = None
     consumer_groups: Optional[MappingProxyType[str, Any]] = None
     consumer_groups_regex: Optional[MappingProxyType[str, Any]] = None
+    consumer_queued_max_messages_kbytes: Optional[int] = None
     disable_generic_tags: Optional[bool] = None
     empty_default_hostname: Optional[bool] = None
     kafka_client_api_version: Optional[str] = None
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example b/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example
index 78f0eb61c8a6a..5a6214ca01acd 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example
+++ b/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example
@@ -104,6 +104,22 @@ instances:
     #
     # monitor_unlisted_consumer_groups: false

+    ## @param consumer_queued_max_messages_kbytes - integer - optional - default: 1024
+    ## The consumer aggressively caches messages in the background (tuned for very high throughput).
+    ## To reduce memory usage, tune down `queued.max.messages.kbytes` (the maximum cache size per partition).
+    ## The integration check overrides the Kafka client default (1GB per client) down to 1MB
+    ## to optimize memory consumption and avoid a potential out-of-memory (OOM) kill.
+    #
+    # consumer_queued_max_messages_kbytes: 1024
+
+    ## @param close_admin_client - boolean - optional - default: true
+    ## Release the AdminClient at the end of execution so it can be garbage collected. Originally, the same
+    ## AdminClient was kept alive for the entire life of the check. Deallocating the AdminClient after each
+    ## check run frees any memory held by the client, although the check run is slower since a new client
+    ## must reconnect each time. Set this option to `false` to improve performance.
+    #
+    # close_admin_client: true
+
     ## @param monitor_all_broker_highwatermarks - boolean - optional - default: false
     ## Setting monitor_all_broker_highwatermarks to `true` tells the check to
     ## discover and fetch the broker highwater mark offsets for all kafka topics in
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
index 188b4900fc08c..726868771447b 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
@@ -45,6 +45,8 @@ def check(self, _):
         except Exception:
             self.log.exception("There was a problem collecting the highwater mark offsets.")
             # Unlike consumer offsets, fail immediately because we can't calculate consumer lag w/o highwater_offsets
+            if self.config._close_admin_client:
+                self.client.close_admin_client()
             raise

         total_contexts = len(consumer_offsets) + len(highwater_offsets)
@@ -67,6 +69,8 @@ def check(self, _):
             self.report_consumer_offsets_and_lag(
                 consumer_offsets, highwater_offsets, self._context_limit - len(highwater_offsets)
             )
+        if self.config._close_admin_client:
+            self.client.close_admin_client()

     def report_highwater_offsets(self, highwater_offsets, contexts_limit):
         """Report the broker highwater offsets."""
diff --git a/requirements-agent-release.txt b/requirements-agent-release.txt
index 57608326f0431..792159e2a8126 100644
--- a/requirements-agent-release.txt
+++ b/requirements-agent-release.txt
@@ -79,7 +79,7 @@ datadog-impala==2.0.0
 datadog-istio==5.1.0
 datadog-jboss-wildfly==2.1.1
 datadog-journald==1.1.1
-datadog-kafka-consumer==4.1.2
+datadog-kafka-consumer==4.1.3
 datadog-kafka==2.14.1
 datadog-kong==3.0.0
 datadog-kube-apiserver-metrics==4.0.0
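
Taken together, the `client.py` changes memoize partition metadata for the duration of a check run (`topic_partition_cache`) and let the check drop its `AdminClient` afterwards (`close_admin_client`), which `check()` now invokes on both the happy path and the highwater-failure path, making it effectively a `finally`. Below is a minimal standalone sketch of that pattern against confluent-kafka's public API; the class name `DemoKafkaClient`, the `localhost:9092` broker, and the 5-second timeout are illustrative assumptions, not part of the patch.

```python
from confluent_kafka import KafkaException
from confluent_kafka.admin import AdminClient


class DemoKafkaClient:
    """Illustrative sketch of the cache-and-release pattern from client.py."""

    def __init__(self, bootstrap_servers="localhost:9092", request_timeout=5):
        self._bootstrap_servers = bootstrap_servers
        self._request_timeout = request_timeout
        self._kafka_client = None
        self.topic_partition_cache = {}  # topic -> list of partition ids

    @property
    def kafka_client(self):
        # Lazily (re)create the AdminClient, mirroring the property in
        # client.py, so the client can be dropped and rebuilt between runs.
        if self._kafka_client is None:
            self._kafka_client = AdminClient({"bootstrap.servers": self._bootstrap_servers})
        return self._kafka_client

    def get_partitions_for_topic(self, topic):
        # Serve repeated lookups from the per-run cache to skip redundant
        # metadata round trips to the brokers.
        if partitions := self.topic_partition_cache.get(topic):
            return partitions
        try:
            cluster_metadata = self.kafka_client.list_topics(topic, timeout=self._request_timeout)
        except KafkaException:
            return None
        partitions = list(cluster_metadata.topics[topic].partitions.keys())
        self.topic_partition_cache[topic] = partitions
        return partitions

    def close_admin_client(self):
        # Drop the only reference so the AdminClient (and its buffers)
        # becomes eligible for garbage collection between check runs.
        self._kafka_client = None
```

The trade-off is the one stated in the option description: releasing the client reclaims memory between runs at the cost of reconnecting on the next run, which is why `close_admin_client: false` is left as the performance escape hatch.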
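The other half of the memory fix is the `queued.max.messages.kbytes` cap passed to each consumer in `__create_consumer`. Here is a minimal sketch of an equivalent consumer configuration; the broker address and group id are placeholders, not values from the patch:

```python
from confluent_kafka import Consumer

# Consumer settings mirroring __create_consumer in client.py.
consumer_config = {
    "bootstrap.servers": "localhost:9092",  # placeholder broker
    "group.id": "example-group",            # placeholder consumer group
    "enable.auto.commit": False,  # never commit offsets back to the broker on close
    # Cap librdkafka's per-partition pre-fetch queue at 1 MB (the integration's
    # new default) instead of the much larger per-client default noted in the
    # option description above.
    "queued.max.messages.kbytes": 1024,
}
consumer = Consumer(consumer_config)
```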