Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Add SKYPILOT_NUM_NODES env var #3656

Merged
merged 10 commits into from
Jun 13, 2024
9 changes: 8 additions & 1 deletion docs/source/running-jobs/environment-variables.rst
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,12 @@ Environment variables for ``setup``
- Rank (an integer ID from 0 to :code:`num_nodes-1`) of the node being set up.
- 0
* - ``SKYPILOT_SETUP_NODE_IPS``
- A string of IP addresses of the nodes in the cluster with the same order as the node ranks, where each line contains one IP address.
- A string of IP addresses of the nodes in the cluster with the same order as the node ranks, where each line contains one IP address. Note that this is not necessarily the same as the nodes in ``run`` stage, as the ``setup`` stage runs on all nodes of the cluster, while the ``run`` stage can run on a subset of nodes.
- 1.2.3.4
3.4.5.6
* - ``SKYPILOT_NUM_NODES``
- Number of nodes in the cluster. Same value as ``$(echo "$SKYPILOT_NODE_IPS" | wc -l)``.
- 2
* - ``SKYPILOT_TASK_ID``
- A unique ID assigned to each task.

Expand Down Expand Up @@ -159,6 +163,9 @@ Environment variables for ``run``
* - ``SKYPILOT_NODE_IPS``
- A string of IP addresses of the nodes reserved to execute the task, where each line contains one IP address. Read more :ref:`here <dist-jobs>`.
- 1.2.3.4
* - ``SKYPILOT_NUM_NODES``
- Number of nodes assigned to execute the current task. Same value as ``$(echo "$SKYPILOT_NODE_IPS" | wc -l)``. Read more :ref:`here <dist-jobs>`.
- 1
* - ``SKYPILOT_NUM_GPUS_PER_NODE``
- Number of GPUs reserved on each node to execute the task; the same as the
count in ``accelerators: <name>:<count>`` (rounded up if a fraction). Read
Expand Down
44 changes: 27 additions & 17 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,9 @@ def add_prologue(self, job_id: int) -> None:
SKY_REMOTE_WORKDIR = {constants.SKY_REMOTE_WORKDIR!r}

