diff --git a/cylc/flow/task_outputs.py b/cylc/flow/task_outputs.py index fececd23f99..4523562a4c0 100644 --- a/cylc/flow/task_outputs.py +++ b/cylc/flow/task_outputs.py @@ -244,6 +244,21 @@ def get_optional_outputs( } +# a completion expression that considers the outputs complete if any final task +# output is received +FINAL_OUTPUT_COMPLETION = ' or '.join( + map( + trigger_to_completion_variable, + [ + TASK_OUTPUT_SUCCEEDED, + TASK_OUTPUT_FAILED, + TASK_OUTPUT_SUBMIT_FAILED, + TASK_OUTPUT_EXPIRED, + ], + ) +) + + class TaskOutputs: """Represents a collection of outputs for a task. @@ -384,8 +399,14 @@ def __iter__(self) -> Iterator[Tuple[str, str, bool]]: def is_complete(self) -> bool: """Return True if the outputs are complete.""" + # NOTE: If a task has been removed from the workflow via restart / + # reload, then it is possible for the completion expression to be blank + # (empty string). In this case, we consider the task outputs to be + # complete when any final output has been generated. + # See https://github.com/cylc/cylc-flow/pull/5067 + expr = self._completion_expression or FINAL_OUTPUT_COMPLETION return CompletionEvaluator( - self._completion_expression, + expr, **{ self._message_to_compvar[message]: completed for message, completed in self._completed.items() diff --git a/tests/integration/test_optional_outputs.py b/tests/integration/test_optional_outputs.py index afab3d9e8fa..d5c4e41ce81 100644 --- a/tests/integration/test_optional_outputs.py +++ b/tests/integration/test_optional_outputs.py @@ -29,26 +29,28 @@ from cylc.flow.cycling.integer import IntegerPoint from cylc.flow.cycling.iso8601 import ISO8601Point from cylc.flow.network.resolvers import TaskMsg -from cylc.flow.scheduler import Scheduler from cylc.flow.task_events_mgr import ( TaskEventsManager, ) from cylc.flow.task_outputs import ( TASK_OUTPUTS, TASK_OUTPUT_EXPIRED, + TASK_OUTPUT_FAILED, TASK_OUTPUT_FINISHED, TASK_OUTPUT_SUCCEEDED, get_completion_expression, ) from cylc.flow.task_state import ( + TASK_STATUSES_ACTIVE, TASK_STATUS_EXPIRED, TASK_STATUS_PREPARING, + TASK_STATUS_RUNNING, TASK_STATUS_WAITING, - TASK_STATUSES_ACTIVE, ) if TYPE_CHECKING: from cylc.flow.task_proxy import TaskProxy + from cylc.flow.scheduler import Scheduler def reset_outputs(itask: 'TaskProxy'): @@ -441,3 +443,68 @@ async def test_clock_expiry( # the third task should *not* be expired (it was a manual submit) assert not three.state(TASK_STATUS_EXPIRED) assert not three.state.outputs.is_message_complete(TASK_OUTPUT_EXPIRED) + + +async def test_removed_taskdef( + flow, + scheduler, + start, +): + """It should handle tasks being removed from the config. + + If the config of an active task is removed from the config by restart / + reload, then we must provide a fallback completion expression, otherwise + the expression will be blank (task has no required or optional outputs). + + The fallback is to consider the outputs complete if *any* final output is + received. Since the task has been removed from the workflow its outputs + should be inconsequential. + + See: https://github.com/cylc/cylc-flow/issues/5057 + """ + id_ = flow({ + 'scheduling': { + 'graph': { + 'R1': 'a & z' + } + } + }) + + # start the workflow and mark the tasks as running + schd: 'Scheduler' = scheduler(id_) + async with start(schd): + for itask in schd.pool.get_tasks(): + itask.state_reset(TASK_STATUS_RUNNING) + assert itask.state.outputs._completion_expression == 'succeeded' + + # remove the task "z" from the config + id_ = flow({ + 'scheduling': { + 'graph': { + 'R1': 'a' + } + } + }, id_=id_) + + # restart the workflow + schd: 'Scheduler' = scheduler(id_) + async with start(schd): + # 1/a: + # * is still in the config + # * is should still have a sensible completion expression + # * its outputs should be incomplete if the task fails + a_1 = schd.pool.get_task(IntegerPoint('1'), 'a') + assert a_1 + assert a_1.state.outputs._completion_expression == 'succeeded' + a_1.state.outputs.set_message_complete(TASK_OUTPUT_FAILED) + assert not a_1.is_complete() + + # 1/z: + # * is no longer in the config + # * should have a blank completion expression + # * its outputs should be completed by any final output + z_1 = schd.pool.get_task(IntegerPoint('1'), 'z') + assert z_1 + assert z_1.state.outputs._completion_expression == '' + z_1.state.outputs.set_message_complete(TASK_OUTPUT_FAILED) + assert z_1.is_complete()