Skip to content

Commit

Permalink
add version to RM API
Browse files Browse the repository at this point in the history
  • Loading branch information
slicklash committed Aug 24, 2023
1 parent 953e8f1 commit 0b3fc49
Showing 1 changed file with 11 additions and 0 deletions.
11 changes: 11 additions & 0 deletions granulate_utils/metrics/yarn.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
# Licensed under a 3-clause BSD style license (see LICENSE.bsd3).
#
import logging
from functools import cached_property
from typing import Dict, Iterable, List, Optional

from packaging.version import Version

from granulate_utils.metrics import Collector, Sample, json_request, samples_from_json
from granulate_utils.metrics.metrics import YARN_CLUSTER_METRICS, YARN_NODES_METRICS

Expand All @@ -20,6 +23,7 @@ def __init__(self, rm_address: str):
self._metrics_url = f"{rm_address}/ws/v1/cluster/metrics"
self._nodes_url = f"{rm_address}/ws/v1/cluster/nodes"
self._scheduler_url = f"{rm_address}/ws/v1/cluster/scheduler"
self._info_url = f"{rm_address}/ws/v1/cluster/info"

def apps(self, **kwargs) -> List[Dict]:
apps = json_request(self._apps_url, {}, **kwargs).get("apps") or {}
Expand All @@ -36,6 +40,13 @@ def scheduler(self, **kwargs) -> Optional[Dict]:
scheduler = json_request(self._scheduler_url, {}, **kwargs).get("scheduler") or {}
return scheduler.get("schedulerInfo")

@cached_property
def version(self) -> Version:
return Version(json_request(self._info_url, {})["clusterInfo"]["resourceManagerVersion"])

def is_version_at_least(self, version: str) -> bool:
return self.version >= Version(version)


class YarnCollector(Collector):
name = "yarn"
Expand Down

0 comments on commit 0b3fc49

Please sign in to comment.