Skip to content

Commit

Permalink
Merge branch 'master' into feature/config-feeder-autoscaling
Browse files Browse the repository at this point in the history
  • Loading branch information
slicklash committed Aug 25, 2023
2 parents c1bfc3e + 149d257 commit b9a9b4b
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
4 changes: 2 additions & 2 deletions granulate_utils/config_feeder/client/yarn/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ async def _find_yarn_home_dir(*, logger: Union[logging.Logger, logging.LoggerAda
lines = stdout.decode().split(" -D")
home_dir_key = "yarn.home.dir="
for line in lines:
if line.startswith(home_dir_key):
return line[len(home_dir_key) :].strip()
if line.startswith(home_dir_key) and (home_dir := line[len(home_dir_key) :].strip()):
return home_dir
logger.error("no YARN processes found")
return None

Expand Down
45 changes: 36 additions & 9 deletions granulate_utils/metrics/yarn.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
# Licensed under a 3-clause BSD style license (see LICENSE.bsd3).
#
import logging
from functools import cached_property
from typing import Dict, Iterable, List, Optional

from packaging.version import Version

from granulate_utils.metrics import Collector, Sample, json_request, samples_from_json
from granulate_utils.metrics.metrics import YARN_CLUSTER_METRICS, YARN_NODES_METRICS

Expand All @@ -20,18 +23,29 @@ def __init__(self, rm_address: str):
self._metrics_url = f"{rm_address}/ws/v1/cluster/metrics"
self._nodes_url = f"{rm_address}/ws/v1/cluster/nodes"
self._scheduler_url = f"{rm_address}/ws/v1/cluster/scheduler"
self._info_url = f"{rm_address}/ws/v1/cluster/info"

def apps(self, **kwargs) -> List[Dict]:
    """Return the list of YARN applications from the ResourceManager.

    The REST API returns ``{"apps": null}`` when there are no applications,
    so the top-level value is guarded with ``or {}`` before the nested
    lookup to avoid an AttributeError on ``None``.

    :param kwargs: extra arguments forwarded to ``json_request``
                   (e.g. request parameters / timeouts — see caller).
    :return: list of application dicts; empty list when none exist.
    """
    apps = json_request(self._apps_url, {}, **kwargs).get("apps") or {}
    return apps.get("app", [])

def metrics(self, **kwargs) -> Optional[Dict]:
    """Fetch cluster-wide metrics from the ResourceManager.

    :param kwargs: extra arguments forwarded to ``json_request``.
    :return: the ``clusterMetrics`` dict, or ``None`` when the key is absent.
    """
    response = json_request(self._metrics_url, {}, **kwargs)
    return response.get("clusterMetrics")

def nodes(self, **kwargs) -> List[Dict]:
    """Return the list of YARN cluster nodes from the ResourceManager.

    The REST API returns ``{"nodes": null}`` when there are no nodes,
    so the top-level value is guarded with ``or {}`` before the nested
    lookup to avoid an AttributeError on ``None``.

    :param kwargs: extra arguments forwarded to ``json_request``
                   (e.g. ``states="NEW,RUNNING"`` to filter by node state).
    :return: list of node dicts; empty list when none exist.
    """
    nodes = json_request(self._nodes_url, {}, **kwargs).get("nodes") or {}
    return nodes.get("node", [])

def scheduler(self, **kwargs) -> Optional[Dict]:
    """Return the scheduler information from the ResourceManager.

    The REST API may return ``{"scheduler": null}``, so the top-level
    value is guarded with ``or {}`` before the nested lookup to avoid
    an AttributeError on ``None``.

    :param kwargs: extra arguments forwarded to ``json_request``.
    :return: the ``schedulerInfo`` dict, or ``None`` when absent.
    """
    scheduler = json_request(self._scheduler_url, {}, **kwargs).get("scheduler") or {}
    return scheduler.get("schedulerInfo")

@cached_property
def version(self) -> Version:
    """The ResourceManager's Hadoop version, parsed once and cached.

    Reads ``resourceManagerVersion`` from the ``/ws/v1/cluster/info``
    endpoint; raises KeyError if the response lacks the expected keys.
    """
    info = json_request(self._info_url, {})
    raw_version = info["clusterInfo"]["resourceManagerVersion"]
    return Version(raw_version)

def is_version_at_least(self, version: str) -> bool:
    """Return True if the ResourceManager runs *version* or newer.

    :param version: a version string parseable by ``packaging.version.Version``,
                    e.g. ``"2.8.0"``.
    """
    required = Version(version)
    return self.version >= required


class YarnCollector(Collector):
Expand All @@ -58,16 +72,29 @@ def _cluster_metrics(self) -> Iterable[Sample]:

def _nodes_metrics(self) -> Iterable[Sample]:
    """Yield per-node YARN metric samples, labelled by node hostname.

    Queries only nodes in the 'active' states (see ``_active_node_states``).
    Any failure is logged and swallowed deliberately (best-effort collection);
    the generator simply stops yielding in that case.
    """
    try:
        for node in self.rm.nodes(states=self._active_node_states):
            # Flatten resourceUtilization into the node dict so that all
            # relevant metrics live under the same dictionary for
            # samples_from_json to pick up.
            for metric, value in node.get("resourceUtilization", {}).items():
                node[metric] = value

            labels = {"node_hostname": node["nodeHostName"]}
            yield from samples_from_json(labels, node, YARN_NODES_METRICS)
    except Exception:
        self.logger.exception("Could not gather yarn nodes metrics")

@cached_property
def _active_node_states(self) -> str:
    """
    All node states that are considered 'active', as a comma-separated string.

    Taken from isActiveState in:
    https://github.com/apache/hadoop/blob/a91933620d8755e80ad4bdf900b506dd73d26786/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java#L65
    DECOMMISSIONED is intentionally excluded because in EMR nodes stay
    DECOMMISSIONED forever and are never removed from the nodes list.
    """
    # The DECOMMISSIONING state only exists in Hadoop 2.8.0 and later.
    supports_decommissioning = self.rm.is_version_at_least("2.8.0")
    return (
        "NEW,RUNNING,UNHEALTHY,DECOMMISSIONING"
        if supports_decommissioning
        else "NEW,RUNNING,UNHEALTHY"
    )

0 comments on commit b9a9b4b

Please sign in to comment.