Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hardening the cancelation functionality #675

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions openeo/extra/job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,16 +667,25 @@ def on_job_cancel(self, job: BatchJob, row):

def _cancel_prolonged_job(self, job: BatchJob, row):
"""Cancel the job if it has been running for too long."""
job_running_start_time = rfc3339.parse_datetime(row["running_start_time"], with_timezone=True)
elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - job_running_start_time
if elapsed > self._cancel_running_job_after:
try:
_log.info(
f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})"
)
job.stop()
except OpenEoApiError as e:
_log.error(f"Failed to cancel long-running job {job.job_id}: {e}")
try:
running_start_time_str = row.get("running_start_time")
if not running_start_time_str or pd.isna(running_start_time_str):
_log.warning(f"Job {job.job_id} does not have a valid running start time. Cancellation skipped.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This warning might be a bit too alarming. It will be shown every minute on each job that has no recorded start time, so this could be quite spammy.

"cancellation skipped" might also give wrong impression that job manager still thinks that job should be cancelled for some reason, but it's won't actually do it.

some possible improvements:

  • only show this once per job, or for the whole job tracking run
  • if running start time is missing, fill it in with timestamp of first observation that it's missing, to have a fallback value, so that the auto-cancel feature can still work

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the underlying issue then in:

`    if previous_status in {"created", "queued"} and new_status == "running":
                    stats["job started running"] += 1
                    active.loc[i, "running_start_time"] = rfc3339.utcnow()

                if new_status == "canceled":
                    stats["job canceled"] += 1
                    self.on_job_cancel(the_job, active.loc[i])

                if self._cancel_running_job_after and new_status == "running":
                    self._cancel_prolonged_job(the_job, active.loc[i])`

The problem would be removed if I also only run the cancel prolonged job if the previous state was created or queued. Then we know for sure that a starting time has been set?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that won't work in practice: you want cancelling to happen long after the state changed to "running", so both previous state and current state will be "running" when you typically want to cancel.

what you could do is changing the setting of "running_start_time", to something like (pseudo-code):

if running_start_time is not set and new_status == "running":
    active.loc[i, "running_start_time"] = rfc3339.utcnow()

then running_start_time further degrades to a best effort guess of the actual start time, but at least you have something to work with

return

job_running_start_time = rfc3339.parse_datetime(running_start_time_str, with_timezone=True)
elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - job_running_start_time

if elapsed > self._cancel_running_job_after:
try:
_log.info(
f"Cancelling long-running job {job.job_id} (after {elapsed}, running since {job_running_start_time})"
)
job.stop()
except OpenEoApiError as e:
_log.error(f"Failed to cancel long-running job {job.job_id}: {e}")
except Exception as e:
_log.error(f"Unexpected error while handling job {job.job_id}: {e}")

def get_job_dir(self, job_id: str) -> Path:
"""Path to directory where job metadata, results and error logs are be saved."""
Expand Down