Skip to content

Commit

Permalink
[Core] Add sky jobs logs --name and fix sky job logs spinner (#3538)
Browse files Browse the repository at this point in the history
* [Test] Add test for job logs

* Fix

* Allow tailing controller logs with job name

* refactor and add job name for sky job logs

* fix state.

* fix

* fix

* Fix controller log

* fix dependency hint

* fix

* fix #3599

* format

* address comments

* format
  • Loading branch information
Michaelvll authored May 28, 2024
1 parent e006a79 commit ef2b408
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 58 deletions.
1 change: 1 addition & 0 deletions examples/managed_job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ setup: |
run: |
conda env list
echo "start counting"
python -u - << EOF
import time
import tqdm
Expand Down
23 changes: 14 additions & 9 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3175,11 +3175,11 @@ def _setup_node(node_id: int) -> None:
process_stream=False,
# We do not source bashrc for setup, since bashrc is sourced
# in the script already.
# Skip two lines due to the /bin/bash -i and source ~/.bashrc
# in the setup_cmd.
# Skip an empty line and two lines due to the /bin/bash -i and
# source ~/.bashrc in the setup_cmd.
# bash: cannot set terminal process group (7398): Inappropriate ioctl for device # pylint: disable=line-too-long
# bash: no job control in this shell
skip_lines=2,
skip_lines=3,
)

def error_message() -> str:
Expand Down Expand Up @@ -3668,6 +3668,14 @@ def tail_logs(self,
job_id: Optional[int],
managed_job_id: Optional[int] = None,
follow: bool = True) -> int:
"""Tail the logs of a job.
Args:
handle: The handle to the cluster.
job_id: The job ID to tail the logs of.
managed_job_id: The managed job ID for display purpose only.
follow: Whether to follow the logs.
"""
code = job_lib.JobLibCodeGen.tail_logs(job_id,
managed_job_id=managed_job_id,
follow=follow)
Expand Down Expand Up @@ -3702,15 +3710,12 @@ def tail_managed_job_logs(self,
handle: CloudVmRayResourceHandle,
job_id: Optional[int] = None,
job_name: Optional[str] = None,
controller: bool = False,
follow: bool = True) -> None:
# if job_name is not None, job_id should be None
assert job_name is None or job_id is None, (job_name, job_id)
if job_name is not None:
code = managed_jobs.ManagedJobCodeGen.stream_logs_by_name(
job_name, follow)
else:
code = managed_jobs.ManagedJobCodeGen.stream_logs_by_id(
job_id, follow)
code = managed_jobs.ManagedJobCodeGen.stream_logs(
job_name, job_id, follow, controller)