kwargs = dict()
# Only set the `_temp_dir` to SkyPilot's ray cluster directory when the directory
# exists for backward compatibility for the VM launched before #1790.
# Only set the `_temp_dir` to SkyPilot's ray cluster directory when
# the directory exists for backward compatibility for the VM
# launched before #1790.
if os.path.exists({constants.SKY_REMOTE_RAY_TEMPDIR!r}):
kwargs['_temp_dir'] = {constants.SKY_REMOTE_RAY_TEMPDIR!r}
ray.init(
Expand Down Expand Up @@ -307,8 +308,9 @@ def get_or_fail(futures, pg) -> List[int]:
ready, unready = ray.wait(unready)
idx = futures.index(ready[0])
returncodes[idx] = ray.get(ready[0])
# Remove the placement group after all tasks are done, so that the
# next job can be scheduled on the released resources immediately.
# Remove the placement group after all tasks are done, so that
# the next job can be scheduled on the released resources
# immediately.
ray_util.remove_placement_group(pg)
sys.stdout.flush()
return returncodes
Expand Down Expand Up @@ -347,9 +349,9 @@ def add_gang_scheduling_placement_group_and_setup(
num_nodes: int,
resources_dict: Dict[str, float],
stable_cluster_internal_ips: List[str],
env_vars: Dict[str, str],
setup_cmd: Optional[str] = None,
setup_log_path: Optional[str] = None,
env_vars: Optional[Dict[str, str]] = None,
) -> None:
"""Create the gang scheduling placement group for a Task.

Expand Down Expand Up @@ -409,6 +411,8 @@ def add_gang_scheduling_placement_group_and_setup(

job_id = self.job_id
if setup_cmd is not None:
setup_envs = env_vars.copy()
setup_envs[constants.SKYPILOT_NUM_NODES] = str(num_nodes)
self._code += [
textwrap.dedent(f"""\
setup_cmd = {setup_cmd!r}
Expand Down Expand Up @@ -438,7 +442,7 @@ def add_gang_scheduling_placement_group_and_setup(
.remote(
setup_cmd,
os.path.expanduser({setup_log_path!r}),
env_vars={env_vars!r},
env_vars={setup_envs!r},
stream_logs=True,
with_ray=True,
) for i in range(total_num_nodes)]
Expand Down Expand Up @@ -549,11 +553,13 @@ def add_ray_task(self,
f'placement_group_bundle_index={gang_scheduling_id})')

sky_env_vars_dict_str = [
textwrap.dedent("""\
sky_env_vars_dict = {}
sky_env_vars_dict['SKYPILOT_NODE_IPS'] = job_ip_list_str
# Environment starting with `SKY_` is deprecated.
textwrap.dedent(f"""\
sky_env_vars_dict = {{}}
sky_env_vars_dict['{constants.SKYPILOT_NODE_IPS}'] = job_ip_list_str
# Backward compatibility: Environment starting with `SKY_` is
# deprecated. Remove it in v0.9.0.
sky_env_vars_dict['SKY_NODE_IPS'] = job_ip_list_str
sky_env_vars_dict['{constants.SKYPILOT_NUM_NODES}'] = len(job_ip_rank_list)
""")
]

Expand All @@ -574,8 +580,9 @@ def add_ray_task(self,


if script is not None:
sky_env_vars_dict['SKYPILOT_NUM_GPUS_PER_NODE'] = {int(math.ceil(num_gpus))!r}
# Environment starting with `SKY_` is deprecated.
sky_env_vars_dict['{constants.SKYPILOT_NUM_GPUS_PER_NODE}'] = {int(math.ceil(num_gpus))!r}
# Backward compatibility: Environment starting with `SKY_` is
# deprecated. Remove it in v0.9.0.
sky_env_vars_dict['SKY_NUM_GPUS_PER_NODE'] = {int(math.ceil(num_gpus))!r}

ip = gang_scheduling_id_to_ip[{gang_scheduling_id!r}]
Expand All @@ -592,12 +599,14 @@ def add_ray_task(self,
node_name = f'worker{{idx_in_cluster}}'
name_str = f'{{node_name}}, rank={{rank}},'
log_path = os.path.expanduser(os.path.join({log_dir!r}, f'{{rank}}-{{node_name}}.log'))
sky_env_vars_dict['SKYPILOT_NODE_RANK'] = rank
# Environment starting with `SKY_` is deprecated.
sky_env_vars_dict['{constants.SKYPILOT_NODE_RANK}'] = rank
# Backward compatibility: Environment starting with `SKY_` is
# deprecated. Remove it in v0.9.0.
sky_env_vars_dict['SKY_NODE_RANK'] = rank

sky_env_vars_dict['SKYPILOT_INTERNAL_JOB_ID'] = {self.job_id}
# Environment starting with `SKY_` is deprecated.
# Backward compatibility: Environment starting with `SKY_` is
# deprecated. Remove it in v0.9.0.
sky_env_vars_dict['SKY_INTERNAL_JOB_ID'] = {self.job_id}

futures.append(run_bash_command_with_log \\
Expand Down Expand Up @@ -4749,9 +4758,9 @@ def _execute_task_one_node(self, handle: CloudVmRayResourceHandle,
1,
resources_dict,
stable_cluster_internal_ips=internal_ips,
env_vars=task_env_vars,
setup_cmd=self._setup_cmd,
setup_log_path=os.path.join(log_dir, 'setup.log'),
env_vars=task_env_vars,
)

if callable(task.run):
Expand Down Expand Up @@ -4798,9 +4807,10 @@ def _execute_task_n_nodes(self, handle: CloudVmRayResourceHandle,
num_actual_nodes,
resources_dict,
stable_cluster_internal_ips=internal_ips,
env_vars=task_env_vars,
setup_cmd=self._setup_cmd,
setup_log_path=os.path.join(log_dir, 'setup.log'),
env_vars=task_env_vars)
)

if callable(task.run):
run_fn_code = textwrap.dedent(inspect.getsource(task.run))
Expand Down
6 changes: 6 additions & 0 deletions sky/skylet/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,9 @@
# Serve: A default controller with 4 vCPU and 16 GB memory can run up to 16
# services.
CONTROLLER_PROCESS_CPU_DEMAND = 0.25

# SkyPilot environment variables
SKYPILOT_NUM_NODES = 'SKYPILOT_NUM_NODES'
SKYPILOT_NODE_IPS = 'SKYPILOT_NODE_IPS'
SKYPILOT_NUM_GPUS_PER_NODE = 'SKYPILOT_NUM_GPUS_PER_NODE'
SKYPILOT_NODE_RANK = 'SKYPILOT_NODE_RANK'
10 changes: 5 additions & 5 deletions tests/test_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -2936,7 +2936,7 @@ def test_managed_jobs_inline_env(generic_cloud: str):
test = Test(
'test-managed-jobs-inline-env',
[
f'sky jobs launch -n {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_IPS\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_RANK\\" ]]) || exit 1"',
f'sky jobs launch -n {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"',
'sleep 20',
f'{_JOB_QUEUE_WAIT} | grep {name} | grep SUCCEEDED',
],
Expand All @@ -2954,10 +2954,10 @@ def test_inline_env(generic_cloud: str):
test = Test(
'test-inline-env',
[
f'sky launch -c {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_IPS\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_RANK\\" ]]) || exit 1"',
f'sky launch -c {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"',
'sleep 20',
f'sky logs {name} 1 --status',
f'sky exec {name} --env TEST_ENV2="success" "([[ ! -z \\"\$TEST_ENV2\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_IPS\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_RANK\\" ]]) || exit 1"',
f'sky exec {name} --env TEST_ENV2="success" "([[ ! -z \\"\$TEST_ENV2\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"',
f'sky logs {name} 2 --status',
],
f'sky down -y {name}',
Expand All @@ -2973,9 +2973,9 @@ def test_inline_env_file(generic_cloud: str):
test = Test(
'test-inline-env-file',
[
f'sky launch -c {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_IPS\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_RANK\\" ]]) || exit 1"',
f'sky launch -c {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"',
f'sky logs {name} 1 --status',
f'sky exec {name} --env-file examples/sample_dotenv "([[ ! -z \\"\$TEST_ENV2\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_IPS\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_RANK\\" ]]) || exit 1"',
f'sky exec {name} --env-file examples/sample_dotenv "([[ ! -z \\"\$TEST_ENV2\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"',
f'sky logs {name} 2 --status',
],
f'sky down -y {name}',
Expand Down
Loading