diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 7449e7d274a..516385f7fc3 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -920,6 +920,7 @@ async def process_command_queue(self) -> None: name, args, kwargs = command except Empty: break + args_string = ', '.join(str(a) for a in args) kwargs_string = ', '.join( f"{key}={value}" for key, value in kwargs.items() diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 20ee7379d27..84b805501b9 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -101,6 +101,7 @@ from cylc.flow.task_state import ( TASK_STATUS_PREPARING, TASK_STATUS_SUBMITTED, + TASK_STATUS_SUBMIT_FAILED, TASK_STATUS_RUNNING, TASK_STATUS_WAITING, TASK_STATUSES_ACTIVE @@ -187,6 +188,9 @@ def kill_task_jobs(self, workflow, itasks): itask.state_reset(is_held=True) self.data_store_mgr.delta_task_held(itask) to_kill_tasks.append(itask) + elif itask.state(TASK_STATUS_PREPARING): + itask.killed_in_job_prep = True + LOG.warning(f"[{itask}] killed in prep") else: LOG.warning(f"[{itask}] not killable") self._run_job_cmd( @@ -483,6 +487,16 @@ def submit_task_jobs(self, workflow, itasks, curve_auth, ) continue + if itask.killed_in_job_prep: + itask.waiting_on_job_prep = False + itask.killed_in_job_prep = False + itask.state_reset(TASK_STATUS_SUBMIT_FAILED) + self.data_store_mgr.delta_task_state(itask) + itask.local_job_file_path = None # reset for retry + self._prep_submit_task_job_error( + workflow, itask, '(killed in job prep)', '') + continue + # Build the "cylc jobs-submit" command cmd = [self.JOBS_SUBMIT] if LOG.isEnabledFor(DEBUG): @@ -1087,7 +1101,7 @@ def _prep_submit_task_job( Returns: * itask - preparation complete. * None - preparation in progress. - * False - perparation failed. + * False - preparation failed. """ if itask.local_job_file_path: diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 92905becf1e..2cfe9eae31c 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -762,13 +762,18 @@ def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None: if ntask is not None: self.add_to_pool(ntask) - def remove(self, itask, reason=""): + def remove(self, itask, reason="", manual=False): """Remove a task from the pool (e.g. after a reload).""" self.tasks_removed = True msg = "task proxy removed" if reason: msg += f" ({reason})" + if manual: + log = LOG.info + else: + log = LOG.debug + try: del self.hidden_pool[itask.point][itask.identity] except KeyError: @@ -778,7 +783,7 @@ def remove(self, itask, reason=""): self.hidden_pool_changed = True if not self.hidden_pool[itask.point]: del self.hidden_pool[itask.point] - LOG.debug(f"[{itask}] {msg}") + log(f"[{itask}] {msg}") self.task_queue_mgr.remove_task(itask) return @@ -800,7 +805,7 @@ def remove(self, itask, reason=""): # Event-driven final update of task_states table. # TODO: same for datastore (still updated by scheduler loop) self.workflow_db_mgr.put_update_task_state(itask) - LOG.debug(f"[{itask}] {msg}") + log(f"[{itask}] {msg}") del itask def get_all_tasks(self) -> List[TaskProxy]: @@ -1663,7 +1668,7 @@ def remove_tasks(self, items): """Remove tasks from the pool.""" itasks, _, bad_items = self.filter_task_proxies(items) for itask in itasks: - self.remove(itask, 'request') + self.remove(itask, 'request', manual=True) if self.compute_runahead(): self.release_runahead_tasks() return len(bad_items) diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 271049126fd..ae0fb71347a 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -134,11 +134,13 @@ class TaskProxy: graph children: {msg: [(name, point), ...]} .flow_nums: flows I belong to - flow_wait: + .flow_wait: wait for flow merge before spawning children .waiting_on_job_prep: True whilst task is awaiting job prep, reset to False once the preparation has completed. + .killed_in_job_prep: + killed during job preparation; set to submit-failed once prepped Args: tdef: The definition object of this task. @@ -180,6 +182,7 @@ class TaskProxy: 'tokens', 'try_timers', 'waiting_on_job_prep', + 'killed_in_job_prep' ] def __init__( @@ -252,6 +255,7 @@ def __init__( self.late_time: Optional[float] = None self.is_late = is_late self.waiting_on_job_prep = False + self.killed_in_job_prep = False self.state = TaskState(tdef, self.point, status, is_held)