diff --git a/examples/managed_job.yaml b/examples/managed_job.yaml
index 4bfcb63f40a..30ad2db287a 100644
--- a/examples/managed_job.yaml
+++ b/examples/managed_job.yaml
@@ -6,6 +6,7 @@ setup: |
 run: |
   conda env list
+  echo "start counting"
   python -u - << EOF
   import time
   import tqdm
diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py
index f4d141043bb..60ee2887625 100644
--- a/sky/backends/cloud_vm_ray_backend.py
+++ b/sky/backends/cloud_vm_ray_backend.py
@@ -3175,11 +3175,11 @@ def _setup_node(node_id: int) -> None:
                 process_stream=False,
                 # We do not source bashrc for setup, since bashrc is sourced
                 # in the script already.
-                # Skip two lines due to the /bin/bash -i and source ~/.bashrc
-                # in the setup_cmd.
+                # Skip an empty line and two lines due to the /bin/bash -i and
+                # source ~/.bashrc in the setup_cmd.
                 # bash: cannot set terminal process group (7398): Inappropriate ioctl for device # pylint: disable=line-too-long
                 # bash: no job control in this shell
-                skip_lines=2,
+                skip_lines=3,
             )

         def error_message() -> str:
@@ -3668,6 +3668,14 @@ def tail_logs(self,
                   job_id: Optional[int],
                   managed_job_id: Optional[int] = None,
                   follow: bool = True) -> int:
+        """Tail the logs of a job.
+
+        Args:
+            handle: The handle to the cluster.
+            job_id: The job ID to tail the logs of.
+            managed_job_id: The managed job ID, for display purposes only.
+            follow: Whether to follow the logs.
+        """
         code = job_lib.JobLibCodeGen.tail_logs(job_id,
                                                managed_job_id=managed_job_id,
                                                follow=follow)
@@ -3702,15 +3710,12 @@ def tail_managed_job_logs(self,
                               handle: CloudVmRayResourceHandle,
                               job_id: Optional[int] = None,
                               job_name: Optional[str] = None,
+                              controller: bool = False,
                               follow: bool = True) -> None:
         # if job_name is not None, job_id should be None
         assert job_name is None or job_id is None, (job_name, job_id)
-        if job_name is not None:
-            code = managed_jobs.ManagedJobCodeGen.stream_logs_by_name(
-                job_name, follow)
-        else:
-            code = managed_jobs.ManagedJobCodeGen.stream_logs_by_id(
-                job_id, follow)
+        code = managed_jobs.ManagedJobCodeGen.stream_logs(
+            job_name, job_id, follow, controller)

         # With the stdin=subprocess.DEVNULL, the ctrl-c will not directly
         # kill the process, so we need to handle it manually here.
diff --git a/sky/cli.py b/sky/cli.py
index 0bcec3d2f4b..23c5ba7a3cd 100644
--- a/sky/cli.py
+++ b/sky/cli.py
@@ -3760,13 +3760,10 @@ def jobs_logs(name: Optional[str], job_id: Optional[int], follow: bool,
               controller: bool):
     """Tail the log of a managed job."""
     try:
-        if controller:
-            core.tail_logs(
-                controller_utils.Controllers.JOBS_CONTROLLER.value.cluster_name,
-                job_id=job_id,
-                follow=follow)
-        else:
-            managed_jobs.tail_logs(name=name, job_id=job_id, follow=follow)
+        managed_jobs.tail_logs(name=name,
+                               job_id=job_id,
+                               follow=follow,
+                               controller=controller)
     except exceptions.ClusterNotUpError:
         with ux_utils.print_exception_no_traceback():
             raise
diff --git a/sky/jobs/core.py b/sky/jobs/core.py
index 7f9e0d757ea..561d47f4b25 100644
--- a/sky/jobs/core.py
+++ b/sky/jobs/core.py
@@ -278,7 +278,8 @@ def cancel(name: Optional[str] = None,


 @usage_lib.entrypoint
-def tail_logs(name: Optional[str], job_id: Optional[int], follow: bool) -> None:
+def tail_logs(name: Optional[str], job_id: Optional[int], follow: bool,
+              controller: bool) -> None:
     # NOTE(dev): Keep the docstring consistent between the Python API and CLI.
     """Tail logs of managed jobs.
@@ -300,11 +301,12 @@ def tail_logs(name: Optional[str], job_id: Optional[int], follow: bool) -> None:
         raise ValueError('Cannot specify both name and job_id.')
     backend = backend_utils.get_backend_from_handle(handle)
     assert isinstance(backend, backends.CloudVmRayBackend), backend
-    # Stream the realtime logs
+
     backend.tail_managed_job_logs(handle,
                                   job_id=job_id,
                                   job_name=name,
-                                  follow=follow)
+                                  follow=follow,
+                                  controller=controller)


 spot_launch = common_utils.deprecated_function(
diff --git a/sky/jobs/utils.py b/sky/jobs/utils.py
index 8220dd01621..bd238054a80 100644
--- a/sky/jobs/utils.py
+++ b/sky/jobs/utils.py
@@ -6,6 +6,7 @@
 """
 import collections
 import enum
+import inspect
 import os
 import pathlib
 import shlex
@@ -28,7 +29,7 @@
 from sky.jobs import state as managed_job_state
 from sky.skylet import constants
 from sky.skylet import job_lib
-from sky.skylet.log_lib import run_bash_command_with_log
+from sky.skylet import log_lib
 from sky.utils import common_utils
 from sky.utils import log_utils
 from sky.utils import rich_utils
@@ -184,7 +185,7 @@ def callback_func(status: str):
         log_path = os.path.join(constants.SKY_LOGS_DIRECTORY,
                                 'managed_job_event',
                                 f'jobs-callback-{job_id}-{task_id}.log')
-        result = run_bash_command_with_log(
+        result = log_lib.run_bash_command_with_log(
             bash_command=event_callback,
             log_path=log_path,
             env_vars=dict(
@@ -448,18 +449,55 @@ def stream_logs_by_id(job_id: int, follow: bool = True) -> str:
     return ''


-def stream_logs_by_name(job_name: str, follow: bool = True) -> str:
-    """Stream logs by name."""
-    job_ids = managed_job_state.get_nonterminal_job_ids_by_name(job_name)
-    if len(job_ids) == 0:
-        return (f'{colorama.Fore.RED}No job found with name {job_name!r}.'
-                f'{colorama.Style.RESET_ALL}')
-    if len(job_ids) > 1:
-        return (f'{colorama.Fore.RED}Multiple running jobs found '
-                f'with name {job_name!r}.\n'
-                f'Job IDs: {job_ids}{colorama.Style.RESET_ALL}')
-    stream_logs_by_id(job_ids[0], follow)
-    return ''
+def stream_logs(job_id: Optional[int],
+                job_name: Optional[str],
+                controller: bool = False,
+                follow: bool = True) -> str:
+    """Stream logs by job id or job name."""
+    if job_id is None and job_name is None:
+        job_id = managed_job_state.get_latest_job_id()
+        if job_id is None:
+            return 'No managed job found.'
+    if controller:
+        if job_id is None:
+            assert job_name is not None
+            managed_jobs = managed_job_state.get_managed_jobs()
+            # We manually filter the jobs by name, instead of using
+            # get_nonterminal_job_ids_by_name, as with `controller=True`, we
+            # should be able to show the logs for jobs in terminal states.
+            managed_jobs = list(
+                filter(lambda job: job['job_name'] == job_name, managed_jobs))
+            if len(managed_jobs) == 0:
+                return f'No managed job found with name {job_name!r}.'
+            if len(managed_jobs) > 1:
+                job_ids_str = ', '.join(str(job['job_id']) for job in managed_jobs)
+                raise ValueError(
+                    f'Multiple managed jobs found with name {job_name!r} (Job '
+                    f'IDs: {job_ids_str}). Please specify the job_id instead.')
+            job_id = managed_jobs[0]['job_id']
+        assert job_id is not None, (job_id, job_name)
+        # TODO: keep the following code in sync with
+        # job_lib.JobLibCodeGen.tail_logs; we do not directly call that
+        # function, as the following code needs to run on the current machine
+        # instead of remotely.
+        run_timestamp = job_lib.get_run_timestamp(job_id)
+        if run_timestamp is None:
+            return f'No managed job controller log found with job_id {job_id}.'
+        log_dir = os.path.join(constants.SKY_LOGS_DIRECTORY, run_timestamp)
+        log_lib.tail_logs(job_id=job_id, log_dir=log_dir, follow=follow)
+        return ''
+
+    if job_id is None:
+        assert job_name is not None
+        job_ids = managed_job_state.get_nonterminal_job_ids_by_name(job_name)
+        if len(job_ids) == 0:
+            return f'No running managed job found with name {job_name!r}.'
+        if len(job_ids) > 1:
+            raise ValueError(
+                f'Multiple running jobs found with name {job_name!r}.')
+        job_id = job_ids[0]
+
+    return stream_logs_by_id(job_id, follow)


 def dump_managed_job_queue() -> str:
@@ -716,10 +754,14 @@ class ManagedJobCodeGen:
     _PREFIX = textwrap.dedent("""\
         managed_job_version = 0
        try:
-            from sky.jobs import constants, state, utils
-            managed_job_version = constants.MANAGED_JOBS_VERSION
+            from sky.jobs import utils
+            from sky.jobs import constants as managed_job_constants
+            from sky.jobs import state as managed_job_state
+
+            managed_job_version = managed_job_constants.MANAGED_JOBS_VERSION
         except ImportError:
-            from sky.spot import spot_state as state, spot_utils as utils
+            from sky.spot import spot_state as managed_job_state
+            from sky.spot import spot_utils as utils
         """)

     @classmethod
@@ -750,20 +792,28 @@ def cancel_job_by_name(cls, job_name: str) -> str:
         return cls._build(code)

     @classmethod
-    def stream_logs_by_name(cls, job_name: str, follow: bool = True) -> str:
-        code = textwrap.dedent(f"""\
-        msg = utils.stream_logs_by_name({job_name!r}, follow={follow})
-        print(msg, flush=True)
+    def stream_logs(cls,
+                    job_name: Optional[str],
+                    job_id: Optional[int],
+                    follow: bool = True,
+                    controller: bool = False) -> str:
+        # We inspect the source code of the function here for backward
+        # compatibility.
+        # TODO: change to utils.stream_logs(job_id, job_name, follow) in v0.8.0.
+        # Import libraries required by `stream_logs`
+        code = textwrap.dedent("""\
+        import os
+
+        from sky.skylet import job_lib, log_lib
+        from sky.skylet import constants
+        from sky.jobs.utils import stream_logs_by_id
+        from typing import Optional
         """)
-        return cls._build(code)
+        code += inspect.getsource(stream_logs)
+        code += textwrap.dedent(f"""\

-    @classmethod
-    def stream_logs_by_id(cls,
-                          job_id: Optional[int],
-                          follow: bool = True) -> str:
-        code = textwrap.dedent(f"""\
-        job_id = {job_id} if {job_id} is not None else state.get_latest_job_id()
-        msg = utils.stream_logs_by_id(job_id, follow={follow})
+        msg = stream_logs({job_id!r}, {job_name!r},
+                          follow={follow}, controller={controller})
         print(msg, flush=True)
         """)
         return cls._build(code)
@@ -773,13 +823,13 @@ def set_pending(cls, job_id: int, managed_job_dag: 'dag_lib.Dag') -> str:
         dag_name = managed_job_dag.name
         # Add the managed job to queue table.
         code = textwrap.dedent(f"""\
-            state.set_job_name({job_id}, {dag_name!r})
+            managed_job_state.set_job_name({job_id}, {dag_name!r})
             """)
         for task_id, task in enumerate(managed_job_dag.tasks):
             resources_str = backend_utils.get_task_resources_str(
                 task, is_managed_job=True)
             code += textwrap.dedent(f"""\
-                state.set_pending({job_id}, {task_id},
+                managed_job_state.set_pending({job_id}, {task_id},
                                   {task.name!r}, {resources_str!r})
                 """)
         return cls._build(code)
diff --git a/sky/skylet/constants.py b/sky/skylet/constants.py
index 0c68fd7f6e6..dfac3e3b2ee 100644
--- a/sky/skylet/constants.py
+++ b/sky/skylet/constants.py
@@ -130,6 +130,9 @@
 # backend_utils.write_cluster_config.
 RAY_SKYPILOT_INSTALLATION_COMMANDS = (
     'mkdir -p ~/sky_workdir && mkdir -p ~/.sky/sky_app;'
+    # Disable the pip version check to avoid the warning message, which makes
+    # the output hard to read.
+    'export PIP_DISABLE_PIP_VERSION_CHECK=1;'
     # Print the PATH in provision.log to help debug PATH issues.
     'echo PATH=$PATH; '
     # Backward compatibility for ray upgrade (#3248): do not upgrade ray if the
diff --git a/sky/templates/jobs-controller.yaml.j2 b/sky/templates/jobs-controller.yaml.j2
index 7d15dc680ac..51083e84a59 100644
--- a/sky/templates/jobs-controller.yaml.j2
+++ b/sky/templates/jobs-controller.yaml.j2
@@ -11,6 +11,9 @@ file_mounts:

 setup: |
   {{ sky_activate_python_env }}
+  # Disable the pip version check to avoid the warning message, which makes the
+  # output hard to read.
+  export PIP_DISABLE_PIP_VERSION_CHECK=1

 {%- for cmd in cloud_dependencies_installation_commands %}
   {{cmd}}
diff --git a/sky/templates/sky-serve-controller.yaml.j2 b/sky/templates/sky-serve-controller.yaml.j2
index a4f1b49e3ed..a20c2d680aa 100644
--- a/sky/templates/sky-serve-controller.yaml.j2
+++ b/sky/templates/sky-serve-controller.yaml.j2
@@ -4,6 +4,9 @@ name: {{service_name}}

 setup: |
   {{ sky_activate_python_env }}
+  # Disable the pip version check to avoid the warning message, which makes the
+  # output hard to read.
+  export PIP_DISABLE_PIP_VERSION_CHECK=1

   # Install all cloud dependencies.
   # This is for multicloud support. To allow controller launch on all clouds,
diff --git a/sky/utils/command_runner.py b/sky/utils/command_runner.py
index f43296c2f1e..e263cd786ab 100644
--- a/sky/utils/command_runner.py
+++ b/sky/utils/command_runner.py
@@ -448,13 +448,11 @@ def run(
             proc = subprocess_utils.run(command, shell=False, check=False)
             return proc.returncode, '', ''

-        command_str = self._get_command_to_run(
-            cmd,
-            process_stream,
-            separate_stderr,
-            # +1 to skip first new line.
-            skip_lines=skip_lines + 1,
-            source_bashrc=source_bashrc)
+        command_str = self._get_command_to_run(cmd,
+                                               process_stream,
+                                               separate_stderr,
+                                               skip_lines=skip_lines,
+                                               source_bashrc=source_bashrc)
         command = base_ssh_command + [shlex.quote(command_str)]

         log_dir = os.path.expanduser(os.path.dirname(log_path))
diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py
index 5609db4a04a..c1859d52663 100644
--- a/sky/utils/controller_utils.py
+++ b/sky/utils/controller_utils.py
@@ -207,7 +207,7 @@ def _get_cloud_dependencies_installation_commands(
             # fluidstack and paperspace
             continue
         if isinstance(cloud, clouds.AWS):
-            commands.append(f'echo -n "{prefix_str}AWS{empty_str}" && ' +
+            commands.append(f'echo -en "\\r{prefix_str}AWS{empty_str}" && ' +
                             aws_dependencies_installation)
         elif isinstance(cloud, clouds.Azure):
             commands.append(
diff --git a/tests/test_smoke.py b/tests/test_smoke.py
index db8f684c228..c0e98fe90ba 100644
--- a/tests/test_smoke.py
+++ b/tests/test_smoke.py
@@ -2303,6 +2303,9 @@ def test_managed_jobs(generic_cloud: str):
             f'{_JOB_QUEUE_WAIT}| grep {name}-1 | head -n1 | grep "CANCELLING\|CANCELLED"',
             'sleep 200',
             f'{_JOB_QUEUE_WAIT}| grep {name}-1 | head -n1 | grep CANCELLED',
+            # Test the log streaming functionality.
+            f's=$(sky jobs logs -n {name}-2 --no-follow); echo "$s"; echo "$s" | grep "start counting"',
+            f's=$(sky jobs logs --controller -n {name}-2 --no-follow); echo "$s"; echo "$s" | grep "Successfully provisioned cluster:"',
             f'{_JOB_QUEUE_WAIT}| grep {name}-2 | head -n1 | grep "RUNNING\|SUCCEEDED"',
         ],
         # TODO(zhwu): Change to _JOB_CANCEL_WAIT.format(job_name=f'{name}-1 -n {name}-2') when
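Note on the backward-compatibility pattern in `ManagedJobCodeGen.stream_logs` above: instead of calling `utils.stream_logs` on the controller, which may be running an older SkyPilot release that does not have the function yet, the codegen embeds the current source of `stream_logs` via `inspect.getsource` and invokes the embedded copy. This is also why the generated prelude imports everything the function body references (`os`, `job_lib`, `log_lib`, `constants`, `stream_logs_by_id`, `Optional`): the shipped source executes in a fresh namespace on the remote side. A minimal, self-contained sketch of the same idea follows; `greet` and `build_script` are hypothetical names for illustration and are not part of this change.

import inspect
import textwrap


def greet(name: str) -> str:
    """A function we want to run on a machine whose installed package may lack it."""
    return f'Hello, {name}!'


def build_script(name: str) -> str:
    """Generate a self-contained script that embeds greet()'s source.

    The remote interpreter only needs the standard library: the script
    defines greet() from the shipped source and then calls it, so the
    remote side does not need a package version that provides greet().
    """
    code = inspect.getsource(greet)  # ship the current definition
    code += textwrap.dedent(f"""\

        print(greet({name!r}), flush=True)
        """)
    return code


if __name__ == '__main__':
    # Locally, exec() stands in for sending the generated script over SSH
    # and running it with the remote Python interpreter.
    exec(build_script('world'))  # prints: Hello, world!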