From 0b3fc4961fcc86f4584e8cdb781ef12f1706ec93 Mon Sep 17 00:00:00 2001 From: slicklash Date: Thu, 24 Aug 2023 14:08:51 +0300 Subject: [PATCH] add version to RM API --- granulate_utils/metrics/yarn.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/granulate_utils/metrics/yarn.py b/granulate_utils/metrics/yarn.py index b5c2ad32..4fda4ff6 100644 --- a/granulate_utils/metrics/yarn.py +++ b/granulate_utils/metrics/yarn.py @@ -6,8 +6,11 @@ # Licensed under a 3-clause BSD style license (see LICENSE.bsd3). # import logging +from functools import cached_property from typing import Dict, Iterable, List, Optional +from packaging.version import Version + from granulate_utils.metrics import Collector, Sample, json_request, samples_from_json from granulate_utils.metrics.metrics import YARN_CLUSTER_METRICS, YARN_NODES_METRICS @@ -20,6 +23,7 @@ def __init__(self, rm_address: str): self._metrics_url = f"{rm_address}/ws/v1/cluster/metrics" self._nodes_url = f"{rm_address}/ws/v1/cluster/nodes" self._scheduler_url = f"{rm_address}/ws/v1/cluster/scheduler" + self._info_url = f"{rm_address}/ws/v1/cluster/info" def apps(self, **kwargs) -> List[Dict]: apps = json_request(self._apps_url, {}, **kwargs).get("apps") or {} @@ -36,6 +40,13 @@ def scheduler(self, **kwargs) -> Optional[Dict]: scheduler = json_request(self._scheduler_url, {}, **kwargs).get("scheduler") or {} return scheduler.get("schedulerInfo") + @cached_property + def version(self) -> Version: + return Version(json_request(self._info_url, {})["clusterInfo"]["resourceManagerVersion"]) + + def is_version_at_least(self, version: str) -> bool: + return self.version >= Version(version) + class YarnCollector(Collector): name = "yarn"