Skip to content

Commit

Permalink
Replicated original set-outputs behaviour. [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Jul 31, 2023
1 parent b6053f7 commit 569797d
Show file tree
Hide file tree
Showing 14 changed files with 80 additions and 74 deletions.
2 changes: 1 addition & 1 deletion cylc/flow/etc/cylc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
Expand Down
11 changes: 8 additions & 3 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,9 @@ def force_spawn_children(
self,
tasks: Iterable[str],
outputs: Optional[Iterable[str]] = None,
flow_num: Optional[int] = None
flow: Iterable[str] = None,
flow_wait: bool = False,
flow_descr: str = ""
) -> Tuple[bool, str]:
"""Spawn children of given task outputs.
Expand All @@ -820,15 +822,18 @@ def force_spawn_children(
Args:
tasks: List of identifiers or task globs.
outputs: List of outputs to spawn on.
flow_num: Flow number to attribute the outputs.
flow (list):
Flow ownership of triggered tasks.
"""
self.schd.command_queue.put(
(
"force_spawn_children",
(tasks,),
{
"outputs": outputs,
"flow_num": flow_num
"flow": flow,
"flow_wait": flow_wait,
"flow_descr": flow_descr,
},
)
)
Expand Down
28 changes: 18 additions & 10 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2091,25 +2091,33 @@ class Meta:

class SetTask(Mutation, TaskMutation):
class Meta:
description = sstrip('''
Intervene in task prerequisite, output, and completion status.
description = sstrip("""
Set task prerequisites or outputs.
Setting a prerequisite contributes to the task's readiness to run.
By default, set all required outputs for target task(s).
Setting an output contributes to the task's completion, and sets
the corresponding prerequisites of child tasks.
Setting prerequisites contributes to the task's readiness to run.
Valid for: paused, running workflows.
''')
Setting outputs contributes to the task's completion, and sets the
corresponding prerequisites of child tasks.
Setting an output also sets any implied outputs:
- started implies submitted
- succeeded and failed imply started
- custom outputs and expired do not imply any other outputs
""")
resolver = partial(mutator, command='force_spawn_children')

class Arguments(TaskMutation.Arguments):
class Arguments(TaskMutation.Arguments, FlowMutationArguments):
outputs = graphene.List(
String,
default_value=[TASK_OUTPUT_SUCCEEDED],
description='List of task outputs to satisfy.'
description='List of task outputs to set complete.'
)
prerequisites = graphene.List(
String,
description='List of task prerequisites to set satisfied.'
)
flow_num = Int()


class Trigger(Mutation, TaskMutation):
Expand Down
9 changes: 6 additions & 3 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2124,13 +2124,16 @@ def command_force_trigger_tasks(self, items, flow, flow_wait, flow_descr):
return self.pool.force_trigger_tasks(
items, flow, flow_wait, flow_descr)

def command_force_spawn_children(self, items, outputs, flow_num):
def command_force_spawn_children(
self, items, outputs, flow, flow_wait, flow_descr
):
"""Force spawn task successors.
User-facing method name: set_task.
"""
return self.pool.force_spawn_children(items, outputs, flow_num)
return self.pool.force_spawn_children(
items, outputs, flow, flow_wait, flow_descr
)

def _update_profile_info(self, category, amount, amount_format="%s"):
"""Update the 1, 5, 15 minute dt averages for a given category."""
Expand Down
30 changes: 6 additions & 24 deletions cylc/flow/scripts/set_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def get_option_parser() -> COP:
)

parser.add_option(
"-o", "--out", metavar="OUTPUT(s)",
"-o", "--out", "--output", metavar="OUTPUT(s)",
help=(
"Set task outputs completed, along with any implied outputs."
' May be "all", to set all required outputs complete.'
Expand All @@ -100,7 +100,7 @@ def get_option_parser() -> COP:
)

parser.add_option(
"-p", "--pre", metavar="PREREQUISITE(s)",
"-p", "--pre", "--prerequisite", metavar="PREREQUISITE(s)",
help=(
"Set task prerequisites satisfied."
' May be "all", which is equivalent to "cylc trigger".'
Expand All @@ -116,7 +116,7 @@ def get_option_parser() -> COP:
" If the task is already spawned, use current flows and"
" merge new flow numbers if specified."
),
action="store", default=None, dest="flow_num"
action="store", default=None, dest="flow"
)

parser.add_option(
Expand All @@ -134,26 +134,6 @@ def get_option_parser() -> COP:
return parser


def _validate(options):
"""Check validity of flow-related options."""
for val in options.flow:
val = val.strip()
if val in [FLOW_NONE, FLOW_NEW, FLOW_ALL]:
if len(options.flow) != 1:
raise InputError(ERR_OPT_FLOW_INT)
else:
try:
int(val)
except ValueError:
raise InputError(ERR_OPT_FLOW_VAL.format(val))

if options.flow_descr and options.flow != [FLOW_NEW]:
raise InputError(ERR_OPT_FLOW_META)

if options.flow_wait and options.flow[0] in [FLOW_NEW, FLOW_NONE]:
raise InputError(ERR_OPT_FLOW_WAIT)


async def run(options: 'Values', workflow_id: str, *tokens_list) -> None:
pclient = get_client(workflow_id, timeout=options.comms_timeout)

Expand All @@ -167,7 +147,9 @@ async def run(options: 'Values', workflow_id: str, *tokens_list) -> None:
],
'outputs': options.outputs,
'prerequisites': options.prerequisites,
'flowNum': options.flow_num
'flow': options.flow,
'flowWait': options.flow_wait,
'flowDescr': options.flow_descr
}
}

Expand Down
1 change: 0 additions & 1 deletion cylc/flow/scripts/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ async def run(options: 'Values', workflow_id: str, *tokens_list):
'flowDescr': options.flow_descr,
}
}

await pclient.async_request('graphql', mutation_kwargs)


Expand Down
56 changes: 32 additions & 24 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1579,8 +1579,10 @@ def spawn_task(
def force_spawn_children(
self,
items: Iterable[str],
outputs: Optional[List[str]] = None,
flow_num: Optional[int] = None
outputs: List[str],
flow: List[str],
flow_wait: bool = False,
flow_descr: str = "",
):
"""Spawn downstream children of given outputs, on user command.
Expand All @@ -1591,14 +1593,13 @@ def force_spawn_children(
items: Identifiers for matching task definitions, each with the
form "point/name".
outputs: List of outputs to spawn on
flow_num: Flow number to attribute the outputs
flow: Flow number to attribute the outputs
"""
outputs = outputs or [TASK_OUTPUT_SUCCEEDED]
if flow_num is None:
flow_nums = None
else:
flow_nums = {flow_num}
flow_nums = self._flow_cmd_helper(flow)
if flow_nums is None:
return

n_warnings, task_items = self.match_taskdefs(items)
for (_, point), taskdef in sorted(task_items.items()):
Expand Down Expand Up @@ -1638,27 +1639,15 @@ def remove_tasks(self, items):
self.release_runahead_tasks()
return len(bad_items)

def force_trigger_tasks(
self, items: Iterable[str],
flow: List[str],
flow_wait: bool = False,
flow_descr: Optional[str] = None
) -> int:
"""Manual task triggering.
Don't get a new flow number for existing n=0 tasks (e.g. incomplete
tasks). These can carry on in the original flow if retriggered.
Queue the task if not queued, otherwise release it to run.
"""
def _flow_cmd_helper(self, flow):
# TODO type hints
if set(flow).intersection({FLOW_ALL, FLOW_NEW, FLOW_NONE}):
if len(flow) != 1:
LOG.warning(
f'The "flow" values {FLOW_ALL}, {FLOW_NEW} & {FLOW_NONE}'
' cannot be used in combination with integer flow numbers.'
)
return 0
return None
if flow[0] == FLOW_ALL:
flow_nums = self._get_active_flow_nums()
elif flow[0] == FLOW_NEW:
Expand All @@ -1670,9 +1659,28 @@ def force_trigger_tasks(
flow_nums = {int(n) for n in flow}
except ValueError:
LOG.warning(
f"Trigger ignored, illegal flow values {flow}"
f"Ignoring command: illegal flow values {flow}"
)
return 0
return None
return flow_nums

def force_trigger_tasks(
self, items: Iterable[str],
flow: List[str],
flow_wait: bool = False,
flow_descr: Optional[str] = None
) -> int:
"""Manual task triggering.
Don't get a new flow number for existing n=0 tasks (e.g. incomplete
tasks). These can carry on in the original flow if retriggered.
Queue the task if not queued, otherwise release it to run.
"""
flow_nums = self._flow_cmd_helper(flow)
if flow_nums is None:
return

# n_warnings, task_items = self.match_taskdefs(items)
itasks, future_tasks, unmatched = self.filter_task_proxies(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[scheduler]
[[events]]
stall handlers = "cylc set-task --flow=1 %(workflow)s//1/bar"
stall handlers = "cylc set-task %(workflow)s//1/bar"
stall timeout = PT0S
abort on stall timeout = False
expected task failures = 1/bar
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/logging/02-duplicates/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
script = false
[[bar]]
script = """
cylc set-task --flow=1 "${CYLC_WORKFLOW_ID}" "foo.${CYLC_TASK_CYCLE_POINT}"
cylc set-task "${CYLC_WORKFLOW_ID}" "foo.${CYLC_TASK_CYCLE_POINT}"
"""
[[restart]]
script = """
Expand Down
4 changes: 2 additions & 2 deletions tests/functional/reload/17-graphing-change.t
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ cp "${TEST_SOURCE_DIR}/graphing-change/flow-2.cylc" \
"${RUN_DIR}/${WORKFLOW_NAME}/flow.cylc"

# Spawn a couple of task proxies, to get "task definition removed" message.
cylc set-task --flow=1 "${WORKFLOW_NAME}//1/foo"
cylc set-task --flow=1 "${WORKFLOW_NAME}//1/baz"
cylc set-task "${WORKFLOW_NAME}//1/foo"
cylc set-task "${WORKFLOW_NAME}//1/baz"
# reload workflow
run_ok "${TEST_NAME_BASE}-swap-reload" cylc reload "${WORKFLOW_NAME}"
poll grep_workflow_log_n_times 'Reload completed' 3
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/runahead/default-future/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
[[spawner]]
script = """
# spawn wibble
cylc set-task --flow=1 $CYLC_WORKFLOW_ID 20100101T0800Z/foo
cylc set-task $CYLC_WORKFLOW_ID 20100101T0800Z/foo
"""
[[foo]]
script = false
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------

# Check that "cylc set-task" works like it says on the tin.
# Check that "cylc set-task" continues a flow by default.
. "$(dirname "$0")/test_header"
set_test_number 2
reftest
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Test that `cylc set-task` does not cause reflow by default
# Task setter should cause bar to run, but not subsequently baz.
# Test that `cylc set-task` continues the active flow by default
# Task setter should cause bar to run, then subsequently baz.

[scheduler]
[[events]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ Final point: 1
1/foo -triggered off []
1/setter -triggered off ['1/foo']
1/bar -triggered off ['1/foo']
1/baz -triggered off ['1/bar']

0 comments on commit 569797d

Please sign in to comment.