diff --git a/docs/source/running-jobs/environment-variables.rst b/docs/source/running-jobs/environment-variables.rst index 2f3427c1bf55..7f91720f9b59 100644 --- a/docs/source/running-jobs/environment-variables.rst +++ b/docs/source/running-jobs/environment-variables.rst @@ -120,8 +120,12 @@ Environment variables for ``setup`` - Rank (an integer ID from 0 to :code:`num_nodes-1`) of the node being set up. - 0 * - ``SKYPILOT_SETUP_NODE_IPS`` - - A string of IP addresses of the nodes in the cluster with the same order as the node ranks, where each line contains one IP address. + - A string of IP addresses of the nodes in the cluster with the same order as the node ranks, where each line contains one IP address. Note that this is not necessarily the same as the nodes in ``run`` stage, as the ``setup`` stage runs on all nodes of the cluster, while the ``run`` stage can run on a subset of nodes. - 1.2.3.4 + 3.4.5.6 + * - ``SKYPILOT_NUM_NODES`` + - Number of nodes in the cluster. Same value as ``$(echo "$SKYPILOT_NODE_IPS" | wc -l)``. + - 2 * - ``SKYPILOT_TASK_ID`` - A unique ID assigned to each task. @@ -159,6 +163,9 @@ Environment variables for ``run`` * - ``SKYPILOT_NODE_IPS`` - A string of IP addresses of the nodes reserved to execute the task, where each line contains one IP address. Read more :ref:`here `. - 1.2.3.4 + * - ``SKYPILOT_NUM_NODES`` + - Number of nodes assigned to execute the current task. Same value as ``$(echo "$SKYPILOT_NODE_IPS" | wc -l)``. Read more :ref:`here `. + - 1 * - ``SKYPILOT_NUM_GPUS_PER_NODE`` - Number of GPUs reserved on each node to execute the task; the same as the count in ``accelerators: :`` (rounded up if a fraction). Read diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 943d6097aca4..fb6031ace5e2 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -269,8 +269,9 @@ def add_prologue(self, job_id: int) -> None: SKY_REMOTE_WORKDIR = {constants.SKY_REMOTE_WORKDIR!r} kwargs = dict() - # Only set the `_temp_dir` to SkyPilot's ray cluster directory when the directory - # exists for backward compatibility for the VM launched before #1790. + # Only set the `_temp_dir` to SkyPilot's ray cluster directory when + # the directory exists for backward compatibility for the VM + # launched before #1790. if os.path.exists({constants.SKY_REMOTE_RAY_TEMPDIR!r}): kwargs['_temp_dir'] = {constants.SKY_REMOTE_RAY_TEMPDIR!r} ray.init( @@ -308,8 +309,9 @@ def get_or_fail(futures, pg) -> List[int]: ready, unready = ray.wait(unready) idx = futures.index(ready[0]) returncodes[idx] = ray.get(ready[0]) - # Remove the placement group after all tasks are done, so that the - # next job can be scheduled on the released resources immediately. + # Remove the placement group after all tasks are done, so that + # the next job can be scheduled on the released resources + # immediately. ray_util.remove_placement_group(pg) sys.stdout.flush() return returncodes @@ -348,9 +350,9 @@ def add_gang_scheduling_placement_group_and_setup( num_nodes: int, resources_dict: Dict[str, float], stable_cluster_internal_ips: List[str], + env_vars: Dict[str, str], setup_cmd: Optional[str] = None, setup_log_path: Optional[str] = None, - env_vars: Optional[Dict[str, str]] = None, ) -> None: """Create the gang scheduling placement group for a Task. @@ -410,6 +412,8 @@ def add_gang_scheduling_placement_group_and_setup( job_id = self.job_id if setup_cmd is not None: + setup_envs = env_vars.copy() + setup_envs[constants.SKYPILOT_NUM_NODES] = str(num_nodes) self._code += [ textwrap.dedent(f"""\ setup_cmd = {setup_cmd!r} @@ -439,7 +443,7 @@ def add_gang_scheduling_placement_group_and_setup( .remote( setup_cmd, os.path.expanduser({setup_log_path!r}), - env_vars={env_vars!r}, + env_vars={setup_envs!r}, stream_logs=True, with_ray=True, ) for i in range(total_num_nodes)] @@ -550,11 +554,13 @@ def add_ray_task(self, f'placement_group_bundle_index={gang_scheduling_id})') sky_env_vars_dict_str = [ - textwrap.dedent("""\ - sky_env_vars_dict = {} - sky_env_vars_dict['SKYPILOT_NODE_IPS'] = job_ip_list_str - # Environment starting with `SKY_` is deprecated. + textwrap.dedent(f"""\ + sky_env_vars_dict = {{}} + sky_env_vars_dict['{constants.SKYPILOT_NODE_IPS}'] = job_ip_list_str + # Backward compatibility: Environment starting with `SKY_` is + # deprecated. Remove it in v0.9.0. sky_env_vars_dict['SKY_NODE_IPS'] = job_ip_list_str + sky_env_vars_dict['{constants.SKYPILOT_NUM_NODES}'] = len(job_ip_rank_list) """) ] @@ -575,8 +581,9 @@ def add_ray_task(self, if script is not None: - sky_env_vars_dict['SKYPILOT_NUM_GPUS_PER_NODE'] = {int(math.ceil(num_gpus))!r} - # Environment starting with `SKY_` is deprecated. + sky_env_vars_dict['{constants.SKYPILOT_NUM_GPUS_PER_NODE}'] = {int(math.ceil(num_gpus))!r} + # Backward compatibility: Environment starting with `SKY_` is + # deprecated. Remove it in v0.9.0. sky_env_vars_dict['SKY_NUM_GPUS_PER_NODE'] = {int(math.ceil(num_gpus))!r} ip = gang_scheduling_id_to_ip[{gang_scheduling_id!r}] @@ -593,12 +600,14 @@ def add_ray_task(self, node_name = f'worker{{idx_in_cluster}}' name_str = f'{{node_name}}, rank={{rank}},' log_path = os.path.expanduser(os.path.join({log_dir!r}, f'{{rank}}-{{node_name}}.log')) - sky_env_vars_dict['SKYPILOT_NODE_RANK'] = rank - # Environment starting with `SKY_` is deprecated. + sky_env_vars_dict['{constants.SKYPILOT_NODE_RANK}'] = rank + # Backward compatibility: Environment starting with `SKY_` is + # deprecated. Remove it in v0.9.0. sky_env_vars_dict['SKY_NODE_RANK'] = rank sky_env_vars_dict['SKYPILOT_INTERNAL_JOB_ID'] = {self.job_id} - # Environment starting with `SKY_` is deprecated. + # Backward compatibility: Environment starting with `SKY_` is + # deprecated. Remove it in v0.9.0. sky_env_vars_dict['SKY_INTERNAL_JOB_ID'] = {self.job_id} futures.append(run_bash_command_with_log \\ @@ -4751,9 +4760,9 @@ def _execute_task_one_node(self, handle: CloudVmRayResourceHandle, 1, resources_dict, stable_cluster_internal_ips=internal_ips, + env_vars=task_env_vars, setup_cmd=self._setup_cmd, setup_log_path=os.path.join(log_dir, 'setup.log'), - env_vars=task_env_vars, ) if callable(task.run): @@ -4800,9 +4809,10 @@ def _execute_task_n_nodes(self, handle: CloudVmRayResourceHandle, num_actual_nodes, resources_dict, stable_cluster_internal_ips=internal_ips, + env_vars=task_env_vars, setup_cmd=self._setup_cmd, setup_log_path=os.path.join(log_dir, 'setup.log'), - env_vars=task_env_vars) + ) if callable(task.run): run_fn_code = textwrap.dedent(inspect.getsource(task.run)) diff --git a/sky/skylet/constants.py b/sky/skylet/constants.py index d155c5a6cf8a..e9c02d5dea6a 100644 --- a/sky/skylet/constants.py +++ b/sky/skylet/constants.py @@ -237,3 +237,9 @@ # The name for the environment variable that stores the URL of the SkyPilot # API server. SKY_API_SERVER_URL_ENV_VAR = 'SKYPILOT_API_SERVER_URL' + +# SkyPilot environment variables +SKYPILOT_NUM_NODES = 'SKYPILOT_NUM_NODES' +SKYPILOT_NODE_IPS = 'SKYPILOT_NODE_IPS' +SKYPILOT_NUM_GPUS_PER_NODE = 'SKYPILOT_NUM_GPUS_PER_NODE' +SKYPILOT_NODE_RANK = 'SKYPILOT_NODE_RANK' diff --git a/tests/test_smoke.py b/tests/test_smoke.py index d19863b52fee..c47845db848c 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -2936,7 +2936,7 @@ def test_managed_jobs_inline_env(generic_cloud: str): test = Test( 'test-managed-jobs-inline-env', [ - f'sky jobs launch -n {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_IPS\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_RANK\\" ]]) || exit 1"', + f'sky jobs launch -n {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"', 'sleep 20', f'{_JOB_QUEUE_WAIT} | grep {name} | grep SUCCEEDED', ], @@ -2954,10 +2954,10 @@ def test_inline_env(generic_cloud: str): test = Test( 'test-inline-env', [ - f'sky launch -c {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_IPS\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_RANK\\" ]]) || exit 1"', + f'sky launch -c {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"', 'sleep 20', f'sky logs {name} 1 --status', - f'sky exec {name} --env TEST_ENV2="success" "([[ ! -z \\"\$TEST_ENV2\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_IPS\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_RANK\\" ]]) || exit 1"', + f'sky exec {name} --env TEST_ENV2="success" "([[ ! -z \\"\$TEST_ENV2\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"', f'sky logs {name} 2 --status', ], f'sky down -y {name}', @@ -2973,9 +2973,9 @@ def test_inline_env_file(generic_cloud: str): test = Test( 'test-inline-env-file', [ - f'sky launch -c {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_IPS\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_RANK\\" ]]) || exit 1"', + f'sky launch -c {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"', f'sky logs {name} 1 --status', - f'sky exec {name} --env-file examples/sample_dotenv "([[ ! -z \\"\$TEST_ENV2\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_IPS\\" ]] && [[ ! -z \\"\$SKYPILOT_NODE_RANK\\" ]]) || exit 1"', + f'sky exec {name} --env-file examples/sample_dotenv "([[ ! -z \\"\$TEST_ENV2\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"', f'sky logs {name} 2 --status', ], f'sky down -y {name}',