diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 3c6fbbd58a5..520ed55d601 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1654,8 +1654,14 @@ def _set_outputs(self, point, taskdef, outputs, flow_nums, flow_wait): # Try to spawn children of the outputs. for msg in good: + if msg == TASK_OUTPUT_EXPIRED: + # not caused by task messages + self._expire_task(itask) + self.spawn_on_output(itask, expired) ## TODO CONTINUE FROM pr 5412 + else: + self.task_events_mgr.process_message(itask, logging.INFO, msg) + # TODO remove this - just log the actual spawning events LOG.info(f"[{itask}] Forced spawning on {msg}") - self.task_events_mgr.process_message(itask, logging.INFO, msg) def _set_prereqs(self, point, taskdef, prereqs, flow_nums, flow_wait): """Set given prerequisites of a target task. @@ -1897,18 +1903,19 @@ def _set_expired_task(self, itask): itask.get_point_as_seconds() + itask.get_offset_as_seconds(itask.tdef.expiration_offset)) if time() > itask.expire_time: - msg = 'Task expired (skipping job).' - LOG.warning(f"[{itask}] {msg}") - self.task_events_mgr.setup_event_handlers(itask, "expired", msg) - # TODO succeeded and expired states are useless due to immediate - # removal under all circumstances (unhandled failed is still used). - if itask.state_reset(TASK_STATUS_EXPIRED, is_held=False): - self.data_store_mgr.delta_task_state(itask) - self.data_store_mgr.delta_task_held(itask) - self.remove(itask, 'expired') + self._expire_task(itask) return True return False + def _expire_task(self, itask): + msg = 'Task expired (skipping job).' + LOG.warning(f"[{itask}] {msg}") + self.task_events_mgr.setup_event_handlers(itask, "expired", msg) + if itask.state_reset(TASK_STATUS_EXPIRED, is_held=False): + self.data_store_mgr.delta_task_state(itask) + self.data_store_mgr.delta_task_held(itask) + self.remove(itask, 'expired') + def task_succeeded(self, id_): """Return True if task with id_ is in the succeeded state.""" return any(