Skip to content

Commit

Permalink
cylc remove: more efficiently unset prereq satisfaction of downstre…
Browse files Browse the repository at this point in the history
…am tasks
  • Loading branch information
MetRonnie committed Oct 11, 2024
1 parent 8beab10 commit 5c362a0
Showing 1 changed file with 36 additions and 24 deletions.
60 changes: 36 additions & 24 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from collections import Counter
from contextlib import suppress
import itertools
import json
import logging
from textwrap import indent
Expand Down Expand Up @@ -90,6 +91,7 @@
TASK_STATUSES_FINAL,
)
from cylc.flow.task_trigger import TaskTrigger
from cylc.flow.taskdef import generate_graph_children
from cylc.flow.util import deserialise_set
from cylc.flow.workflow_status import StopMode

Expand Down Expand Up @@ -917,13 +919,19 @@ def _get_task_by_id(self, id_: str) -> Optional[TaskProxy]:
return None

def queue_task(self, itask: TaskProxy) -> None:
"""Queue a task that is ready to run."""
"""Queue a task that is ready to run.
If it is already queued, do nothing.
"""
if itask.state_reset(is_queued=True):
self.data_store_mgr.delta_task_queued(itask)
self.task_queue_mgr.push_task(itask)

def unqueue_task(self, itask: TaskProxy) -> None:
"""Un-queue a task that is no longer ready to run."""
"""Un-queue a task that is no longer ready to run.
If it is not queued, do nothing.
"""
if itask.state_reset(is_queued=False):
self.data_store_mgr.delta_task_queued(itask)
self.task_queue_mgr.remove_task(itask)
Expand Down Expand Up @@ -2103,30 +2111,34 @@ def remove_tasks(
*(quick_relative_id(cycle, task) for task, cycle in inactive),
}

# Unset any prereqs naturally satisfied by these tasks
# (do not unset those satisfied by `cylc set --pre`):
for itask in self.get_tasks():
fnums_to_remove = itask.match_flows(flow_nums)
if not fnums_to_remove:
continue
for prereq in (
*itask.state.prerequisites,
*itask.state.suicide_prerequisites,
):
for msg in prereq.naturally_satisfied_dependencies():
id_ = msg.get_id()
if id_ in matched_task_ids:
prereq[msg] = False
if id_ not in removed:
removed[id_] = fnums_to_remove
if not itask.prereqs_are_satisfied():
self.unqueue_task(itask)

# Remove from DB tables:
for id_ in matched_task_ids:
point, name = id_.split('/', 1)
point_str, name = id_.split('/', 1)
tdef = self.config.taskdefs[name]
# Unset any prereqs naturally satisfied by these tasks
# (do not unset those satisfied by `cylc set --pre`):
for child in itertools.chain.from_iterable(
generate_graph_children(tdef, get_point(point_str)).values()
):
child_itask = self.get_task(child.point, child.name)
if not child_itask:
continue
fnums_to_remove = child_itask.match_flows(flow_nums)
if not fnums_to_remove:
continue

Check warning on line 2127 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L2127

Added line #L2127 was not covered by tests
for prereq in (
*child_itask.state.prerequisites,
*child_itask.state.suicide_prerequisites,
):
for msg in prereq.naturally_satisfied_dependencies():
if msg.get_id() == id_:
prereq[msg] = False
self.unqueue_task(child_itask)
if id_ not in removed:
removed[id_] = fnums_to_remove

# Remove from DB tables:
db_removed_fnums = self.workflow_db_mgr.remove_task_from_flows(
point, name, flow_nums
point_str, name, flow_nums
)
if db_removed_fnums:
removed.setdefault(id_, set()).update(db_removed_fnums)
Expand Down

0 comments on commit 5c362a0

Please sign in to comment.