[Jobs] Move task retry logic to correct branch in stream_logs_by_id #4407

Open
wants to merge 14 commits into base: master
45 changes: 23 additions & 22 deletions sky/jobs/utils.py
@@ -384,8 +384,29 @@ def stream_logs_by_id(job_id: int, follow: bool = True) -> str:
job_statuses = backend.get_job_status(handle, stream_logs=False)
job_status = list(job_statuses.values())[0]
assert job_status is not None, 'No job found.'
assert task_id is not None, job_id
if job_status == job_lib.JobStatus.FAILED:
Collaborator

There can be a few different statuses that can trigger the retry, including FAILED_SETUP and FAILED. Should we add both?

Contributor Author

@andylizf andylizf Nov 25, 2024

Yes, I agree we should add both FAILED and FAILED_SETUP as retry triggers, since a setup failure is also a user program failure.

Also, the comment mentions that FAILED_SETUP needs to be placed after FAILED. Does this ordering affect our retry logic implementation?

    # The job setup failed (only in effect if --detach-setup is set). It
    # needs to be placed after the `FAILED` state, so that the status
    # set by our generated ray program will not be overwritten by
    # ray's job status (FAILED).
    # This is for a better UX, so that the user can find out the reason
    # of the failure quickly.
    FAILED_SETUP = 'FAILED_SETUP'
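
For illustration, a retry trigger covering both statuses could look roughly like the sketch below. This is a minimal fragment of the branch in the diff above; the tuple name is a hypothetical local introduced here, not an existing helper in sky/jobs/utils.py or job_lib.

    # Sketch only: treat both FAILED and FAILED_SETUP as retryable
    # user failures. `user_code_failure_statuses` is a hypothetical
    # local name, not part of the existing module.
    user_code_failure_statuses = (
        job_lib.JobStatus.FAILED,
        job_lib.JobStatus.FAILED_SETUP,
    )
    if job_status in user_code_failure_statuses:
        # ...existing retry handling from this PR's FAILED branch...
        pass

As the docstring reads, the ordering constraint is only about which status wins when both Ray and our generated program set one, so a simple membership check like this would not itself depend on the declaration order.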

task_specs = managed_job_state.get_task_specs(
job_id, task_id)
if task_specs.get('max_restarts_on_errors', 0) == 0:
# We don't need to wait for the managed job status
# update, as the job is guaranteed to be in terminal
# state afterwards.
break
print()
status_display.update(
ux_utils.spinner_message(
'Waiting for next restart for the failed task'))
status_display.start()
while True:
_, managed_job_status = (
managed_job_state.get_latest_task_id_status(job_id))
if (managed_job_status !=
managed_job_state.ManagedJobStatus.RUNNING):
break
time.sleep(JOB_STATUS_CHECK_GAP_SECONDS)
continue
if job_status != job_lib.JobStatus.CANCELLED:
assert task_id is not None, job_id
if task_id < num_tasks - 1 and follow:
# The log for the current job is finished. We need to
# wait until next job to be started.
@@ -410,27 +431,7 @@ def stream_logs_by_id(job_id: int, follow: bool = True) -> str:
time.sleep(JOB_STATUS_CHECK_GAP_SECONDS)
continue
else:
task_specs = managed_job_state.get_task_specs(
job_id, task_id)
if task_specs.get('max_restarts_on_errors', 0) == 0:
# We don't need to wait for the managed job status
# update, as the job is guaranteed to be in terminal
# state afterwards.
break
print()
status_display.update(
ux_utils.spinner_message(
'Waiting for next restart for the failed task'))
status_display.start()
while True:
_, managed_job_status = (
managed_job_state.get_latest_task_id_status(
job_id))
if (managed_job_status !=
managed_job_state.ManagedJobStatus.RUNNING):
break
time.sleep(JOB_STATUS_CHECK_GAP_SECONDS)
continue
break
# The job can be cancelled by the user or the controller (when
# the cluster is partially preempted).
logger.debug(
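
Putting the added and removed hunks together, the per-iteration branch structure in stream_logs_by_id after this PR is roughly the skeleton below. This is a simplified sketch of the diff with the spinner/status-display details elided, not the verbatim implementation.

    # Simplified sketch of the loop body after this PR (details elided).
    if job_status == job_lib.JobStatus.FAILED:
        # Retry handling now lives in the FAILED branch.
        task_specs = managed_job_state.get_task_specs(job_id, task_id)
        if task_specs.get('max_restarts_on_errors', 0) == 0:
            # No retries configured; the job will end in a terminal state.
            break
        # Wait until the controller restarts the failed task.
        while True:
            _, managed_job_status = (
                managed_job_state.get_latest_task_id_status(job_id))
            if (managed_job_status !=
                    managed_job_state.ManagedJobStatus.RUNNING):
                break
            time.sleep(JOB_STATUS_CHECK_GAP_SECONDS)
        continue
    if job_status != job_lib.JobStatus.CANCELLED:
        if task_id < num_tasks - 1 and follow:
            # Current task's log is finished; wait for the next task to start.
            ...
        else:
            # The retry logic previously lived in this else branch; this PR
            # moves it into the FAILED branch above.
            break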