Add ability to cache offsets and close admin client
yzhan289 committed Oct 4, 2023
1 parent e74a57b commit 1568aed
Showing 2 changed files with 14 additions and 7 deletions.
20 changes: 13 additions & 7 deletions kafka_consumer/datadog_checks/kafka_consumer/client.py
@@ -15,6 +15,7 @@ def __init__(self, config, tls_context, log) -> None:
        self.log = log
        self._kafka_client = None
        self._tls_context = tls_context
        self.topics_cache = {}

    @property
    def kafka_client(self):
@@ -35,6 +36,7 @@ def __create_consumer(self, consumer_group):
            "bootstrap.servers": self.config._kafka_connect_str,
            "group.id": consumer_group,
            "enable.auto.commit": False,  # To avoid offset commit to broker during close
            "queued.max.messages.kbytes": 1024,  # https://github.com/confluentinc/confluent-kafka-python/issues/759
        }
        config.update(self.__get_authentication_config())
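
The two settings in this hunk can be exercised outside the check. A minimal standalone sketch, assuming confluent-kafka; the broker address and group id are placeholders, not values from the check:

from confluent_kafka import Consumer

# Illustrative only: "localhost:9092" and "example-group" are placeholders.
consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "example-group",
        # Never auto-commit: the check only reads offsets, so closing the
        # consumer must not move the group's committed position.
        "enable.auto.commit": False,
        # Bound per-partition prefetch memory; workaround discussed in
        # https://github.com/confluentinc/confluent-kafka-python/issues/759
        "queued.max.messages.kbytes": 1024,
    }
)
consumer.close()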

@@ -152,6 +154,9 @@ def get_highwater_offsets(self, consumer_offsets):
        return highwater_offsets

    def get_partitions_for_topic(self, topic):
        if self.topics_cache.get(topic):
            return self.topics_cache.get(topic)

        try:
            cluster_metadata = self.kafka_client.list_topics(topic, timeout=self.config._request_timeout)
        except KafkaException as e:
@@ -160,6 +165,7 @@ def get_partitions_for_topic(self, topic):
        else:
            topic_metadata = cluster_metadata.topics[topic]
            partitions = list(topic_metadata.partitions.keys())
            self.topics_cache[topic] = partitions
            return partitions

    def request_metadata_update(self):
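
The two hunks above add per-topic memoization around list_topics. A minimal sketch of the same pattern in isolation, assuming confluent-kafka's AdminClient; the class name PartitionCache and its constructor arguments are invented for illustration:

from confluent_kafka import KafkaException
from confluent_kafka.admin import AdminClient


class PartitionCache:
    """Illustrative sketch of the memoization added in this commit."""

    def __init__(self, bootstrap_servers, timeout=5):
        self._admin = AdminClient({"bootstrap.servers": bootstrap_servers})
        self._timeout = timeout
        self.topics_cache = {}

    def get_partitions_for_topic(self, topic):
        # Repeat lookups are served from memory; each topic costs at most
        # one metadata round trip per cache lifetime.
        if topic in self.topics_cache:
            return self.topics_cache[topic]
        try:
            cluster_metadata = self._admin.list_topics(topic, timeout=self._timeout)
        except KafkaException:
            return None
        partitions = list(cluster_metadata.topics[topic].partitions.keys())
        self.topics_cache[topic] = partitions
        return partitions

The trade-off is staleness: partitions added to a topic stay invisible until the cache's owner is discarded, which is what close_admin_client (further down in this diff) arranges between check runs.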
@@ -175,7 +181,7 @@ def get_consumer_offsets(self):
        self.log.debug('Identified %s consumer groups', len(consumer_groups))

        futures = self._get_consumer_offset_futures(consumer_groups)
        self.log.debug('%s futures to be waited on', len(futures))

        for future in as_completed(futures):
            try:
@@ -217,7 +223,7 @@ def get_consumer_offsets(self):
                    if self.config._consumer_groups_compiled_regex.match(to_match):
                        consumer_offsets[(consumer_group, topic, partition)] = offset

        self.log.debug('Got %s consumer offsets', len(consumer_offsets))
        return consumer_offsets

    def _get_consumer_groups(self):
@@ -240,6 +246,9 @@ def _get_consumer_groups(self):
    def _list_consumer_group_offsets(self, cg_tp):
        return self.kafka_client.list_consumer_group_offsets([cg_tp])

    def close_admin_client(self):
        self._kafka_client = None

    def _get_consumer_offset_futures(self, consumer_groups):
        futures = []
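
close_admin_client only drops the reference; the kafka_client property (top of this diff) rebuilds the client lazily on next access. A sketch of that reset-and-rebuild pattern, with construction details reduced to an assumed bootstrap address:

from confluent_kafka.admin import AdminClient


class LazyAdminClient:
    """Illustrative sketch of lazy construction plus explicit reset."""

    def __init__(self, bootstrap_servers):
        self._bootstrap_servers = bootstrap_servers
        self._kafka_client = None

    @property
    def kafka_client(self):
        # First access after a reset builds a fresh AdminClient.
        if self._kafka_client is None:
            self._kafka_client = AdminClient({"bootstrap.servers": self._bootstrap_servers})
        return self._kafka_client

    def close_admin_client(self):
        # The next kafka_client access reconstructs the client, so stale
        # broker metadata does not outlive a check run.
        self._kafka_client = None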

@@ -267,11 +276,8 @@ def _get_consumer_offset_futures(self, consumer_groups):
                # If partitions are not defined
                else:
                    # get all the partitions for this topic
                    partitions = (
                        self.kafka_client.list_topics(topic=topic, timeout=self.config._request_timeout)
                        .topics[topic]
                        .partitions
                    )
                    partitions = self.get_partitions_for_topic(topic)

                    topic_partitions = [TopicPartition(topic, partition) for partition in partitions]

                    futures.append(
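
This last hunk replaces an inline list_topics call with the cached helper, so consumer groups that share a topic reuse one metadata request when their offset futures are built. A sketch of that flow, assuming a recent confluent-kafka whose AdminClient exposes list_consumer_group_offsets; the topic and group names are placeholders:

from confluent_kafka import ConsumerGroupTopicPartitions, TopicPartition
from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # placeholder address

topic = "example-topic"           # placeholder
consumer_group = "example-group"  # placeholder

# One metadata round trip for the topic; the patched client would serve
# repeat calls from topics_cache instead.
partitions = admin.list_topics(topic, timeout=5).topics[topic].partitions
topic_partitions = [TopicPartition(topic, p) for p in partitions]

# Returns {group_id: future}; each future resolves to the group's offsets.
futures = admin.list_consumer_group_offsets(
    [ConsumerGroupTopicPartitions(consumer_group, topic_partitions)]
)
for tp in futures[consumer_group].result().topic_partitions:
    print(tp.topic, tp.partition, tp.offset)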
1 change: 1 addition & 0 deletions
@@ -157,6 +157,7 @@ def report_consumer_offsets_and_lag(self, consumer_offsets, highwater_offsets, c
                self.log.warning(msg, consumer_group, topic, partition)
                self.client.request_metadata_update()  # force metadata update on next poll()
        self.log.debug('%s consumer offsets reported', reported_contexts)
        self.client.close_admin_client()

    def send_event(self, title, text, tags, event_type, aggregation_key, severity='info'):
        """Emit an event to the Datadog Event Stream."""
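
Putting both files together, the per-run lifecycle implied by this commit looks roughly like the following; the run_check wrapper and its argument lists are simplified assumptions, not code from the repository:

# Hypothetical orchestration sketch (not actual check code).
def run_check(client, check):
    consumer_offsets = client.get_consumer_offsets()  # warms topics_cache as it goes
    highwater_offsets = client.get_highwater_offsets(consumer_offsets)
    # Reporting now ends by calling client.close_admin_client(), so the next
    # run starts with a fresh admin client and fresh partition metadata.
    check.report_consumer_offsets_and_lag(consumer_offsets, highwater_offsets, ...)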