# With the stdin=subprocess.DEVNULL, the ctrl-c will not directly
# kill the process, so we need to handle it manually here.
Expand Down
11 changes: 4 additions & 7 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3760,13 +3760,10 @@ def jobs_logs(name: Optional[str], job_id: Optional[int], follow: bool,
controller: bool):
"""Tail the log of a managed job."""
try:
if controller:
core.tail_logs(
controller_utils.Controllers.JOBS_CONTROLLER.value.cluster_name,
job_id=job_id,
follow=follow)
else:
managed_jobs.tail_logs(name=name, job_id=job_id, follow=follow)
managed_jobs.tail_logs(name=name,
job_id=job_id,
follow=follow,
controller=controller)
except exceptions.ClusterNotUpError:
with ux_utils.print_exception_no_traceback():
raise
Expand Down
8 changes: 5 additions & 3 deletions sky/jobs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ def cancel(name: Optional[str] = None,


@usage_lib.entrypoint
def tail_logs(name: Optional[str], job_id: Optional[int], follow: bool) -> None:
def tail_logs(name: Optional[str], job_id: Optional[int], follow: bool,
controller: bool) -> None:
# NOTE(dev): Keep the docstring consistent between the Python API and CLI.
"""Tail logs of managed jobs.
Expand All @@ -300,11 +301,12 @@ def tail_logs(name: Optional[str], job_id: Optional[int], follow: bool) -> None:
raise ValueError('Cannot specify both name and job_id.')
backend = backend_utils.get_backend_from_handle(handle)
assert isinstance(backend, backends.CloudVmRayBackend), backend
# Stream the realtime logs

backend.tail_managed_job_logs(handle,
job_id=job_id,
job_name=name,
follow=follow)
follow=follow,
controller=controller)


spot_launch = common_utils.deprecated_function(
Expand Down
112 changes: 81 additions & 31 deletions sky/jobs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""
import collections
import enum
import inspect
import os
import pathlib
import shlex
Expand All @@ -28,7 +29,7 @@
from sky.jobs import state as managed_job_state
from sky.skylet import constants
from sky.skylet import job_lib
from sky.skylet.log_lib import run_bash_command_with_log
from sky.skylet import log_lib
from sky.utils import common_utils
from sky.utils import log_utils
from sky.utils import rich_utils
Expand Down Expand Up @@ -184,7 +185,7 @@ def callback_func(status: str):
log_path = os.path.join(constants.SKY_LOGS_DIRECTORY,
'managed_job_event',
f'jobs-callback-{job_id}-{task_id}.log')
result = run_bash_command_with_log(
result = log_lib.run_bash_command_with_log(
bash_command=event_callback,
log_path=log_path,
env_vars=dict(
Expand Down Expand Up @@ -448,18 +449,55 @@ def stream_logs_by_id(job_id: int, follow: bool = True) -> str:
return ''


def stream_logs_by_name(job_name: str, follow: bool = True) -> str:
"""Stream logs by name."""
job_ids = managed_job_state.get_nonterminal_job_ids_by_name(job_name)
if len(job_ids) == 0:
return (f'{colorama.Fore.RED}No job found with name {job_name!r}.'
f'{colorama.Style.RESET_ALL}')
if len(job_ids) > 1:
return (f'{colorama.Fore.RED}Multiple running jobs found '
f'with name {job_name!r}.\n'
f'Job IDs: {job_ids}{colorama.Style.RESET_ALL}')
stream_logs_by_id(job_ids[0], follow)
return ''
def stream_logs(job_id: Optional[int],
                job_name: Optional[str],
                controller: bool = False,
                follow: bool = True) -> str:
    """Stream logs of a managed job, selected by id or by name.

    Args:
        job_id: The managed job ID to stream logs for. If both ``job_id``
            and ``job_name`` are None, the latest job is used.
        job_name: The managed job name to stream logs for. Mutually
            exclusive with ``job_id`` (callers assert this upstream).
        controller: If True, stream the controller-side logs of the job
            instead of the job's own output.
        follow: Whether to keep following the logs as they grow.

    Returns:
        An error message string on failure, or '' on success.

    Raises:
        ValueError: If multiple jobs match the given ``job_name``.
    """
    if job_id is None and job_name is None:
        job_id = managed_job_state.get_latest_job_id()
        if job_id is None:
            return 'No managed job found.'
    if controller:
        if job_id is None:
            assert job_name is not None
            managed_jobs = managed_job_state.get_managed_jobs()
            # We manually filter the jobs by name, instead of using
            # get_nonterminal_job_ids_by_name, as with `controller=True`, we
            # should be able to show the logs for jobs in terminal states.
            managed_jobs = list(
                filter(lambda job: job['job_name'] == job_name, managed_jobs))
            if len(managed_jobs) == 0:
                return f'No managed job found with name {job_name!r}.'
            if len(managed_jobs) > 1:
                # Convert IDs to str: job_id is an int, and str.join
                # requires string items.
                job_ids_str = ', '.join(
                    str(job['job_id']) for job in managed_jobs)
                raise ValueError(
                    f'Multiple managed jobs found with name {job_name!r} (Job '
                    f'IDs: {job_ids_str}). Please specify the job_id instead.')
            job_id = managed_jobs[0]['job_id']
        assert job_id is not None, (job_id, job_name)
        # TODO: keep the following code sync with
        # job_lib.JobLibCodeGen.tail_logs, we do not directly call that function
        # as the following code need to be run in the current machine, instead
        # of running remotely.
        run_timestamp = job_lib.get_run_timestamp(job_id)
        if run_timestamp is None:
            return f'No managed job controller log found with job_id {job_id}.'
        log_dir = os.path.join(constants.SKY_LOGS_DIRECTORY, run_timestamp)
        log_lib.tail_logs(job_id=job_id, log_dir=log_dir, follow=follow)
        return ''

    if job_id is None:
        assert job_name is not None
        job_ids = managed_job_state.get_nonterminal_job_ids_by_name(job_name)
        if len(job_ids) == 0:
            return f'No running managed job found with name {job_name!r}.'
        if len(job_ids) > 1:
            raise ValueError(
                f'Multiple running jobs found with name {job_name!r}.')
        job_id = job_ids[0]

    return stream_logs_by_id(job_id, follow)


def dump_managed_job_queue() -> str:
Expand Down Expand Up @@ -716,10 +754,14 @@ class ManagedJobCodeGen:
_PREFIX = textwrap.dedent("""\
managed_job_version = 0
try:
from sky.jobs import constants, state, utils
managed_job_version = constants.MANAGED_JOBS_VERSION
from sky.jobs import utils
from sky.jobs import constants as managed_job_constants
from sky.jobs import state as managed_job_state
managed_job_version = managed_job_constants.MANAGED_JOBS_VERSION
except ImportError:
from sky.spot import spot_state as state, spot_utils as utils
from sky.spot import spot_state as managed_job_state
from sky.spot import spot_utils as utils
""")

@classmethod
Expand Down Expand Up @@ -750,20 +792,28 @@ def cancel_job_by_name(cls, job_name: str) -> str:
return cls._build(code)

@classmethod
def stream_logs_by_name(cls, job_name: str, follow: bool = True) -> str:
code = textwrap.dedent(f"""\
msg = utils.stream_logs_by_name({job_name!r}, follow={follow})
print(msg, flush=True)
def stream_logs(cls,
job_name: Optional[str],
job_id: Optional[int],
follow: bool = True,
controller: bool = False) -> str:
# We inspect the source code of the function here for backward
# compatibility.
# TODO: change to utils.stream_logs(job_id, job_name, follow) in v0.8.0.
# Import libraries required by `stream_logs`
code = textwrap.dedent("""\
import os
from sky.skylet import job_lib, log_lib
from sky.skylet import constants
from sky.jobs.utils import stream_logs_by_id
from typing import Optional
""")
return cls._build(code)
code += inspect.getsource(stream_logs)
code += textwrap.dedent(f"""\
@classmethod
def stream_logs_by_id(cls,
job_id: Optional[int],
follow: bool = True) -> str:
code = textwrap.dedent(f"""\
job_id = {job_id} if {job_id} is not None else state.get_latest_job_id()
msg = utils.stream_logs_by_id(job_id, follow={follow})
msg = stream_logs({job_id!r}, {job_name!r},
follow={follow}, controller={controller})
print(msg, flush=True)
""")
return cls._build(code)
Expand All @@ -773,13 +823,13 @@ def set_pending(cls, job_id: int, managed_job_dag: 'dag_lib.Dag') -> str:
dag_name = managed_job_dag.name
# Add the managed job to queue table.
code = textwrap.dedent(f"""\
state.set_job_name({job_id}, {dag_name!r})
managed_job_state.set_job_name({job_id}, {dag_name!r})
""")
for task_id, task in enumerate(managed_job_dag.tasks):
resources_str = backend_utils.get_task_resources_str(
task, is_managed_job=True)
code += textwrap.dedent(f"""\
state.set_pending({job_id}, {task_id},
managed_job_state.set_pending({job_id}, {task_id},
{task.name!r}, {resources_str!r})
""")
return cls._build(code)
Expand Down
3 changes: 3 additions & 0 deletions sky/skylet/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@
# backend_utils.write_cluster_config.
RAY_SKYPILOT_INSTALLATION_COMMANDS = (
'mkdir -p ~/sky_workdir && mkdir -p ~/.sky/sky_app;'
# Disable the pip version check to avoid the warning message, which makes
# the output hard to read.
'export PIP_DISABLE_PIP_VERSION_CHECK=1;'
# Print the PATH in provision.log to help debug PATH issues.
'echo PATH=$PATH; '
# Backward compatibility for ray upgrade (#3248): do not upgrade ray if the
Expand Down
3 changes: 3 additions & 0 deletions sky/templates/jobs-controller.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ file_mounts:

setup: |
{{ sky_activate_python_env }}
# Disable the pip version check to avoid the warning message, which makes the
# output hard to read.
export PIP_DISABLE_PIP_VERSION_CHECK=1

{%- for cmd in cloud_dependencies_installation_commands %}
{{cmd}}
Expand Down
3 changes: 3 additions & 0 deletions sky/templates/sky-serve-controller.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ name: {{service_name}}

setup: |
{{ sky_activate_python_env }}
# Disable the pip version check to avoid the warning message, which makes the
# output hard to read.
export PIP_DISABLE_PIP_VERSION_CHECK=1

# Install all cloud dependencies.
# This is for multicloud support. To allow controller launch on all clouds,
Expand Down
12 changes: 5 additions & 7 deletions sky/utils/command_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,13 +448,11 @@ def run(
proc = subprocess_utils.run(command, shell=False, check=False)
return proc.returncode, '', ''

command_str = self._get_command_to_run(
cmd,
process_stream,
separate_stderr,
# +1 to skip first new line.
skip_lines=skip_lines + 1,
source_bashrc=source_bashrc)
command_str = self._get_command_to_run(cmd,
process_stream,
separate_stderr,
skip_lines=skip_lines,
source_bashrc=source_bashrc)
command = base_ssh_command + [shlex.quote(command_str)]

log_dir = os.path.expanduser(os.path.dirname(log_path))
Expand Down
2 changes: 1 addition & 1 deletion sky/utils/controller_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def _get_cloud_dependencies_installation_commands(
# fluidstack and paperspace
continue
if isinstance(cloud, clouds.AWS):
commands.append(f'echo -n "{prefix_str}AWS{empty_str}" && ' +
commands.append(f'echo -en "\\r{prefix_str}AWS{empty_str}" && ' +
aws_dependencies_installation)
elif isinstance(cloud, clouds.Azure):
commands.append(
Expand Down
3 changes: 3 additions & 0 deletions tests/test_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -2303,6 +2303,9 @@ def test_managed_jobs(generic_cloud: str):
f'{_JOB_QUEUE_WAIT}| grep {name}-1 | head -n1 | grep "CANCELLING\|CANCELLED"',
'sleep 200',
f'{_JOB_QUEUE_WAIT}| grep {name}-1 | head -n1 | grep CANCELLED',
# Test the functionality for logging.
f's=$(sky jobs logs -n {name}-2 --no-follow); echo "$s"; echo "$s" | grep "start counting"',
f's=$(sky jobs logs --controller -n {name}-2 --no-follow); echo "$s"; echo "$s" | grep "Successfully provisioned cluster:"',
f'{_JOB_QUEUE_WAIT}| grep {name}-2 | head -n1 | grep "RUNNING\|SUCCEEDED"',
],
# TODO(zhwu): Change to _JOB_CANCEL_WAIT.format(job_name=f'{name}-1 -n {name}-2') when
Expand Down

0 comments on commit ef2b408

Please sign in to comment.