Skip to content

Commit

Permalink
De-queue tasks on forced state change.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Mar 12, 2024
1 parent 4a336b9 commit 05dc4d5
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 17 deletions.
2 changes: 1 addition & 1 deletion cylc/flow/scripts/dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
For command line monitoring:
* `cylc tui`
* `watch cylc dump WORKFLOW_ID` works for small simple workflows
* `watch cylc dump -t WORKFLOW_ID` works for small simple workflows
Examples:
# Display the state of all active tasks, sorted by cycle point:
Expand Down
15 changes: 10 additions & 5 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1819,19 +1819,24 @@ def _set_outputs_itask(
outputs = self._standardise_outputs(
itask.point, itask.tdef, outputs)

changed = False
outputs = sorted(outputs, key=itask.state.outputs.output_sort_key)
for output in outputs:
if itask.state.outputs.is_completed(output):
LOG.info(f"output {itask.identity}:{output} completed already")
continue
self.task_events_mgr.process_message(
itask, logging.INFO, output, forced=True)
changed = True

if changed and itask.transient:
self.workflow_db_mgr.put_update_task_state(itask)
self.workflow_db_mgr.put_update_task_outputs(itask)
if not itask.state(TASK_STATUS_WAITING):
# Can't be runahead limited or queued.
itask.state_reset(is_runahead=False, is_queued=False)
self.task_queue_mgr.remove_task(itask)
self.data_store_mgr.delta_task_queued(itask)

self.data_store_mgr.delta_task_state(itask)
self.data_store_mgr.delta_task_outputs(itask)
self.workflow_db_mgr.put_update_task_state(itask)
self.workflow_db_mgr.put_update_task_outputs(itask)

def _set_prereqs_itask(
self,
Expand Down
16 changes: 5 additions & 11 deletions cylc/flow/task_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,10 @@ def reset(
"""
req = status

if forced and req in [TASK_STATUS_SUBMITTED, TASK_STATUS_RUNNING]:
# Forcing can only change completion status (there's no job).
return False

current_status = (
self.status,
self.is_held,
Expand All @@ -432,15 +436,7 @@ def reset(
# no change - do nothing
return False

if (
forced and
req in [TASK_STATUS_SUBMITTED, TASK_STATUS_RUNNING]
):
# Forced setting of outputs can cause state change to completed
# but not to submitted or running (there's no real job).
return False

# perform the actual state change
# perform the state change
self.status, self.is_held, self.is_queued, self.is_runahead = (
requested_status
)
Expand All @@ -449,8 +445,6 @@ def reset(
self.is_updated = True
self.kill_failed = False

# Set standard outputs in accordance with task state.

if status is None:
# NOTE: status is None if the task is being released
status = self.status
Expand Down

0 comments on commit 05dc4d5

Please sign in to comment.