From c050dff23445f2ffb8ef035a7c5fbe438fbc6c1a Mon Sep 17 00:00:00 2001 From: slicklash Date: Thu, 31 Aug 2023 14:56:48 +0300 Subject: [PATCH] BigDataSampler: use get_yarn_node_info for YARN detection --- granulate_utils/metrics/sampler.py | 140 +++++++++++------------ granulate_utils/metrics/yarn/__init__.py | 7 +- granulate_utils/metrics/yarn/utils.py | 23 ++-- 3 files changed, 82 insertions(+), 88 deletions(-) diff --git a/granulate_utils/metrics/sampler.py b/granulate_utils/metrics/sampler.py index 1cf9afda..a4eb6989 100644 --- a/granulate_utils/metrics/sampler.py +++ b/granulate_utils/metrics/sampler.py @@ -9,8 +9,7 @@ import os from abc import ABC, abstractmethod from datetime import datetime, timezone -from typing import List, Optional, Tuple -from xml.etree import ElementTree as ET +from typing import Dict, List, Optional, Tuple import psutil @@ -20,7 +19,8 @@ from granulate_utils.metrics import Collector, MetricsSnapshot, Sample from granulate_utils.metrics.mode import SPARK_MESOS_MODE, SPARK_STANDALONE_MODE, SPARK_YARN_MODE from granulate_utils.metrics.spark import SparkApplicationMetricsCollector -from granulate_utils.metrics.yarn import YarnCollector +from granulate_utils.metrics.yarn import YarnCollector, YarnNodeInfo, get_yarn_node_info +from granulate_utils.metrics.yarn.utils import parse_config_xml FIND_CLUSTER_TIMEOUT_SECS = 10 * 60 @@ -67,6 +67,7 @@ def __init__( self._collectors: List[Collector] = [] self._master_address: Optional[str] = None self._cluster_mode: Optional[str] = None + self._yarn_node_info: Optional[YarnNodeInfo] = None assert (cluster_mode is None) == ( master_address is None @@ -87,32 +88,18 @@ def _get_yarn_config_path(self, process: psutil.Process) -> str: self._logger.info("Could not find HADOOP_CONF_DIR variable, using default path", hadoop_conf_dir=path) return os.path.join(path, "yarn-site.xml") - def _get_yarn_config(self, process: psutil.Process) -> Optional[ET.Element]: + def _get_yarn_config(self, process: psutil.Process) -> Optional[Dict[str, str]]: config_path = self._get_yarn_config_path(process) self._logger.debug("Trying to open yarn config file for reading", config_path=config_path) try: # resolve config path against process' filesystem root process_relative_config_path = resolve_host_path(process, self._get_yarn_config_path(process)) - with open(process_relative_config_path, "rb") as conf_file: - config_xml_string = conf_file.read() - return ET.fromstring(config_xml_string) + with open(process_relative_config_path, "r") as conf_file: + return parse_config_xml(conf_file.read()) except FileNotFoundError: return None - def _get_yarn_config_property( - self, process: psutil.Process, requested_property: str, default: Optional[str] = None - ) -> Optional[str]: - config = self._get_yarn_config(process) - if config is not None: - for config_property in config.iter("property"): - name_property = config_property.find("name") - if name_property is not None and name_property.text == requested_property: - value_property = config_property.find("value") - if value_property is not None: - return value_property.text - return default - def _guess_standalone_master_webapp_address(self, process: psutil.Process) -> str: """ Selects the master address for a standalone cluster. @@ -131,24 +118,6 @@ def _get_master_process_arg_value(self, process: psutil.Process, arg_name: str) self._logger.exception("Could not find value for argument", arg_name=arg_name) return None - def _guess_yarn_resource_manager_webapp_address(self, resource_manager_process: psutil.Process) -> str: - config = self._get_yarn_config(resource_manager_process) - - if config is not None: - for config_property in config.iter("property"): - name_property = config_property.find("name") - if ( - name_property is not None - and name_property.text is not None - and name_property.text.startswith("yarn.resourcemanager.webapp.address") - ): - value_property = config_property.find("value") - if value_property is not None and value_property.text is not None: - return value_property.text - - host_name = self._get_yarn_host_name(resource_manager_process) - return host_name + ":8088" - def _guess_mesos_master_webapp_address(self, process: psutil.Process) -> str: """ Selects the master address for a mesos-master running on this node. Uses master_address if given, or defaults @@ -156,22 +125,7 @@ def _guess_mesos_master_webapp_address(self, process: psutil.Process) -> str: """ return self._hostname + ":5050" - def _get_yarn_host_name(self, resource_manager_process: psutil.Process) -> str: - """ - Selects the master adderss for a ResourceManager running on this node - this parses the YARN config to - get the hostname, and if not found, defaults to my hostname. - """ - hostname = self._get_yarn_config_property(resource_manager_process, "yarn.resourcemanager.hostname") - if hostname is not None: - self._logger.debug( - "Selected hostname from yarn.resourcemanager.hostname config", resourcemanager_hostname=hostname - ) - else: - hostname = self._hostname - self._logger.debug("Selected hostname from my hostname", resourcemanager_hostname=hostname) - return hostname - - def _is_yarn_master_collector(self, resource_manager_process: psutil.Process) -> bool: + def _is_yarn_master_collector(self) -> bool: """ yarn lists the addresses of the other masters in order communicate with other masters, so we can choose one of them (like rm1) and run the @@ -187,27 +141,31 @@ def _is_yarn_master_collector(self, resource_manager_process: psutil.Process) -> 'rm1 = hn0-nrt-hb.3e3rqto3nr5evmsjbqz0pkrj4g.tx.internal.cloudapp.net:8050' where the hostname is 'hn0-nrt-hb.3e3rqto3nr5evmsjbqz0pkrj4g' """ - rm1_address = self._get_yarn_config_property(resource_manager_process, "yarn.resourcemanager.address.rm1", None) - host_name = self._get_yarn_host_name(resource_manager_process) + if self._yarn_node_info is None or self._yarn_node_info.resource_manager_index is None: + return False - if rm1_address is None: + rm_addresses = self._yarn_node_info.resource_manager_webapp_addresses + rm1_address = rm_addresses[0] + + if len(rm_addresses) == 1: self._logger.info( "yarn.resourcemanager.address.rm1 is not defined in config, so it's a single master deployment," " enabling Spark collector" ) return True - elif rm1_address.startswith(host_name): + + if self._yarn_node_info.is_first_resource_manager: self._logger.info( f"This is the collector master, because rm1: {rm1_address!r}" - f" starts with my host name: {host_name!r}, enabling Spark collector" + f" starts with my host name: {self._hostname!r}, enabling Spark collector" ) return True - else: - self._logger.info( - f"This is not the collector master, because rm1: {rm1_address!r}" - f" does not start with my host name: {host_name!r}, skipping Spark collector on this YARN master" - ) - return False + + self._logger.info( + f"This is not the collector master, because rm1: {rm1_address!r}" + f" does not start with my host name: {self._hostname!r}, skipping Spark collector on this YARN master" + ) + return False def _get_spark_manager_process(self) -> Optional[psutil.Process]: def is_master_process(process: psutil.Process) -> bool: @@ -239,10 +197,15 @@ def _guess_cluster_mode(self) -> Optional[Tuple[str, str]]: return None if "org.apache.hadoop.yarn.server.resourcemanager.ResourceManager" in spark_master_process.cmdline(): - if not self._is_yarn_master_collector(spark_master_process): + if (yarn_config := self._get_yarn_config(spark_master_process)) is None: + return None + self._yarn_node_info = get_yarn_node_info(logger=self._logger, yarn_config=yarn_config) + if self._yarn_node_info is None: + return None + if not self._is_yarn_master_collector(): return None spark_cluster_mode = SPARK_YARN_MODE - webapp_url = self._guess_yarn_resource_manager_webapp_address(spark_master_process) + webapp_url = self._yarn_node_info.resource_manager_webapp_addresses[0] elif "org.apache.spark.deploy.master.Master" in spark_master_process.cmdline(): spark_cluster_mode = SPARK_STANDALONE_MODE webapp_url = self._guess_standalone_master_webapp_address(spark_master_process) @@ -274,6 +237,31 @@ def _init_collectors(self): SparkApplicationMetricsCollector(self._cluster_mode, self._master_address, self._logger) ) + def _validate_manual_configuration(self) -> bool: + """ + Validates the manual configuration of master_address and cluster_mode. + """ + if self._cluster_mode == SPARK_YARN_MODE and self._yarn_node_info is None: + self._yarn_node_info = get_yarn_node_info(logger=self._logger) + if self._yarn_node_info is None: + self._logger.debug("YARN not detectedr") + return False + if not self._yarn_node_info.is_resource_manager: + self._logger.debug("This is not a YARN ResourceManager node") + return False + if not self._yarn_node_info.is_first_resource_manager: + self._logger.debug("This is not the first YARN ResourceManager node") + return False + rm1_address = self._yarn_node_info.resource_manager_webapp_addresses[0] + if self._master_address != rm1_address: + self._logger.debug( + f"YARN ResourceManager address {rm1_address!r} does not match" + f" manually configured address {self._master_address!r}" + ) + return False + + return True + def discover(self) -> bool: """ Discovers: @@ -287,14 +275,16 @@ def discover(self) -> bool: have_conf = False if self._master_address is not None and self._cluster_mode is not None: - # No need to guess, manually configured - self._logger.debug( - "No need to guess cluster mode and master address, manually configured", - cluster_mode=self._cluster_mode, - master_address=self._master_address, - ) - have_conf = True - + if self._validate_manual_configuration(): + # No need to guess, manually configured + self._logger.debug( + "No need to guess cluster mode and master address, manually configured", + cluster_mode=self._cluster_mode, + master_address=self._master_address, + ) + have_conf = True + else: + self._logger.error("Manually configured cluster mode and master address are invalid, skipping sampler") else: cluster_conf = self._guess_cluster_mode() if cluster_conf is not None: diff --git a/granulate_utils/metrics/yarn/__init__.py b/granulate_utils/metrics/yarn/__init__.py index 6d09def2..722cc594 100644 --- a/granulate_utils/metrics/yarn/__init__.py +++ b/granulate_utils/metrics/yarn/__init__.py @@ -1,8 +1,5 @@ from granulate_utils.metrics.yarn.metrics_collector import YarnCollector from granulate_utils.metrics.yarn.resource_manager import YARN_RM_CLASSNAME, ResourceManagerAPI +from granulate_utils.metrics.yarn.utils import YarnNodeInfo, get_yarn_node_info -__all__ = [ - "YARN_RM_CLASSNAME", - "ResourceManagerAPI", - "YarnCollector", -] +__all__ = ["YARN_RM_CLASSNAME", "ResourceManagerAPI", "YarnCollector", "YarnNodeInfo", "get_yarn_node_info"] diff --git a/granulate_utils/metrics/yarn/utils.py b/granulate_utils/metrics/yarn/utils.py index b8cb2201..9b7a3286 100644 --- a/granulate_utils/metrics/yarn/utils.py +++ b/granulate_utils/metrics/yarn/utils.py @@ -152,21 +152,28 @@ def read_config_file( Read YARN config from file """ try: - result = {} with open(xml_file, "r") as f: logger.debug(f"reading {xml_file}") - root = ET.fromstring(f.read()) - for p in root.findall("./property"): - name = p.find("name") - value = p.find("value") - if name is not None and value is not None and (key := name.text): - result[key] = value.text or "" - return result + return parse_config_xml(f.read()) except FileNotFoundError: logger.error(f"file not found: {xml_file}") return None +def parse_config_xml(xml: str) -> Dict[str, str]: + """ + Parse YARN config from XML string + """ + root = ET.fromstring(xml) + result = {} + for p in root.findall("./property"): + name = p.find("name") + value = p.find("value") + if name is not None and value is not None and (key := name.text): + result[key] = value.text or "" + return result + + def get_all_rm_addresses( yarn_config: Dict[str, Any], *, logger: Union[logging.Logger, logging.LoggerAdapter] ) -> List[str]: