diff --git a/cylc/flow/etc/cylc b/cylc/flow/etc/cylc index 48a91c0603a..facf27e4291 100755 --- a/cylc/flow/etc/cylc +++ b/cylc/flow/etc/cylc @@ -2,7 +2,7 @@ # THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. # Copyright (C) NIWA & British Crown (Met Office) & Contributors. -# +# # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index 3c523760724..dbe68b47d59 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -811,7 +811,9 @@ def force_spawn_children( self, tasks: Iterable[str], outputs: Optional[Iterable[str]] = None, - flow_num: Optional[int] = None + flow: Iterable[str] = None, + flow_wait: bool = False, + flow_descr: str = "" ) -> Tuple[bool, str]: """Spawn children of given task outputs. @@ -820,7 +822,8 @@ def force_spawn_children( Args: tasks: List of identifiers or task globs. outputs: List of outputs to spawn on. - flow_num: Flow number to attribute the outputs. + flow (list): + Flow ownership of triggered tasks. """ self.schd.command_queue.put( ( @@ -828,7 +831,9 @@ def force_spawn_children( (tasks,), { "outputs": outputs, - "flow_num": flow_num + "flow": flow, + "flow_wait": flow_wait, + "flow_descr": flow_descr, }, ) ) diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 21ec296a4f5..6e16802d718 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -2091,25 +2091,33 @@ class Meta: class SetTask(Mutation, TaskMutation): class Meta: - description = sstrip(''' - Intervene in task prerequisite, output, and completion status. + description = sstrip(""" + Set task prerequisites or outputs. - Setting a prerequisite contributes to the task's readiness to run. + By default, set all required outputs for target task(s). - Setting an output contributes to the task's completion, and sets - the corresponding prerequisites of child tasks. + Setting prerequisites contributes to the task's readiness to run. - Valid for: paused, running workflows. - ''') + Setting outputs contributes to the task's completion, and sets the + corresponding prerequisites of child tasks. + + Setting an output also sets any implied outputs: + - started implies submitted + - succeeded and failed imply started + - custom outputs and expired do not imply any other outputs + """) resolver = partial(mutator, command='force_spawn_children') - class Arguments(TaskMutation.Arguments): + class Arguments(TaskMutation.Arguments, FlowMutationArguments): outputs = graphene.List( String, default_value=[TASK_OUTPUT_SUCCEEDED], - description='List of task outputs to satisfy.' + description='List of task outputs to set complete.' + ) + prerequisites = graphene.List( + String, + description='List of task prerequisites to set satisfied.' ) - flow_num = Int() class Trigger(Mutation, TaskMutation): diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 210e1c6e93f..cd530f3c1d5 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -2124,13 +2124,16 @@ def command_force_trigger_tasks(self, items, flow, flow_wait, flow_descr): return self.pool.force_trigger_tasks( items, flow, flow_wait, flow_descr) - def command_force_spawn_children(self, items, outputs, flow_num): + def command_force_spawn_children( + self, items, outputs, flow, flow_wait, flow_descr + ): """Force spawn task successors. User-facing method name: set_task. - """ - return self.pool.force_spawn_children(items, outputs, flow_num) + return self.pool.force_spawn_children( + items, outputs, flow, flow_wait, flow_descr + ) def _update_profile_info(self, category, amount, amount_format="%s"): """Update the 1, 5, 15 minute dt averages for a given category.""" diff --git a/cylc/flow/scripts/set_task.py b/cylc/flow/scripts/set_task.py index 87636e95333..f3bf47e3ecd 100755 --- a/cylc/flow/scripts/set_task.py +++ b/cylc/flow/scripts/set_task.py @@ -90,7 +90,7 @@ def get_option_parser() -> COP: ) parser.add_option( - "-o", "--out", metavar="OUTPUT(s)", + "-o", "--out", "--output", metavar="OUTPUT(s)", help=( "Set task outputs completed, along with any implied outputs." ' May be "all", to set all required outputs complete.' @@ -100,7 +100,7 @@ def get_option_parser() -> COP: ) parser.add_option( - "-p", "--pre", metavar="PREREQUISITE(s)", + "-p", "--pre", "--prerequisite", metavar="PREREQUISITE(s)", help=( "Set task prerequisites satisfied." ' May be "all", which is equivalent to "cylc trigger".' @@ -116,7 +116,7 @@ def get_option_parser() -> COP: " If the task is already spawned, use current flows and" " merge new flow numbers if specified." ), - action="store", default=None, dest="flow_num" + action="store", default=None, dest="flow" ) parser.add_option( @@ -134,26 +134,6 @@ def get_option_parser() -> COP: return parser -def _validate(options): - """Check validity of flow-related options.""" - for val in options.flow: - val = val.strip() - if val in [FLOW_NONE, FLOW_NEW, FLOW_ALL]: - if len(options.flow) != 1: - raise InputError(ERR_OPT_FLOW_INT) - else: - try: - int(val) - except ValueError: - raise InputError(ERR_OPT_FLOW_VAL.format(val)) - - if options.flow_descr and options.flow != [FLOW_NEW]: - raise InputError(ERR_OPT_FLOW_META) - - if options.flow_wait and options.flow[0] in [FLOW_NEW, FLOW_NONE]: - raise InputError(ERR_OPT_FLOW_WAIT) - - async def run(options: 'Values', workflow_id: str, *tokens_list) -> None: pclient = get_client(workflow_id, timeout=options.comms_timeout) @@ -167,7 +147,9 @@ async def run(options: 'Values', workflow_id: str, *tokens_list) -> None: ], 'outputs': options.outputs, 'prerequisites': options.prerequisites, - 'flowNum': options.flow_num + 'flow': options.flow, + 'flowWait': options.flow_wait, + 'flowDescr': options.flow_descr } } diff --git a/cylc/flow/scripts/trigger.py b/cylc/flow/scripts/trigger.py index d991aaf74e6..969afa69a06 100755 --- a/cylc/flow/scripts/trigger.py +++ b/cylc/flow/scripts/trigger.py @@ -136,7 +136,6 @@ async def run(options: 'Values', workflow_id: str, *tokens_list): 'flowDescr': options.flow_descr, } } - await pclient.async_request('graphql', mutation_kwargs) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index dbd9bba2749..7e828c77502 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1579,8 +1579,10 @@ def spawn_task( def force_spawn_children( self, items: Iterable[str], - outputs: Optional[List[str]] = None, - flow_num: Optional[int] = None + outputs: List[str], + flow: List[str], + flow_wait: bool = False, + flow_descr: str = "", ): """Spawn downstream children of given outputs, on user command. @@ -1591,14 +1593,13 @@ def force_spawn_children( items: Identifiers for matching task definitions, each with the form "point/name". outputs: List of outputs to spawn on - flow_num: Flow number to attribute the outputs + flow: Flow number to attribute the outputs """ outputs = outputs or [TASK_OUTPUT_SUCCEEDED] - if flow_num is None: - flow_nums = None - else: - flow_nums = {flow_num} + flow_nums = self._flow_cmd_helper(flow) + if flow_nums is None: + return n_warnings, task_items = self.match_taskdefs(items) for (_, point), taskdef in sorted(task_items.items()): @@ -1638,27 +1639,15 @@ def remove_tasks(self, items): self.release_runahead_tasks() return len(bad_items) - def force_trigger_tasks( - self, items: Iterable[str], - flow: List[str], - flow_wait: bool = False, - flow_descr: Optional[str] = None - ) -> int: - """Manual task triggering. - - Don't get a new flow number for existing n=0 tasks (e.g. incomplete - tasks). These can carry on in the original flow if retriggered. - - Queue the task if not queued, otherwise release it to run. - - """ + def _flow_cmd_helper(self, flow): + # TODO type hints if set(flow).intersection({FLOW_ALL, FLOW_NEW, FLOW_NONE}): if len(flow) != 1: LOG.warning( f'The "flow" values {FLOW_ALL}, {FLOW_NEW} & {FLOW_NONE}' ' cannot be used in combination with integer flow numbers.' ) - return 0 + return None if flow[0] == FLOW_ALL: flow_nums = self._get_active_flow_nums() elif flow[0] == FLOW_NEW: @@ -1670,9 +1659,28 @@ def force_trigger_tasks( flow_nums = {int(n) for n in flow} except ValueError: LOG.warning( - f"Trigger ignored, illegal flow values {flow}" + f"Ignoring command: illegal flow values {flow}" ) - return 0 + return None + return flow_nums + + def force_trigger_tasks( + self, items: Iterable[str], + flow: List[str], + flow_wait: bool = False, + flow_descr: Optional[str] = None + ) -> int: + """Manual task triggering. + + Don't get a new flow number for existing n=0 tasks (e.g. incomplete + tasks). These can carry on in the original flow if retriggered. + + Queue the task if not queued, otherwise release it to run. + + """ + flow_nums = self._flow_cmd_helper(flow) + if flow_nums is None: + return # n_warnings, task_items = self.match_taskdefs(items) itasks, future_tasks, unmatched = self.filter_task_proxies( diff --git a/tests/functional/events/23-workflow-stalled-handler/flow.cylc b/tests/functional/events/23-workflow-stalled-handler/flow.cylc index 2ef7647d40d..c03ea783078 100644 --- a/tests/functional/events/23-workflow-stalled-handler/flow.cylc +++ b/tests/functional/events/23-workflow-stalled-handler/flow.cylc @@ -1,6 +1,6 @@ [scheduler] [[events]] - stall handlers = "cylc set-task --flow=1 %(workflow)s//1/bar" + stall handlers = "cylc set-task %(workflow)s//1/bar" stall timeout = PT0S abort on stall timeout = False expected task failures = 1/bar diff --git a/tests/functional/logging/02-duplicates/flow.cylc b/tests/functional/logging/02-duplicates/flow.cylc index d0ee44d22f2..3432090babc 100644 --- a/tests/functional/logging/02-duplicates/flow.cylc +++ b/tests/functional/logging/02-duplicates/flow.cylc @@ -22,7 +22,7 @@ script = false [[bar]] script = """ -cylc set-task --flow=1 "${CYLC_WORKFLOW_ID}" "foo.${CYLC_TASK_CYCLE_POINT}" +cylc set-task "${CYLC_WORKFLOW_ID}" "foo.${CYLC_TASK_CYCLE_POINT}" """ [[restart]] script = """ diff --git a/tests/functional/reload/17-graphing-change.t b/tests/functional/reload/17-graphing-change.t index e99de3a35ec..f343737c644 100755 --- a/tests/functional/reload/17-graphing-change.t +++ b/tests/functional/reload/17-graphing-change.t @@ -66,8 +66,8 @@ cp "${TEST_SOURCE_DIR}/graphing-change/flow-2.cylc" \ "${RUN_DIR}/${WORKFLOW_NAME}/flow.cylc" # Spawn a couple of task proxies, to get "task definition removed" message. -cylc set-task --flow=1 "${WORKFLOW_NAME}//1/foo" -cylc set-task --flow=1 "${WORKFLOW_NAME}//1/baz" +cylc set-task "${WORKFLOW_NAME}//1/foo" +cylc set-task "${WORKFLOW_NAME}//1/baz" # reload workflow run_ok "${TEST_NAME_BASE}-swap-reload" cylc reload "${WORKFLOW_NAME}" poll grep_workflow_log_n_times 'Reload completed' 3 diff --git a/tests/functional/runahead/default-future/flow.cylc b/tests/functional/runahead/default-future/flow.cylc index 7e384dd8e31..1535755aa3e 100644 --- a/tests/functional/runahead/default-future/flow.cylc +++ b/tests/functional/runahead/default-future/flow.cylc @@ -27,7 +27,7 @@ [[spawner]] script = """ # spawn wibble - cylc set-task --flow=1 $CYLC_WORKFLOW_ID 20100101T0800Z/foo + cylc set-task $CYLC_WORKFLOW_ID 20100101T0800Z/foo """ [[foo]] script = false diff --git a/tests/functional/spawn-on-demand/12-set-outputs-no-reflow.t b/tests/functional/spawn-on-demand/12-set-outputs-cont-flow.t similarity index 93% rename from tests/functional/spawn-on-demand/12-set-outputs-no-reflow.t rename to tests/functional/spawn-on-demand/12-set-outputs-cont-flow.t index 4b17c515bde..52a74c9dedc 100644 --- a/tests/functional/spawn-on-demand/12-set-outputs-no-reflow.t +++ b/tests/functional/spawn-on-demand/12-set-outputs-cont-flow.t @@ -16,7 +16,7 @@ # along with this program. If not, see . #------------------------------------------------------------------------------- -# Check that "cylc set-task" works like it says on the tin. +# Check that "cylc set-task" continues a flow by default. . "$(dirname "$0")/test_header" set_test_number 2 reftest diff --git a/tests/functional/spawn-on-demand/12-set-outputs-no-reflow/flow.cylc b/tests/functional/spawn-on-demand/12-set-outputs-cont-flow/flow.cylc similarity index 80% rename from tests/functional/spawn-on-demand/12-set-outputs-no-reflow/flow.cylc rename to tests/functional/spawn-on-demand/12-set-outputs-cont-flow/flow.cylc index 21098892dbf..5e359c2eaf2 100644 --- a/tests/functional/spawn-on-demand/12-set-outputs-no-reflow/flow.cylc +++ b/tests/functional/spawn-on-demand/12-set-outputs-cont-flow/flow.cylc @@ -1,5 +1,5 @@ -# Test that `cylc set-task` does not cause reflow by default -# Task setter should cause bar to run, but not subsequently baz. +# Test that `cylc set-task` continues the active flow by default +# Task setter should cause bar to run, then subsequently baz. [scheduler] [[events]] diff --git a/tests/functional/spawn-on-demand/12-set-outputs-no-reflow/reference.log b/tests/functional/spawn-on-demand/12-set-outputs-cont-flow/reference.log similarity index 79% rename from tests/functional/spawn-on-demand/12-set-outputs-no-reflow/reference.log rename to tests/functional/spawn-on-demand/12-set-outputs-cont-flow/reference.log index 2322cc234da..3c7b498cc8b 100644 --- a/tests/functional/spawn-on-demand/12-set-outputs-no-reflow/reference.log +++ b/tests/functional/spawn-on-demand/12-set-outputs-cont-flow/reference.log @@ -3,3 +3,4 @@ Final point: 1 1/foo -triggered off [] 1/setter -triggered off ['1/foo'] 1/bar -triggered off ['1/foo'] +1/baz -triggered off ['1/bar']