Skip to content

Commit

Permalink
[Core] Add SKYPILOT_NUM_NODES env var (#3656)
Browse files Browse the repository at this point in the history
* Add SKYPILOT_NUM_NODES env var

* Update docs/source/running-jobs/environment-variables.rst

Co-authored-by: Zongheng Yang <[email protected]>

* Update docs/source/running-jobs/environment-variables.rst

Co-authored-by: Zongheng Yang <[email protected]>

* Update docs/source/running-jobs/environment-variables.rst

Co-authored-by: Zongheng Yang <[email protected]>

* format

* add remove version

* add smoke test for num nodes

* fix test

---------

Co-authored-by: Zongheng Yang <[email protected]>
  • Loading branch information
Michaelvll and concretevitamin committed Aug 23, 2024
1 parent 3cd768d commit 61d6894
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 23 deletions.
9 changes: 8 additions & 1 deletion docs/source/running-jobs/environment-variables.rst
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,12 @@ Environment variables for ``setup``
- Rank (an integer ID from 0 to :code:`num_nodes-1`) of the node being set up.
- 0
* - ``SKYPILOT_SETUP_NODE_IPS``
- A string of IP addresses of the nodes in the cluster with the same order as the node ranks, where each line contains one IP address.
- A string of IP addresses of the nodes in the cluster with the same order as the node ranks, where each line contains one IP address. Note that this is not necessarily the same set of nodes as in the ``run`` stage: the ``setup`` stage runs on all nodes of the cluster, while the ``run`` stage can run on a subset of them.
- 1.2.3.4
3.4.5.6
* - ``SKYPILOT_NUM_NODES``
- Number of nodes in the cluster. Same value as ``$(echo "$SKYPILOT_SETUP_NODE_IPS" | wc -l)``.
- 2
* - ``SKYPILOT_TASK_ID``
- A unique ID assigned to each task.

Expand Down Expand Up @@ -159,6 +163,9 @@ Environment variables for ``run``
* - ``SKYPILOT_NODE_IPS``
- A string of IP addresses of the nodes reserved to execute the task, where each line contains one IP address. Read more :ref:`here <dist-jobs>`.
- 1.2.3.4
* - ``SKYPILOT_NUM_NODES``
- Number of nodes assigned to execute the current task. Same value as ``$(echo "$SKYPILOT_NODE_IPS" | wc -l)``. Read more :ref:`here <dist-jobs>`.
- 1
* - ``SKYPILOT_NUM_GPUS_PER_NODE``
- Number of GPUs reserved on each node to execute the task; the same as the
count in ``accelerators: <name>:<count>`` (rounded up if a fraction). Read
Expand Down
44 changes: 27 additions & 17 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,9 @@ def add_prologue(self, job_id: int) -> None:
SKY_REMOTE_WORKDIR = {constants.SKY_REMOTE_WORKDIR!r}
kwargs = dict()
# Only set the `_temp_dir` to SkyPilot's ray cluster directory when the directory
# exists for backward compatibility for the VM launched before #1790.
# Only set the `_temp_dir` to SkyPilot's ray cluster directory when
# the directory exists for backward compatibility for the VM
# launched before #1790.
if os.path.exists({constants.SKY_REMOTE_RAY_TEMPDIR!r}):
kwargs['_temp_dir'] = {constants.SKY_REMOTE_RAY_TEMPDIR!r}
ray.init(
Expand Down Expand Up @@ -308,8 +309,9 @@ def get_or_fail(futures, pg) -> List[int]:
ready, unready = ray.wait(unready)
idx = futures.index(ready[0])
returncodes[idx] = ray.get(ready[0])
# Remove the placement group after all tasks are done, so that the
# next job can be scheduled on the released resources immediately.
# Remove the placement group after all tasks are done, so that
# the next job can be scheduled on the released resources
# immediately.
ray_util.remove_placement_group(pg)
sys.stdout.flush()
return returncodes
Expand Down Expand Up @@ -348,9 +350,9 @@ def add_gang_scheduling_placement_group_and_setup(
num_nodes: int,
resources_dict: Dict[str, float],
stable_cluster_internal_ips: List[str],
env_vars: Dict[str, str],
setup_cmd: Optional[str] = None,
setup_log_path: Optional[str] = None,
env_vars: Optional[Dict[str, str]] = None,
) -> None:
"""Create the gang scheduling placement group for a Task.
Expand Down Expand Up @@ -410,6 +412,8 @@ def add_gang_scheduling_placement_group_and_setup(

job_id = self.job_id
if setup_cmd is not None:
setup_envs = env_vars.copy()
setup_envs[constants.SKYPILOT_NUM_NODES] = str(num_nodes)
self._code += [
textwrap.dedent(f"""\
setup_cmd = {setup_cmd!r}
Expand Down Expand Up @@ -439,7 +443,7 @@ def add_gang_scheduling_placement_group_and_setup(
.remote(
setup_cmd,
os.path.expanduser({setup_log_path!r}),
env_vars={env_vars!r},
env_vars={setup_envs!r},
stream_logs=True,
with_ray=True,
) for i in range(total_num_nodes)]
Expand Down Expand Up @@ -550,11 +554,13 @@ def add_ray_task(self,
f'placement_group_bundle_index={gang_scheduling_id})')

sky_env_vars_dict_str = [
textwrap.dedent("""\
sky_env_vars_dict = {}
sky_env_vars_dict['SKYPILOT_NODE_IPS'] = job_ip_list_str
# Environment starting with `SKY_` is deprecated.
textwrap.dedent(f"""\
sky_env_vars_dict = {{}}
sky_env_vars_dict['{constants.SKYPILOT_NODE_IPS}'] = job_ip_list_str
# Backward compatibility: Environment starting with `SKY_` is
# deprecated. Remove it in v0.9.0.
sky_env_vars_dict['SKY_NODE_IPS'] = job_ip_list_str
sky_env_vars_dict['{constants.SKYPILOT_NUM_NODES}'] = len(job_ip_rank_list)
""")
]

Expand All @@ -575,8 +581,9 @@ def add_ray_task(self,
if script is not None:
sky_env_vars_dict['SKYPILOT_NUM_GPUS_PER_NODE'] = {int(math.ceil(num_gpus))!r}
# Environment starting with `SKY_` is deprecated.
sky_env_vars_dict['{constants.SKYPILOT_NUM_GPUS_PER_NODE}'] = {int(math.ceil(num_gpus))!r}
# Backward compatibility: Environment starting with `SKY_` is
# deprecated. Remove it in v0.9.0.
sky_env_vars_dict['SKY_NUM_GPUS_PER_NODE'] = {int(math.ceil(num_gpus))!r}
ip = gang_scheduling_id_to_ip[{gang_scheduling_id!r}]
Expand All @@ -593,12 +600,14 @@ def add_ray_task(self,
node_name = f'worker{{idx_in_cluster}}'
name_str = f'{{node_name}}, rank={{rank}},'
log_path = os.path.expanduser(os.path.join({log_dir!r}, f'{{rank}}-{{node_name}}.log'))
sky_env_vars_dict['SKYPILOT_NODE_RANK'] = rank
# Environment starting with `SKY_` is deprecated.
sky_env_vars_dict['{constants.SKYPILOT_NODE_RANK}'] = rank
# Backward compatibility: Environment starting with `SKY_` is
# deprecated. Remove it in v0.9.0.
sky_env_vars_dict['SKY_NODE_RANK'] = rank
sky_env_vars_dict['SKYPILOT_INTERNAL_JOB_ID'] = {self.job_id}
# Environment starting with `SKY_` is deprecated.
# Backward compatibility: Environment starting with `SKY_` is
# deprecated. Remove it in v0.9.0.
sky_env_vars_dict['SKY_INTERNAL_JOB_ID'] = {self.job_id}
futures.append(run_bash_command_with_log \\
Expand Down Expand Up @@ -4751,9 +4760,9 @@ def _execute_task_one_node(self, handle: CloudVmRayResourceHandle,
1,
resources_dict,
stable_cluster_internal_ips=internal_ips,
env_vars=task_env_vars,
setup_cmd=self._setup_cmd,
setup_log_path=os.path.join(log_dir, 'setup.log'),
env_vars=task_env_vars,
)

if callable(task.run):
Expand Down Expand Up @@ -4800,9 +4809,10 @@ def _execute_task_n_nodes(self, handle: CloudVmRayResourceHandle,
num_actual_nodes,
resources_dict,
stable_cluster_internal_ips=internal_ips,
env_vars=task_env_vars,
setup_cmd=self._setup_cmd,
setup_log_path=os.path.join(log_dir, 'setup.log'),
env_vars=task_env_vars)
)

if callable(task.run):
run_fn_code = textwrap.dedent(inspect.getsource(task.run))
Expand Down
6 changes: 6 additions & 0 deletions sky/skylet/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,9 @@
# The name for the environment variable that stores the URL of the SkyPilot
# API server.
SKY_API_SERVER_URL_ENV_VAR = 'SKYPILOT_API_SERVER_URL'

# Names of the environment variables that SkyPilot sets for user tasks.
SKYPILOT_NUM_NODES = 'SKYPILOT_NUM_NODES'
SKYPILOT_NODE_IPS = 'SKYPILOT_NODE_IPS'
SKYPILOT_NUM_GPUS_PER_NODE = 'SKYPILOT_NUM_GPUS_PER_NODE'
SKYPILOT_NODE_RANK = 'SKYPILOT_NODE_RANK'
10 changes: 5 additions & 5 deletions tests/test_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -2936,7 +2936,7 @@ def test_managed_jobs_inline_env(generic_cloud: str):
test = Test(
'test-managed-jobs-inline-env',
[
f'sky jobs launch -n {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_IPS\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_RANK\\" ]]) || exit 1"',
f'sky jobs launch -n {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"',
'sleep 20',
f'{_JOB_QUEUE_WAIT} | grep {name} | grep SUCCEEDED',
],
Expand All @@ -2954,10 +2954,10 @@ def test_inline_env(generic_cloud: str):
test = Test(
'test-inline-env',
[
f'sky launch -c {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_IPS\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_RANK\\" ]]) || exit 1"',
f'sky launch -c {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"',
'sleep 20',
f'sky logs {name} 1 --status',
f'sky exec {name} --env TEST_ENV2="success" "([[ ! -z \\"\$TEST_ENV2\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_IPS\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_RANK\\" ]]) || exit 1"',
f'sky exec {name} --env TEST_ENV2="success" "([[ ! -z \\"\$TEST_ENV2\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"',
f'sky logs {name} 2 --status',
],
f'sky down -y {name}',
Expand All @@ -2973,9 +2973,9 @@ def test_inline_env_file(generic_cloud: str):
test = Test(
'test-inline-env-file',
[
f'sky launch -c {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_IPS\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_RANK\\" ]]) || exit 1"',
f'sky launch -c {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"',
f'sky logs {name} 1 --status',
f'sky exec {name} --env-file examples/sample_dotenv "([[ ! -z \\"\$TEST_ENV2\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_IPS\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_RANK\\" ]]) || exit 1"',
f'sky exec {name} --env-file examples/sample_dotenv "([[ ! -z \\"\$TEST_ENV2\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"',
f'sky logs {name} 2 --status',
],
f'sky down -y {name}',
Expand Down

0 comments on commit 61d6894

Please sign in to comment.