Skip to content

Commit

Permalink
BigDataSampler: use get_yarn_node_info for YARN detection
Browse files Browse the repository at this point in the history
  • Loading branch information
slicklash committed Aug 31, 2023
1 parent 7fc0b30 commit c050dff
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 88 deletions.
140 changes: 65 additions & 75 deletions granulate_utils/metrics/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
import os
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import List, Optional, Tuple
from xml.etree import ElementTree as ET
from typing import Dict, List, Optional, Tuple

import psutil

Expand All @@ -20,7 +19,8 @@
from granulate_utils.metrics import Collector, MetricsSnapshot, Sample
from granulate_utils.metrics.mode import SPARK_MESOS_MODE, SPARK_STANDALONE_MODE, SPARK_YARN_MODE
from granulate_utils.metrics.spark import SparkApplicationMetricsCollector
from granulate_utils.metrics.yarn import YarnCollector
from granulate_utils.metrics.yarn import YarnCollector, YarnNodeInfo, get_yarn_node_info
from granulate_utils.metrics.yarn.utils import parse_config_xml

FIND_CLUSTER_TIMEOUT_SECS = 10 * 60

Expand Down Expand Up @@ -67,6 +67,7 @@ def __init__(
self._collectors: List[Collector] = []
self._master_address: Optional[str] = None
self._cluster_mode: Optional[str] = None
self._yarn_node_info: Optional[YarnNodeInfo] = None

assert (cluster_mode is None) == (
master_address is None
Expand All @@ -87,32 +88,18 @@ def _get_yarn_config_path(self, process: psutil.Process) -> str:
self._logger.info("Could not find HADOOP_CONF_DIR variable, using default path", hadoop_conf_dir=path)
return os.path.join(path, "yarn-site.xml")

def _get_yarn_config(self, process: psutil.Process) -> Optional[ET.Element]:
def _get_yarn_config(self, process: psutil.Process) -> Optional[Dict[str, str]]:
config_path = self._get_yarn_config_path(process)

self._logger.debug("Trying to open yarn config file for reading", config_path=config_path)
try:
# resolve config path against process' filesystem root
process_relative_config_path = resolve_host_path(process, self._get_yarn_config_path(process))
with open(process_relative_config_path, "rb") as conf_file:
config_xml_string = conf_file.read()
return ET.fromstring(config_xml_string)
with open(process_relative_config_path, "r") as conf_file:
return parse_config_xml(conf_file.read())
except FileNotFoundError:
return None

def _get_yarn_config_property(
    self, process: psutil.Process, requested_property: str, default: Optional[str] = None
) -> Optional[str]:
    """
    Looks up a single property in the YARN config of the given process.

    Walks every <property> element of the parsed config and returns the text of the <value> child of the
    first one whose <name> text equals requested_property. A matching property that has no <value> child
    is skipped and the search continues.

    :param process: process whose YARN config is loaded via _get_yarn_config
    :param requested_property: exact property name to search for
    :param default: returned when the config is unavailable or no matching property with a value exists
    :return: the value text (None when the <value> element is empty), or default
    """
    xml_root = self._get_yarn_config(process)
    if xml_root is None:
        return default

    for prop in xml_root.iter("property"):
        name_node = prop.find("name")
        if name_node is None or name_node.text != requested_property:
            continue
        value_node = prop.find("value")
        if value_node is None:
            # name matched but there is nothing to return here, keep looking
            continue
        return value_node.text

    return default

def _guess_standalone_master_webapp_address(self, process: psutil.Process) -> str:
"""
Selects the master address for a standalone cluster.
Expand All @@ -131,47 +118,14 @@ def _get_master_process_arg_value(self, process: psutil.Process, arg_name: str)
self._logger.exception("Could not find value for argument", arg_name=arg_name)
return None

def _guess_yarn_resource_manager_webapp_address(self, resource_manager_process: psutil.Process) -> str:
    """
    Picks the webapp address of the ResourceManager.

    Returns the value of the first config property whose name starts with
    "yarn.resourcemanager.webapp.address" and whose value is non-empty. When the config is unavailable
    or has no such property, falls back to the YARN host name with port 8088 appended.

    :param resource_manager_process: the ResourceManager process whose YARN config is inspected
    :return: webapp address as found in the config, or "<host name>:8088"
    """
    prefix = "yarn.resourcemanager.webapp.address"
    xml_root = self._get_yarn_config(resource_manager_process)
    properties = xml_root.iter("property") if xml_root is not None else ()

    for prop in properties:
        name_node = prop.find("name")
        if name_node is None or name_node.text is None:
            continue
        if not name_node.text.startswith(prefix):
            continue
        value_node = prop.find("value")
        if value_node is not None and value_node.text is not None:
            return value_node.text

    # NOTE(review): 8088 is presumably the default ResourceManager web UI port - confirm
    return self._get_yarn_host_name(resource_manager_process) + ":8088"

def _guess_mesos_master_webapp_address(self, process: psutil.Process) -> str:
    """
    Selects the master address for a mesos-master running on this node.

    Always returns this sampler's own hostname with port 5050 appended; a manually configured
    master address is not consulted here.

    :param process: the mesos-master process (currently unused)
    :return: "<my hostname>:5050"
    """
    # NOTE(review): 5050 is presumably the default Mesos master web port - confirm
    return self._hostname + ":5050"

def _get_yarn_host_name(self, resource_manager_process: psutil.Process) -> str:
    """
    Selects the master address for a ResourceManager running on this node.

    Reads "yarn.resourcemanager.hostname" from the YARN config of the given process; when the lookup
    yields None, falls back to my hostname. The chosen source is logged at debug level.

    :param resource_manager_process: the ResourceManager process whose YARN config is inspected
    :return: the configured ResourceManager hostname, or my hostname
    """
    configured = self._get_yarn_config_property(resource_manager_process, "yarn.resourcemanager.hostname")

    if configured is None:
        fallback = self._hostname
        self._logger.debug("Selected hostname from my hostname", resourcemanager_hostname=fallback)
        return fallback

    self._logger.debug(
        "Selected hostname from yarn.resourcemanager.hostname config", resourcemanager_hostname=configured
    )
    return configured

def _is_yarn_master_collector(self, resource_manager_process: psutil.Process) -> bool:
def _is_yarn_master_collector(self) -> bool:
"""
yarn lists the addresses of the other masters in order communicate with
other masters, so we can choose one of them (like rm1) and run the
Expand All @@ -187,27 +141,31 @@ def _is_yarn_master_collector(self, resource_manager_process: psutil.Process) ->
'rm1 = hn0-nrt-hb.3e3rqto3nr5evmsjbqz0pkrj4g.tx.internal.cloudapp.net:8050'
where the hostname is 'hn0-nrt-hb.3e3rqto3nr5evmsjbqz0pkrj4g'
"""
rm1_address = self._get_yarn_config_property(resource_manager_process, "yarn.resourcemanager.address.rm1", None)
host_name = self._get_yarn_host_name(resource_manager_process)
if self._yarn_node_info is None or self._yarn_node_info.resource_manager_index is None:
return False

if rm1_address is None:
rm_addresses = self._yarn_node_info.resource_manager_webapp_addresses
rm1_address = rm_addresses[0]

if len(rm_addresses) == 1:
self._logger.info(
"yarn.resourcemanager.address.rm1 is not defined in config, so it's a single master deployment,"
" enabling Spark collector"
)
return True
elif rm1_address.startswith(host_name):

if self._yarn_node_info.is_first_resource_manager:
self._logger.info(
f"This is the collector master, because rm1: {rm1_address!r}"
f" starts with my host name: {host_name!r}, enabling Spark collector"
f" starts with my host name: {self._hostname!r}, enabling Spark collector"
)
return True
else:
self._logger.info(
f"This is not the collector master, because rm1: {rm1_address!r}"
f" does not start with my host name: {host_name!r}, skipping Spark collector on this YARN master"
)
return False

self._logger.info(
f"This is not the collector master, because rm1: {rm1_address!r}"
f" does not start with my host name: {self._hostname!r}, skipping Spark collector on this YARN master"
)
return False

def _get_spark_manager_process(self) -> Optional[psutil.Process]:
def is_master_process(process: psutil.Process) -> bool:
Expand Down Expand Up @@ -239,10 +197,15 @@ def _guess_cluster_mode(self) -> Optional[Tuple[str, str]]:
return None

if "org.apache.hadoop.yarn.server.resourcemanager.ResourceManager" in spark_master_process.cmdline():
if not self._is_yarn_master_collector(spark_master_process):
if (yarn_config := self._get_yarn_config(spark_master_process)) is None:
return None
self._yarn_node_info = get_yarn_node_info(logger=self._logger, yarn_config=yarn_config)
if self._yarn_node_info is None:
return None
if not self._is_yarn_master_collector():
return None
spark_cluster_mode = SPARK_YARN_MODE
webapp_url = self._guess_yarn_resource_manager_webapp_address(spark_master_process)
webapp_url = self._yarn_node_info.resource_manager_webapp_addresses[0]
elif "org.apache.spark.deploy.master.Master" in spark_master_process.cmdline():
spark_cluster_mode = SPARK_STANDALONE_MODE
webapp_url = self._guess_standalone_master_webapp_address(spark_master_process)
Expand Down Expand Up @@ -274,6 +237,31 @@ def _init_collectors(self):
SparkApplicationMetricsCollector(self._cluster_mode, self._master_address, self._logger)
)

def _validate_manual_configuration(self) -> bool:
    """
    Validates the manual configuration of master_address and cluster_mode.

    Only YARN mode is checked, and only while no YARN node info has been stored on this sampler yet.
    In that case the node info is fetched with get_yarn_node_info (and kept in self._yarn_node_info),
    and the configuration is accepted only if this node is a ResourceManager, it is the first
    ResourceManager, and the manually configured master address equals the first ResourceManager
    webapp address. Each rejection reason is logged at debug level.

    :return: True if the manual configuration is usable on this node, False otherwise
    """
    if self._cluster_mode != SPARK_YARN_MODE or self._yarn_node_info is not None:
        # nothing to verify: either not YARN, or node info was already resolved
        return True

    self._yarn_node_info = get_yarn_node_info(logger=self._logger)
    if self._yarn_node_info is None:
        self._logger.debug("YARN not detected")
        return False
    if not self._yarn_node_info.is_resource_manager:
        self._logger.debug("This is not a YARN ResourceManager node")
        return False
    if not self._yarn_node_info.is_first_resource_manager:
        self._logger.debug("This is not the first YARN ResourceManager node")
        return False

    rm1_address = self._yarn_node_info.resource_manager_webapp_addresses[0]
    if self._master_address != rm1_address:
        self._logger.debug(
            f"YARN ResourceManager address {rm1_address!r} does not match"
            f" manually configured address {self._master_address!r}"
        )
        return False

    return True

def discover(self) -> bool:
"""
Discovers:
Expand All @@ -287,14 +275,16 @@ def discover(self) -> bool:
have_conf = False

if self._master_address is not None and self._cluster_mode is not None:
# No need to guess, manually configured
self._logger.debug(
"No need to guess cluster mode and master address, manually configured",
cluster_mode=self._cluster_mode,
master_address=self._master_address,
)
have_conf = True

if self._validate_manual_configuration():
# No need to guess, manually configured
self._logger.debug(
"No need to guess cluster mode and master address, manually configured",
cluster_mode=self._cluster_mode,
master_address=self._master_address,
)
have_conf = True
else:
self._logger.error("Manually configured cluster mode and master address are invalid, skipping sampler")
else:
cluster_conf = self._guess_cluster_mode()
if cluster_conf is not None:
Expand Down
7 changes: 2 additions & 5 deletions granulate_utils/metrics/yarn/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
from granulate_utils.metrics.yarn.metrics_collector import YarnCollector
from granulate_utils.metrics.yarn.resource_manager import YARN_RM_CLASSNAME, ResourceManagerAPI
from granulate_utils.metrics.yarn.utils import YarnNodeInfo, get_yarn_node_info

__all__ = [
"YARN_RM_CLASSNAME",
"ResourceManagerAPI",
"YarnCollector",
]
__all__ = ["YARN_RM_CLASSNAME", "ResourceManagerAPI", "YarnCollector", "YarnNodeInfo", "get_yarn_node_info"]
23 changes: 15 additions & 8 deletions granulate_utils/metrics/yarn/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,21 +152,28 @@ def read_config_file(
Read YARN config from file
"""
try:
result = {}
with open(xml_file, "r") as f:
logger.debug(f"reading {xml_file}")
root = ET.fromstring(f.read())
for p in root.findall("./property"):
name = p.find("name")
value = p.find("value")
if name is not None and value is not None and (key := name.text):
result[key] = value.text or ""
return result
return parse_config_xml(f.read())
except FileNotFoundError:
logger.error(f"file not found: {xml_file}")
return None


def parse_config_xml(xml: str) -> Dict[str, str]:
    """
    Parse YARN config from XML string

    Collects every <property> element directly under the root that has both a <name> and a <value>
    child. Property names are stripped of surrounding whitespace, since pretty-printed files may pad
    them and a padded key would never match a lookup. Values are kept verbatim, with an empty <value>
    mapped to "". Properties whose name is missing or blank are ignored.

    :param xml: contents of a Hadoop-style XML configuration file
    :return: mapping of property name to property value
    :raises xml.etree.ElementTree.ParseError: if the input is not well-formed XML
    """
    root = ET.fromstring(xml)
    result: Dict[str, str] = {}
    for prop in root.findall("./property"):
        name = prop.find("name")
        value = prop.find("value")
        if name is None or value is None:
            continue
        # an empty <name/> has text None; a blank name cannot be looked up, so drop it
        if key := (name.text or "").strip():
            result[key] = value.text or ""
    return result


def get_all_rm_addresses(
yarn_config: Dict[str, Any], *, logger: Union[logging.Logger, logging.LoggerAdapter]
) -> List[str]:
Expand Down

0 comments on commit c050dff

Please sign in to comment.