diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py
index fb6fd55eb..d221fa661 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py
@@ -183,7 +183,7 @@ def _init_nodes(self):
                 'region': self.region,
                 'instance_type': self.props.get_value('node_type_id')
             }
-            executor = DatabricksNode.create_worker_node().set_fields_from_dict(executor_props)
+            executor = DatabricksNode.create_executor_node().set_fields_from_dict(executor_props)
             executor.fetch_and_set_hw_info(self.cli)
             executor_nodes.append(executor)
         primary_props = {
@@ -193,7 +193,7 @@ def _init_nodes(self):
             'region': self.region,
             'instance_type': self.props.get_value('driver_node_type_id')
         }
-        primary_node = DatabricksNode.create_master_node().set_fields_from_dict(primary_props)
+        primary_node = DatabricksNode.create_primary_node().set_fields_from_dict(primary_props)
         primary_node.fetch_and_set_hw_info(self.cli)
         self.nodes = {
             SparkNodeType.WORKER: executor_nodes,
@@ -239,7 +239,7 @@ def _build_migrated_cluster(self, orig_cluster):
                 'region': anode.region,
                 'props': anode.props,
             }
-            new_node = DatabricksNode.create_worker_node().set_fields_from_dict(executor_props)
+            new_node = DatabricksNode.create_executor_node().set_fields_from_dict(executor_props)
             new_executor_nodes.append(new_node)
         self.nodes = {
             SparkNodeType.WORKER: new_executor_nodes,
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py
index 0589f0241..d2261e175 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py
@@ -264,7 +264,7 @@ def _init_nodes(self):
                 'region': self.region,
                 'instance_type': self.props.get_value('node_type_id')
             }
-            executor = DatabricksAzureNode.create_worker_node().set_fields_from_dict(executor_props)
+            executor = DatabricksAzureNode.create_executor_node().set_fields_from_dict(executor_props)
             executor.fetch_and_set_hw_info(self.cli)
             executor_nodes.append(executor)
         driver_props = {
@@ -274,7 +274,7 @@ def _init_nodes(self):
             'region': self.region,
             'instance_type': self.props.get_value('driver_node_type_id')
         }
-        driver_node = DatabricksAzureNode.create_master_node().set_fields_from_dict(driver_props)
+        driver_node = DatabricksAzureNode.create_primary_node().set_fields_from_dict(driver_props)
         driver_node.fetch_and_set_hw_info(self.cli)
         self.nodes = {
             SparkNodeType.WORKER: executor_nodes,
@@ -320,7 +320,7 @@ def _build_migrated_cluster(self, orig_cluster):
                 'region': anode.region,
                 'props': anode.props,
             }
-            new_node = DatabricksAzureNode.create_worker_node().set_fields_from_dict(executor_props)
+            new_node = DatabricksAzureNode.create_executor_node().set_fields_from_dict(executor_props)
             new_executor_nodes.append(new_node)
         self.nodes = {
             SparkNodeType.WORKER: new_executor_nodes,
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
index e6f0501c2..37a9c9a36 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
@@ -399,7 +399,7 @@ def _init_nodes(self):
                 # set the node zone based on the wrapper defined zone
                 'zone': self.zone
             }
-            executor = DataprocNode.create_worker_node().set_fields_from_dict(executor_props)
+            executor = DataprocNode.create_executor_node().set_fields_from_dict(executor_props)
             # TODO for optimization, we should set HW props for 1 executor
             executor.fetch_and_set_hw_info(self.cli)
             executor_nodes.append(executor)
@@ -410,7 +410,7 @@ def _init_nodes(self):
             # set the node zone based on the wrapper defined zone
             'zone': self.zone
         }
-        primary_node = DataprocNode.create_master_node().set_fields_from_dict(primary_props)
+        primary_node = DataprocNode.create_primary_node().set_fields_from_dict(primary_props)
         primary_node.fetch_and_set_hw_info(self.cli)
         self.nodes = {
             SparkNodeType.WORKER: executor_nodes,
@@ -451,7 +451,7 @@ def _build_migrated_cluster(self, orig_cluster):
                 'name': anode.name,
                 'zone': anode.zone,
             }
-            new_node = DataprocNode.create_worker_node().set_fields_from_dict(executor_props)
+            new_node = DataprocNode.create_executor_node().set_fields_from_dict(executor_props)
             # we cannot rely on setting gpu info from the SDK because
             # dataproc does not bind machine types to GPUs
             # new_node.fetch_and_set_hw_info(self.cli)
@@ -483,8 +483,8 @@ def get_image_version(self) -> str:
         return self.props.get_value_silent('config', 'softwareConfig', 'imageVersion')
 
     def _set_render_args_create_template(self) -> dict:
-        executor_node = self.get_worker_node()
-        gpu_per_machine, gpu_device = self.get_gpu_per_worker()
+        executor_node = self.get_executor_node()
+        gpu_per_machine, gpu_device = self.get_gpu_per_executor()
         # map the gpu device to the equivalent accepted argument
         gpu_device_hash = {
             'T4': 'nvidia-tesla-t4',
@@ -495,8 +495,8 @@ def _set_render_args_create_template(self) -> dict:
             'REGION': self.region,
             'ZONE': self.zone,
             'IMAGE': self.get_image_version(),
-            'MASTER_MACHINE': self.get_master_node().instance_type,
-            'WORKERS_COUNT': self.get_workers_count(),
+            'MASTER_MACHINE': self.get_primary_node().instance_type,
+            'WORKERS_COUNT': self.get_executors_count(),
             'WORKERS_MACHINE': executor_node.instance_type,
             'LOCAL_SSD': 2,
             'GPU_DEVICE': gpu_device_hash.get(gpu_device),
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/emr.py b/user_tools/src/spark_rapids_pytools/cloud_api/emr.py
index 61e0282d2..a65ce5456 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/emr.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/emr.py
@@ -451,13 +451,13 @@ def get_image_version(self) -> str:
         return self.props.get_value('ReleaseLabel')
 
     def _set_render_args_create_template(self) -> dict:
-        worker_node = self.get_worker_node()
+        worker_node = self.get_executor_node()
         return {
             'CLUSTER_NAME': self.get_name(),
             'ZONE': self.zone,
             'IMAGE': self.get_image_version(),
-            'MASTER_MACHINE': self.get_master_node().instance_type,
-            'WORKERS_COUNT': self.get_workers_count(),
+            'MASTER_MACHINE': self.get_primary_node().instance_type,
+            'WORKERS_COUNT': self.get_executors_count(),
             'WORKERS_MACHINE': worker_node.instance_type
         }
 
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py b/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
index dc6ee532a..deadd5e3d 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
@@ -217,7 +217,7 @@ def _init_nodes(self):
                 'zone': self.zone,
                 'platform_name': self.platform.get_platform_name()
             }
-            executor = OnPremNode.create_worker_node().set_fields_from_dict(executor_props)
+            executor = OnPremNode.create_executor_node().set_fields_from_dict(executor_props)
             # TODO for optimization, we should set HW props for 1 executor
             executor.fetch_and_set_hw_info(self.cli)
             executor_nodes.append(executor)
@@ -230,7 +230,7 @@ def _init_nodes(self):
             'platform_name': self.platform.get_platform_name()
         }
-        primary_node = OnPremNode.create_master_node().set_fields_from_dict(primary_props)
+        primary_node = OnPremNode.create_primary_node().set_fields_from_dict(primary_props)
         primary_node.fetch_and_set_hw_info(self.cli)
         self.nodes = {
             SparkNodeType.WORKER: executor_nodes,
@@ -252,7 +252,7 @@ def _build_migrated_cluster(self, orig_cluster):
                 'name': anode.name,
                 'zone': anode.zone,
             }
-            new_node = OnPremNode.create_worker_node().set_fields_from_dict(executor_props)
+            new_node = OnPremNode.create_executor_node().set_fields_from_dict(executor_props)
             gpu_mc_hw: ClusterNode = supported_mc_map.get(new_instance_type)
             new_node.construct_hw_info(cli=None,
                                        gpu_info=gpu_mc_hw.gpu_info,
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py b/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
index e378b3c1c..c6d02c5d8 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
@@ -138,7 +138,7 @@ class TargetPlatform(EnumeratedType):
 
 class SparkNodeType(EnumeratedType):
     """
-    Node type from Spark perspective. We either have a master node or a worker node.
+    Node type from Spark perspective. We either have a primary node or an executor node.
     Note that the provider could have different grouping.
     For example EMR has: master, task, and core.
     Another categorization: onDemand..etc.
@@ -247,20 +247,20 @@ def find_best_cpu_conversion(self, target_list: dict):
         return best_match
 
     @classmethod
-    def create_worker_node(cls) -> Any:
+    def create_executor_node(cls) -> Any:
         return cls(SparkNodeType.WORKER)
 
     @classmethod
-    def create_master_node(cls) -> Any:
+    def create_primary_node(cls) -> Any:
         return cls(SparkNodeType.MASTER)
 
     @classmethod
     def create_node(cls, value):
         if isinstance(value, SparkNodeType):
             if value == SparkNodeType.MASTER:
-                return cls.create_master_node()
+                return cls.create_primary_node()
             if value == SparkNodeType.WORKER:
-                return cls.create_worker_node()
+                return cls.create_executor_node()
         raise RuntimeError(f'Invalid node type while creating cluster node {value}')
 
 
@@ -303,19 +303,19 @@ def get_node_instance_type(self, node_type: SparkNodeType) -> str:
         node = self.get_node(node_type)
         return node.instance_type
 
-    def get_workers_instant_types(self) -> str:
+    def get_executors_instant_types(self) -> str:
         return self.get_node_instance_type(SparkNodeType.WORKER)
 
-    def get_workers_count(self) -> int:
+    def get_executors_count(self) -> int:
         return self.get_nodes_cnt(SparkNodeType.WORKER)
 
-    def get_workers_cores_count(self) -> int:
+    def get_executors_cores_count(self) -> int:
         return self.get_node_core_count(SparkNodeType.WORKER)
 
-    def get_workers_mem_mb(self) -> int:
+    def get_executors_mem_mb(self) -> int:
         return self.get_node_mem_mb(SparkNodeType.WORKER)
 
-    def get_gpu_per_worker(self) -> (int, str):
+    def get_gpu_per_executor(self) -> (int, str):
         return self.get_gpu_per_node(SparkNodeType.WORKER)
 
 
@@ -881,7 +881,7 @@ def get_platform_name(self) -> str:
         return CloudPlatform.pretty_print(self.type_id)
 
     def get_footer_message(self) -> str:
-        return 'To support acceleration with T4 GPUs, switch the worker node instance types'
+        return 'To support acceleration with T4 GPUs, switch the executor node instance types'
 
 
 @dataclass
@@ -902,15 +902,15 @@ class ClusterBase(ClusterGetAccessor):
     logger: Logger = field(default=ToolLogging.get_and_setup_logger('rapids.tools.cluster'), init=False)
 
     @staticmethod
-    def _verify_workers_exist(has_no_workers_cb: Callable[[], bool]):
+    def _verify_executors_exist(has_no_executors_cb: Callable[[], bool]):
         """
-        Specifies how to handle cluster definitions that have no workers
-        :param has_no_workers_cb: A callback that returns True if the cluster does not have any
-               workers
+        Specifies how to handle cluster definitions that have no executors
+        :param has_no_executors_cb: A callback that returns True if the cluster does not have any
+               executors
         """
-        if has_no_workers_cb():
-            raise RuntimeError('Invalid cluster: The cluster has no worker nodes.\n\t'
-                               'It is recommended to define a with (1 master, N workers).')
+        if has_no_executors_cb():
+            raise RuntimeError('Invalid cluster: The cluster has no executor nodes.\n\t'
+                               'It is recommended to define a cluster with (1 primary, N executors).')
 
     def __post_init__(self):
         self.cli = self.platform.cli
@@ -970,8 +970,8 @@ def set_connection(self,
         pre_init_args = self._init_connection(cluster_id, props)
         self.set_fields_from_dict(pre_init_args)
         self._init_nodes()
-        # Verify that the cluster has defined workers
-        self._verify_workers_exist(lambda: not self.nodes.get(SparkNodeType.WORKER))
+        # Verify that the cluster has defined executors
+        self._verify_executors_exist(lambda: not self.nodes.get(SparkNodeType.WORKER))
         return self
 
     def is_cluster_running(self) -> bool:
@@ -993,22 +993,22 @@ def run_cmd_driver(self, ssh_cmd: str, cmd_input: str = None) -> str or None:
                           i.e., writing to a file.
         :return:
         """
-        # get the master node
-        master_node: ClusterNode = self.get_master_node()
-        return self.cli.ssh_cmd_node(master_node, ssh_cmd, cmd_input=cmd_input)
+        # get the primary node
+        primary_node: ClusterNode = self.get_primary_node()
+        return self.cli.ssh_cmd_node(primary_node, ssh_cmd, cmd_input=cmd_input)
 
-    def run_cmd_worker(self, ssh_cmd: str, cmd_input: str = None, ind: int = 0) -> str or None:
+    def run_cmd_executor(self, ssh_cmd: str, cmd_input: str = None, ind: int = 0) -> str or None:
         """
-        Execute command on the worker node
+        Execute command on the executor node
        :param ssh_cmd: the command to be executed on the remote node. Note that the quotes
                        surrounding the shell command should be included
        :param cmd_input: optional argument string used as an input to the command line.
                          i.e., writing to a file
-        :param ind: the node index. By default, the command is executed on first worker node.
+        :param ind: the node index. By default, the command is executed on first executor node.
""" - # get the worker node - worker_node: ClusterNode = self.get_worker_node(ind) - return self.cli.ssh_cmd_node(worker_node, ssh_cmd, cmd_input=cmd_input) + # get the executor node + executor_node: ClusterNode = self.get_executor_node(ind) + return self.cli.ssh_cmd_node(executor_node, ssh_cmd, cmd_input=cmd_input) def run_cmd_node(self, node: ClusterNode, ssh_cmd: str, cmd_input: str = None) -> str or None: """ @@ -1042,9 +1042,9 @@ def scp_from_node(self, node: ClusterNode, src: str, dest: str) -> str or None: def get_region(self) -> str: return self.cli.get_region() - def get_worker_hw_info(self) -> NodeHWInfo: - worker_node = self.get_worker_node() - return worker_node.hw_info + def get_executor_hw_info(self) -> NodeHWInfo: + executor_node = self.get_executor_node() + return executor_node.hw_info def _build_migrated_cluster(self, orig_cluster): """ @@ -1114,10 +1114,10 @@ def get_all_nodes(self) -> list: return nodes - def get_master_node(self) -> ClusterNode: + def get_primary_node(self) -> ClusterNode: return self.nodes.get(SparkNodeType.MASTER) - def get_worker_node(self, ind: int = 0) -> ClusterNode: + def get_executor_node(self, ind: int = 0) -> ClusterNode: return self.nodes.get(SparkNodeType.WORKER)[ind] def get_name(self) -> str: @@ -1159,22 +1159,22 @@ class ClusterReshape(ClusterGetAccessor): The caller can override the behavior by passing a callback method. The caller also can control which node type is affected by the reshap-methods. This can be done by setting the "node_types". By default, the reshaping - is limited to the worker nodes of a cluster. + is limited to the executor nodes of a cluster. """ cluster_inst: ClusterBase node_types: List[SparkNodeType] = field(default_factory=lambda: [SparkNodeType.WORKER]) - reshape_workers_mc_type: Callable[[str], str] = field(default_factory=lambda: lambda x: x) - reshape_workers_cnt: Callable[[int], int] = field(default_factory=lambda: lambda x: x) - reshape_workers_cpus: Callable[[int], int] = field(default_factory=lambda: lambda x: x) - reshape_workers_mem: Callable[[int], int] = field(default_factory=lambda: lambda x: x) - reshape_workers_gpu_cnt: Callable[[int], int] = field(default_factory=lambda: lambda x: x) - reshape_workers_gpu_device: Callable[[str], str] = field(default_factory=lambda: lambda x: x) + reshape_executors_mc_type: Callable[[str], str] = field(default_factory=lambda: lambda x: x) + reshape_executors_cnt: Callable[[int], int] = field(default_factory=lambda: lambda x: x) + reshape_executors_cpus: Callable[[int], int] = field(default_factory=lambda: lambda x: x) + reshape_executors_mem: Callable[[int], int] = field(default_factory=lambda: lambda x: x) + reshape_executors_gpu_cnt: Callable[[int], int] = field(default_factory=lambda: lambda x: x) + reshape_executors_gpu_device: Callable[[str], str] = field(default_factory=lambda: lambda x: x) def get_node(self, node_type: SparkNodeType) -> ClusterNode: if node_type == SparkNodeType.WORKER: - return self.cluster_inst.get_worker_node() - return self.cluster_inst.get_master_node() + return self.cluster_inst.get_executor_node() + return self.cluster_inst.get_primary_node() def get_all_nodes(self) -> list: raise NotImplementedError @@ -1182,31 +1182,31 @@ def get_all_nodes(self) -> list: def get_node_instance_type(self, node_type: SparkNodeType) -> str: res = super().get_node_instance_type(node_type) if node_type in self.node_types: - return self.reshape_workers_mc_type(res) + return self.reshape_executors_mc_type(res) return res def get_nodes_cnt(self, node_type: 
SparkNodeType) -> int: res = self.cluster_inst.get_nodes_cnt(node_type) if node_type in self.node_types: - return self.reshape_workers_cnt(res) + return self.reshape_executors_cnt(res) return res def get_node_core_count(self, node_type: SparkNodeType) -> int: res = super().get_node_core_count(node_type) if node_type in self.node_types: - return self.reshape_workers_cpus(res) + return self.reshape_executors_cpus(res) return res def get_node_mem_mb(self, node_type: SparkNodeType) -> int: res = super().get_node_mem_mb(node_type) if node_type in self.node_types: - return self.reshape_workers_mem(res) + return self.reshape_executors_mem(res) return res def get_gpu_per_node(self, node_type: SparkNodeType) -> (int, str): num_gpus, gpu_device = super().get_gpu_per_node(node_type) if node_type in self.node_types: - return self.reshape_workers_gpu_cnt(num_gpus), self.reshape_workers_gpu_device(gpu_device) + return self.reshape_executors_gpu_cnt(num_gpus), self.reshape_executors_gpu_device(gpu_device) return num_gpus, gpu_device def get_name(self) -> str: diff --git a/user_tools/src/spark_rapids_pytools/rapids/bootstrap.py b/user_tools/src/spark_rapids_pytools/rapids/bootstrap.py index 2dc8f22fa..70f4c3462 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/bootstrap.py +++ b/user_tools/src/spark_rapids_pytools/rapids/bootstrap.py @@ -88,7 +88,7 @@ def _run_rapids_tool(self): """ self.logger.info('Executing Bootstrap commands on remote cluster to calculate default configurations.') exec_cluster: ClusterBase = self.get_exec_cluster() - worker_hw_info = exec_cluster.get_worker_hw_info() + worker_hw_info = exec_cluster.get_executor_hw_info() self.logger.debug('Worker hardware INFO %s', worker_hw_info) try: spark_settings = self.__calculate_spark_settings(worker_info=worker_hw_info) diff --git a/user_tools/src/spark_rapids_pytools/rapids/profiling.py b/user_tools/src/spark_rapids_pytools/rapids/profiling.py index 97e75a33c..b3da740c2 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/profiling.py +++ b/user_tools/src/spark_rapids_pytools/rapids/profiling.py @@ -97,12 +97,12 @@ def _generate_autotuner_file_for_cluster(self, file_path: str, cluster_ob: Clust :return: """ self.logger.info('Generating input file for Auto-tuner') - worker_hw_info = cluster_ob.get_worker_hw_info() + worker_hw_info = cluster_ob.get_executor_hw_info() worker_info = { 'system': { 'numCores': worker_hw_info.sys_info.num_cpus, 'memory': f'{worker_hw_info.sys_info.cpu_mem}MiB', - 'numWorkers': cluster_ob.get_workers_count() + 'numWorkers': cluster_ob.get_executors_count() }, 'gpu': { # the scala code expects a unit diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index f9a305743..e475a0571 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -490,7 +490,7 @@ def __apply_gpu_cluster_reshape(self, all_apps: pd.DataFrame) -> (pd.DataFrame, per_row_flag = False if self.__recommendation_is_non_standard(): apps_df, per_row_flag = self.__apply_non_standard_gpu_shape(all_apps, - gpu_cluster.get_workers_count(), + gpu_cluster.get_executors_count(), gpu_reshape_type) else: apps_df = all_apps @@ -534,7 +534,7 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series: if not estimator_obj: # create the object and add it to the caching dict reshaped_cluster = ClusterReshape(self.ctxt.get_ctxt('gpuClusterProxy'), - reshape_workers_cnt=lambda x: workers_cnt) + 
reshape_executors_cnt=lambda x: workers_cnt) estimator_obj = self.ctxt.platform.create_saving_estimator(self.ctxt.get_ctxt('cpuClusterProxy'), reshaped_cluster) saving_estimator_cache.setdefault(workers_cnt, estimator_obj) @@ -710,7 +710,7 @@ def _generate_section_lines(self, sec_conf: dict) -> List[str]: if sec_conf.get('sectionID') == 'initializationScript': # format the initialization scripts reshaped_gpu_cluster = ClusterReshape(self.ctxt.get_ctxt('gpuClusterProxy')) - gpu_per_machine, gpu_device = reshaped_gpu_cluster.get_gpu_per_worker() + gpu_per_machine, gpu_device = reshaped_gpu_cluster.get_gpu_per_executor() fill_map = { 0: self.ctxt.platform.cli.get_region(), 1: [gpu_device.lower(), gpu_per_machine]
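
As a quick sanity check of the renamed API, here is a minimal usage sketch (not part of the patch); `cluster` is assumed to be any already-initialized ClusterBase subclass instance, and `summarize_cluster` is a hypothetical helper introduced only for illustration. Only the method and attribute names shown come from this diff.

    # Hypothetical illustration of the renamed accessors; not part of the patch.
    def summarize_cluster(cluster):
        primary = cluster.get_primary_node()                   # formerly get_master_node()
        executor_cnt = cluster.get_executors_count()           # formerly get_workers_count()
        gpu_cnt, gpu_device = cluster.get_gpu_per_executor()   # formerly get_gpu_per_worker()
        hw = cluster.get_executor_hw_info()                    # formerly get_worker_hw_info()
        return {
            'primary_instance': primary.instance_type,
            'executor_count': executor_cnt,
            'gpu_per_executor': (gpu_cnt, gpu_device),
            'executor_mem_mb': hw.sys_info.cpu_mem,
        }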