Remove oppressive language references in sp_types
Signed-off-by: Partho Sarthi <[email protected]>
parthosa committed Jul 21, 2023
1 parent 5c242cf commit 8629a50
Showing 9 changed files with 73 additions and 73 deletions.
@@ -183,7 +183,7 @@ def _init_nodes(self):
'region': self.region,
'instance_type': self.props.get_value('node_type_id')
}
- executor = DatabricksNode.create_worker_node().set_fields_from_dict(executor_props)
+ executor = DatabricksNode.create_executor_node().set_fields_from_dict(executor_props)
executor.fetch_and_set_hw_info(self.cli)
executor_nodes.append(executor)
primary_props = {
@@ -193,7 +193,7 @@ def _init_nodes(self):
'region': self.region,
'instance_type': self.props.get_value('driver_node_type_id')
}
- primary_node = DatabricksNode.create_master_node().set_fields_from_dict(primary_props)
+ primary_node = DatabricksNode.create_primary_node().set_fields_from_dict(primary_props)
primary_node.fetch_and_set_hw_info(self.cli)
self.nodes = {
SparkNodeType.WORKER: executor_nodes,
@@ -239,7 +239,7 @@ def _build_migrated_cluster(self, orig_cluster):
'region': anode.region,
'props': anode.props,
}
- new_node = DatabricksNode.create_worker_node().set_fields_from_dict(executor_props)
+ new_node = DatabricksNode.create_executor_node().set_fields_from_dict(executor_props)
new_executor_nodes.append(new_node)
self.nodes = {
SparkNodeType.WORKER: new_executor_nodes,
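Illustration (not part of this commit): the _init_nodes hunks above construct each cluster member through the renamed factory methods and collect them into a per-node-type map. A minimal sketch under assumed property values; the SparkNodeType.MASTER entry is inferred from the truncated context:

# Sketch only: post-rename node construction as performed in _init_nodes.
# Property values are placeholders, not taken from a real cluster.
executor_props = {'name': 'worker-0', 'region': 'us-west-2', 'instance_type': 'i3.2xlarge'}
primary_props = {'name': 'driver-0', 'region': 'us-west-2', 'instance_type': 'i3.4xlarge'}
executor = DatabricksNode.create_executor_node().set_fields_from_dict(executor_props)
primary_node = DatabricksNode.create_primary_node().set_fields_from_dict(primary_props)
nodes = {
    SparkNodeType.WORKER: [executor],    # executors are kept as a list
    SparkNodeType.MASTER: primary_node,  # MASTER entry assumed; the hunk is cut before it
}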
@@ -264,7 +264,7 @@ def _init_nodes(self):
'region': self.region,
'instance_type': self.props.get_value('node_type_id')
}
- executor = DatabricksAzureNode.create_worker_node().set_fields_from_dict(executor_props)
+ executor = DatabricksAzureNode.create_executor_node().set_fields_from_dict(executor_props)
executor.fetch_and_set_hw_info(self.cli)
executor_nodes.append(executor)
driver_props = {
@@ -274,7 +274,7 @@ def _init_nodes(self):
'region': self.region,
'instance_type': self.props.get_value('driver_node_type_id')
}
- driver_node = DatabricksAzureNode.create_master_node().set_fields_from_dict(driver_props)
+ driver_node = DatabricksAzureNode.create_primary_node().set_fields_from_dict(driver_props)
driver_node.fetch_and_set_hw_info(self.cli)
self.nodes = {
SparkNodeType.WORKER: executor_nodes,
@@ -320,7 +320,7 @@ def _build_migrated_cluster(self, orig_cluster):
'region': anode.region,
'props': anode.props,
}
- new_node = DatabricksAzureNode.create_worker_node().set_fields_from_dict(executor_props)
+ new_node = DatabricksAzureNode.create_executor_node().set_fields_from_dict(executor_props)
new_executor_nodes.append(new_node)
self.nodes = {
SparkNodeType.WORKER: new_executor_nodes,
14 changes: 7 additions & 7 deletions user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
@@ -399,7 +399,7 @@ def _init_nodes(self):
# set the node zone based on the wrapper defined zone
'zone': self.zone
}
- executor = DataprocNode.create_worker_node().set_fields_from_dict(executor_props)
+ executor = DataprocNode.create_executor_node().set_fields_from_dict(executor_props)
# TODO for optimization, we should set HW props for 1 executor
executor.fetch_and_set_hw_info(self.cli)
executor_nodes.append(executor)
@@ -410,7 +410,7 @@ def _init_nodes(self):
# set the node zone based on the wrapper defined zone
'zone': self.zone
}
- primary_node = DataprocNode.create_master_node().set_fields_from_dict(primary_props)
+ primary_node = DataprocNode.create_primary_node().set_fields_from_dict(primary_props)
primary_node.fetch_and_set_hw_info(self.cli)
self.nodes = {
SparkNodeType.WORKER: executor_nodes,
@@ -451,7 +451,7 @@ def _build_migrated_cluster(self, orig_cluster):
'name': anode.name,
'zone': anode.zone,
}
- new_node = DataprocNode.create_worker_node().set_fields_from_dict(executor_props)
+ new_node = DataprocNode.create_executor_node().set_fields_from_dict(executor_props)
# we cannot rely on setting gpu info from the SDK because
# dataproc does not bind machine types to GPUs
# new_node.fetch_and_set_hw_info(self.cli)
@@ -483,8 +483,8 @@ def get_image_version(self) -> str:
return self.props.get_value_silent('config', 'softwareConfig', 'imageVersion')

def _set_render_args_create_template(self) -> dict:
- executor_node = self.get_worker_node()
- gpu_per_machine, gpu_device = self.get_gpu_per_worker()
+ executor_node = self.get_executor_node()
+ gpu_per_machine, gpu_device = self.get_gpu_per_executor()
# map the gpu device to the equivalent accepted argument
gpu_device_hash = {
'T4': 'nvidia-tesla-t4',
@@ -495,8 +495,8 @@ def _set_render_args_create_template(self) -> dict:
'REGION': self.region,
'ZONE': self.zone,
'IMAGE': self.get_image_version(),
- 'MASTER_MACHINE': self.get_master_node().instance_type,
- 'WORKERS_COUNT': self.get_workers_count(),
+ 'MASTER_MACHINE': self.get_primary_node().instance_type,
+ 'WORKERS_COUNT': self.get_executors_count(),
'WORKERS_MACHINE': executor_node.instance_type,
'LOCAL_SSD': 2,
'GPU_DEVICE': gpu_device_hash.get(gpu_device),
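Side note (not part of this commit): the Dataproc template hunk above maps the detected GPU device to the accelerator name expected by the create-cluster template. A hedged sketch of that lookup; only the 'T4' entry is visible in the diff, and cluster stands in for any connected cluster accessor:

# Sketch: GPU device to Dataproc accelerator-name lookup (entries beyond 'T4' would be assumptions).
gpu_device_hash = {
    'T4': 'nvidia-tesla-t4',
}
gpu_per_machine, gpu_device = cluster.get_gpu_per_executor()  # accessor renamed in this commit
accelerator = gpu_device_hash.get(gpu_device)                 # e.g. 'nvidia-tesla-t4'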
6 changes: 3 additions & 3 deletions user_tools/src/spark_rapids_pytools/cloud_api/emr.py
@@ -451,13 +451,13 @@ def get_image_version(self) -> str:
return self.props.get_value('ReleaseLabel')

def _set_render_args_create_template(self) -> dict:
- worker_node = self.get_worker_node()
+ worker_node = self.get_executor_node()
return {
'CLUSTER_NAME': self.get_name(),
'ZONE': self.zone,
'IMAGE': self.get_image_version(),
- 'MASTER_MACHINE': self.get_master_node().instance_type,
- 'WORKERS_COUNT': self.get_workers_count(),
+ 'MASTER_MACHINE': self.get_primary_node().instance_type,
+ 'WORKERS_COUNT': self.get_executors_count(),
'WORKERS_MACHINE': worker_node.instance_type
}

6 changes: 3 additions & 3 deletions user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
@@ -217,7 +217,7 @@ def _init_nodes(self):
'zone': self.zone,
'platform_name': self.platform.get_platform_name()
}
- executor = OnPremNode.create_worker_node().set_fields_from_dict(executor_props)
+ executor = OnPremNode.create_executor_node().set_fields_from_dict(executor_props)
# TODO for optimization, we should set HW props for 1 executor
executor.fetch_and_set_hw_info(self.cli)
executor_nodes.append(executor)
@@ -230,7 +230,7 @@ def _init_nodes(self):
'platform_name': self.platform.get_platform_name()
}

- primary_node = OnPremNode.create_master_node().set_fields_from_dict(primary_props)
+ primary_node = OnPremNode.create_primary_node().set_fields_from_dict(primary_props)
primary_node.fetch_and_set_hw_info(self.cli)
self.nodes = {
SparkNodeType.WORKER: executor_nodes,
@@ -252,7 +252,7 @@ def _build_migrated_cluster(self, orig_cluster):
'name': anode.name,
'zone': anode.zone,
}
- new_node = OnPremNode.create_worker_node().set_fields_from_dict(executor_props)
+ new_node = OnPremNode.create_executor_node().set_fields_from_dict(executor_props)
gpu_mc_hw: ClusterNode = supported_mc_map.get(new_instance_type)
new_node.construct_hw_info(cli=None,
gpu_info=gpu_mc_hw.gpu_info,
96 changes: 48 additions & 48 deletions user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
@@ -138,7 +138,7 @@ class TargetPlatform(EnumeratedType):

class SparkNodeType(EnumeratedType):
"""
- Node type from Spark perspective. We either have a master node or a worker node.
+ Node type from Spark perspective. We either have a primary node or a executor node.
Note that the provider could have different grouping.
For example EMR has: master, task, and core.
Another categorization: onDemand..etc.
@@ -247,20 +247,20 @@ def find_best_cpu_conversion(self, target_list: dict):
return best_match

@classmethod
- def create_worker_node(cls) -> Any:
+ def create_executor_node(cls) -> Any:
return cls(SparkNodeType.WORKER)

@classmethod
- def create_master_node(cls) -> Any:
+ def create_primary_node(cls) -> Any:
return cls(SparkNodeType.MASTER)

@classmethod
def create_node(cls, value):
if isinstance(value, SparkNodeType):
if value == SparkNodeType.MASTER:
- return cls.create_master_node()
+ return cls.create_primary_node()
if value == SparkNodeType.WORKER:
- return cls.create_worker_node()
+ return cls.create_executor_node()
raise RuntimeError(f'Invalid node type while creating cluster node {value}')
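Illustration (not part of this commit): create_node keeps dispatching on the unchanged SparkNodeType values, so callers are routed to the renamed factories. A hypothetical usage sketch; the import path is inferred from the sp_types.py path shown above, and DataprocNode stands in for any concrete node subclass touched by this commit (its import is omitted):

# Hypothetical usage of the renamed factory dispatch.
from spark_rapids_pytools.cloud_api.sp_types import SparkNodeType

primary = DataprocNode.create_node(SparkNodeType.MASTER)    # routed to create_primary_node()
executor = DataprocNode.create_node(SparkNodeType.WORKER)   # routed to create_executor_node()
# Any other value raises RuntimeError('Invalid node type while creating cluster node ...')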


@@ -303,19 +303,19 @@ def get_node_instance_type(self, node_type: SparkNodeType) -> str:
node = self.get_node(node_type)
return node.instance_type

- def get_workers_instant_types(self) -> str:
+ def get_executors_instant_types(self) -> str:
return self.get_node_instance_type(SparkNodeType.WORKER)

- def get_workers_count(self) -> int:
+ def get_executors_count(self) -> int:
return self.get_nodes_cnt(SparkNodeType.WORKER)

- def get_workers_cores_count(self) -> int:
+ def get_executors_cores_count(self) -> int:
return self.get_node_core_count(SparkNodeType.WORKER)

- def get_workers_mem_mb(self) -> int:
+ def get_executors_mem_mb(self) -> int:
return self.get_node_mem_mb(SparkNodeType.WORKER)

- def get_gpu_per_worker(self) -> (int, str):
+ def get_gpu_per_executor(self) -> (int, str):
return self.get_gpu_per_node(SparkNodeType.WORKER)
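Illustration (not part of this commit): the renamed ClusterGetAccessor helpers still resolve against SparkNodeType.WORKER internally, so only the public names change. A hedged usage sketch, with cluster standing in for any accessor implementation:

# Hypothetical calls against the renamed accessor API shown above.
num_executors = cluster.get_executors_count()
exec_cores = cluster.get_executors_cores_count()
exec_mem_mb = cluster.get_executors_mem_mb()
gpus_per_exec, gpu_device = cluster.get_gpu_per_executor()
print(f'{num_executors} executors x {exec_cores} cores, {exec_mem_mb} MiB, {gpus_per_exec} x {gpu_device}')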


@@ -881,7 +881,7 @@ def get_platform_name(self) -> str:
return CloudPlatform.pretty_print(self.type_id)

def get_footer_message(self) -> str:
- return 'To support acceleration with T4 GPUs, switch the worker node instance types'
+ return 'To support acceleration with T4 GPUs, switch the executor node instance types'


@dataclass
@@ -902,15 +902,15 @@ class ClusterBase(ClusterGetAccessor):
logger: Logger = field(default=ToolLogging.get_and_setup_logger('rapids.tools.cluster'), init=False)

@staticmethod
- def _verify_workers_exist(has_no_workers_cb: Callable[[], bool]):
+ def _verify_executors_exist(has_no_executors_cb: Callable[[], bool]):
"""
- Specifies how to handle cluster definitions that have no workers
- :param has_no_workers_cb: A callback that returns True if the cluster does not have any
- workers
+ Specifies how to handle cluster definitions that have no executors
+ :param has_no_executors_cb: A callback that returns True if the cluster does not have any
+ executors
"""
- if has_no_workers_cb():
- raise RuntimeError('Invalid cluster: The cluster has no worker nodes.\n\t'
- 'It is recommended to define a with (1 master, N workers).')
+ if has_no_executors_cb():
+ raise RuntimeError('Invalid cluster: The cluster has no executor nodes.\n\t'
+ 'It is recommended to define a with (1 primary, N executors).')

def __post_init__(self):
self.cli = self.platform.cli
@@ -970,8 +970,8 @@ def set_connection(self,
pre_init_args = self._init_connection(cluster_id, props)
self.set_fields_from_dict(pre_init_args)
self._init_nodes()
- # Verify that the cluster has defined workers
- self._verify_workers_exist(lambda: not self.nodes.get(SparkNodeType.WORKER))
+ # Verify that the cluster has defined executors
+ self._verify_executors_exist(lambda: not self.nodes.get(SparkNodeType.WORKER))
return self

def is_cluster_running(self) -> bool:
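Behavior note (not part of this commit): set_connection now passes a lambda that is truthy when the SparkNodeType.WORKER entry is missing or empty, and _verify_executors_exist raises in that case. A small sketch of both outcomes, calling the static method directly for illustration:

# Sketch: the callback reports "no executors"; True triggers the RuntimeError shown above.
ClusterBase._verify_executors_exist(lambda: False)  # executors present: returns normally
ClusterBase._verify_executors_exist(lambda: True)   # no executors: raises RuntimeError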
@@ -993,22 +993,22 @@ def run_cmd_driver(self, ssh_cmd: str, cmd_input: str = None) -> str or None:
i.e., writing to a file.
:return:
"""
- # get the master node
- master_node: ClusterNode = self.get_master_node()
- return self.cli.ssh_cmd_node(master_node, ssh_cmd, cmd_input=cmd_input)
+ # get the primary node
+ primary_node: ClusterNode = self.get_primary_node()
+ return self.cli.ssh_cmd_node(primary_node, ssh_cmd, cmd_input=cmd_input)

- def run_cmd_worker(self, ssh_cmd: str, cmd_input: str = None, ind: int = 0) -> str or None:
+ def run_cmd_executor(self, ssh_cmd: str, cmd_input: str = None, ind: int = 0) -> str or None:
"""
- Execute command on the worker node
+ Execute command on the executor node
:param ssh_cmd: the command to be executed on the remote node. Note that the quotes
surrounding the shell command should be included
:param cmd_input: optional argument string used as an input to the command line.
i.e., writing to a file
- :param ind: the node index. By default, the command is executed on first worker node.
+ :param ind: the node index. By default, the command is executed on first executor node.
"""
- # get the worker node
- worker_node: ClusterNode = self.get_worker_node(ind)
- return self.cli.ssh_cmd_node(worker_node, ssh_cmd, cmd_input=cmd_input)
+ # get the executor node
+ executor_node: ClusterNode = self.get_executor_node(ind)
+ return self.cli.ssh_cmd_node(executor_node, ssh_cmd, cmd_input=cmd_input)

def run_cmd_node(self, node: ClusterNode, ssh_cmd: str, cmd_input: str = None) -> str or None:
"""
@@ -1042,9 +1042,9 @@ def scp_from_node(self, node: ClusterNode, src: str, dest: str) -> str or None:
def get_region(self) -> str:
return self.cli.get_region()

- def get_worker_hw_info(self) -> NodeHWInfo:
- worker_node = self.get_worker_node()
- return worker_node.hw_info
+ def get_executor_hw_info(self) -> NodeHWInfo:
+ executor_node = self.get_executor_node()
+ return executor_node.hw_info

def _build_migrated_cluster(self, orig_cluster):
"""
@@ -1114,10 +1114,10 @@ def get_all_nodes(self) -> list:

return nodes

- def get_master_node(self) -> ClusterNode:
+ def get_primary_node(self) -> ClusterNode:
return self.nodes.get(SparkNodeType.MASTER)

- def get_worker_node(self, ind: int = 0) -> ClusterNode:
+ def get_executor_node(self, ind: int = 0) -> ClusterNode:
return self.nodes.get(SparkNodeType.WORKER)[ind]

def get_name(self) -> str:
@@ -1159,54 +1159,54 @@ class ClusterReshape(ClusterGetAccessor):
The caller can override the behavior by passing a callback method.
The caller also can control which node type is affected by the reshap-methods.
This can be done by setting the "node_types". By default, the reshaping
- is limited to the worker nodes of a cluster.
+ is limited to the executor nodes of a cluster.
"""

cluster_inst: ClusterBase
node_types: List[SparkNodeType] = field(default_factory=lambda: [SparkNodeType.WORKER])
- reshape_workers_mc_type: Callable[[str], str] = field(default_factory=lambda: lambda x: x)
- reshape_workers_cnt: Callable[[int], int] = field(default_factory=lambda: lambda x: x)
- reshape_workers_cpus: Callable[[int], int] = field(default_factory=lambda: lambda x: x)
- reshape_workers_mem: Callable[[int], int] = field(default_factory=lambda: lambda x: x)
- reshape_workers_gpu_cnt: Callable[[int], int] = field(default_factory=lambda: lambda x: x)
- reshape_workers_gpu_device: Callable[[str], str] = field(default_factory=lambda: lambda x: x)
+ reshape_executors_mc_type: Callable[[str], str] = field(default_factory=lambda: lambda x: x)
+ reshape_executors_cnt: Callable[[int], int] = field(default_factory=lambda: lambda x: x)
+ reshape_executors_cpus: Callable[[int], int] = field(default_factory=lambda: lambda x: x)
+ reshape_executors_mem: Callable[[int], int] = field(default_factory=lambda: lambda x: x)
+ reshape_executors_gpu_cnt: Callable[[int], int] = field(default_factory=lambda: lambda x: x)
+ reshape_executors_gpu_device: Callable[[str], str] = field(default_factory=lambda: lambda x: x)

def get_node(self, node_type: SparkNodeType) -> ClusterNode:
if node_type == SparkNodeType.WORKER:
- return self.cluster_inst.get_worker_node()
- return self.cluster_inst.get_master_node()
+ return self.cluster_inst.get_executor_node()
+ return self.cluster_inst.get_primary_node()

def get_all_nodes(self) -> list:
raise NotImplementedError

def get_node_instance_type(self, node_type: SparkNodeType) -> str:
res = super().get_node_instance_type(node_type)
if node_type in self.node_types:
- return self.reshape_workers_mc_type(res)
+ return self.reshape_executors_mc_type(res)
return res

def get_nodes_cnt(self, node_type: SparkNodeType) -> int:
res = self.cluster_inst.get_nodes_cnt(node_type)
if node_type in self.node_types:
- return self.reshape_workers_cnt(res)
+ return self.reshape_executors_cnt(res)
return res

def get_node_core_count(self, node_type: SparkNodeType) -> int:
res = super().get_node_core_count(node_type)
if node_type in self.node_types:
- return self.reshape_workers_cpus(res)
+ return self.reshape_executors_cpus(res)
return res

def get_node_mem_mb(self, node_type: SparkNodeType) -> int:
res = super().get_node_mem_mb(node_type)
if node_type in self.node_types:
- return self.reshape_workers_mem(res)
+ return self.reshape_executors_mem(res)
return res

def get_gpu_per_node(self, node_type: SparkNodeType) -> (int, str):
num_gpus, gpu_device = super().get_gpu_per_node(node_type)
if node_type in self.node_types:
- return self.reshape_workers_gpu_cnt(num_gpus), self.reshape_workers_gpu_device(gpu_device)
+ return self.reshape_executors_gpu_cnt(num_gpus), self.reshape_executors_gpu_device(gpu_device)
return num_gpus, gpu_device

def get_name(self) -> str:
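Illustration (not part of this commit): the ClusterReshape docstring above says callers customize reshaping through callbacks; after the rename those fields are reshape_executors_*. A hedged example that doubles the executor count and swaps the machine type (the machine type string is a placeholder, and orig_cluster is assumed to exist):

# Hypothetical reshape of an existing cluster object.
reshaped = ClusterReshape(
    cluster_inst=orig_cluster,
    reshape_executors_cnt=lambda cnt: cnt * 2,
    reshape_executors_mc_type=lambda _: 'n1-standard-32',
)
new_cnt = reshaped.get_nodes_cnt(SparkNodeType.WORKER)          # original count, doubled
new_mc = reshaped.get_node_instance_type(SparkNodeType.WORKER)  # 'n1-standard-32'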
2 changes: 1 addition & 1 deletion user_tools/src/spark_rapids_pytools/rapids/bootstrap.py
@@ -88,7 +88,7 @@ def _run_rapids_tool(self):
"""
self.logger.info('Executing Bootstrap commands on remote cluster to calculate default configurations.')
exec_cluster: ClusterBase = self.get_exec_cluster()
- worker_hw_info = exec_cluster.get_worker_hw_info()
+ worker_hw_info = exec_cluster.get_executor_hw_info()
self.logger.debug('Worker hardware INFO %s', worker_hw_info)
try:
spark_settings = self.__calculate_spark_settings(worker_info=worker_hw_info)
4 changes: 2 additions & 2 deletions user_tools/src/spark_rapids_pytools/rapids/profiling.py
@@ -97,12 +97,12 @@ def _generate_autotuner_file_for_cluster(self, file_path: str, cluster_ob: Clust
:return:
"""
self.logger.info('Generating input file for Auto-tuner')
- worker_hw_info = cluster_ob.get_worker_hw_info()
+ worker_hw_info = cluster_ob.get_executor_hw_info()
worker_info = {
'system': {
'numCores': worker_hw_info.sys_info.num_cpus,
'memory': f'{worker_hw_info.sys_info.cpu_mem}MiB',
- 'numWorkers': cluster_ob.get_workers_count()
+ 'numWorkers': cluster_ob.get_executors_count()
},
'gpu': {
# the scala code expects a unit