diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 8e8b639914..d46e7cd4ad 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -18,6 +18,7 @@ from collections import Counter from contextlib import suppress +import itertools import json import logging from textwrap import indent @@ -90,6 +91,7 @@ TASK_STATUSES_FINAL, ) from cylc.flow.task_trigger import TaskTrigger +from cylc.flow.taskdef import generate_graph_children from cylc.flow.util import deserialise_set from cylc.flow.workflow_status import StopMode @@ -917,13 +919,19 @@ def _get_task_by_id(self, id_: str) -> Optional[TaskProxy]: return None def queue_task(self, itask: TaskProxy) -> None: - """Queue a task that is ready to run.""" + """Queue a task that is ready to run. + + If it is already queued, do nothing. + """ if itask.state_reset(is_queued=True): self.data_store_mgr.delta_task_queued(itask) self.task_queue_mgr.push_task(itask) def unqueue_task(self, itask: TaskProxy) -> None: - """Un-queue a task that is no longer ready to run.""" + """Un-queue a task that is no longer ready to run. + + If it is not queued, do nothing. + """ if itask.state_reset(is_queued=False): self.data_store_mgr.delta_task_queued(itask) self.task_queue_mgr.remove_task(itask) @@ -2103,30 +2111,34 @@ def remove_tasks( *(quick_relative_id(cycle, task) for task, cycle in inactive), } - # Unset any prereqs naturally satisfied by these tasks - # (do not unset those satisfied by `cylc set --pre`): - for itask in self.get_tasks(): - fnums_to_remove = itask.match_flows(flow_nums) - if not fnums_to_remove: - continue - for prereq in ( - *itask.state.prerequisites, - *itask.state.suicide_prerequisites, - ): - for msg in prereq.naturally_satisfied_dependencies(): - id_ = msg.get_id() - if id_ in matched_task_ids: - prereq[msg] = False - if id_ not in removed: - removed[id_] = fnums_to_remove - if not itask.prereqs_are_satisfied(): - self.unqueue_task(itask) - - # Remove from DB tables: for id_ in matched_task_ids: - point, name = id_.split('/', 1) + point_str, name = id_.split('/', 1) + tdef = self.config.taskdefs[name] + # Unset any prereqs naturally satisfied by these tasks + # (do not unset those satisfied by `cylc set --pre`): + for child in itertools.chain.from_iterable( + generate_graph_children(tdef, get_point(point_str)).values() + ): + child_itask = self.get_task(child.point, child.name) + if not child_itask: + continue + fnums_to_remove = child_itask.match_flows(flow_nums) + if not fnums_to_remove: + continue + for prereq in ( + *child_itask.state.prerequisites, + *child_itask.state.suicide_prerequisites, + ): + for msg in prereq.naturally_satisfied_dependencies(): + if msg.get_id() == id_: + prereq[msg] = False + self.unqueue_task(child_itask) + if id_ not in removed: + removed[id_] = fnums_to_remove + + # Remove from DB tables: db_removed_fnums = self.workflow_db_mgr.remove_task_from_flows( - point, name, flow_nums + point_str, name, flow_nums ) if db_removed_fnums: removed.setdefault(id_, set()).update(db_removed_fnums)