diff --git a/cylc/flow/command_validation.py b/cylc/flow/command_validation.py new file mode 100644 index 00000000000..ab8d47b019e --- /dev/null +++ b/cylc/flow/command_validation.py @@ -0,0 +1,257 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Cylc command argument validation logic.""" + + +from typing import ( + Callable, + List, + Optional, +) + +from cylc.flow.exceptions import InputError +from cylc.flow.id import Tokens +from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED +from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE + + +ERR_OPT_FLOW_VAL = "Flow values must be an integer, or 'all', 'new', or 'none'" +ERR_OPT_FLOW_INT = "Multiple flow options must all be integer valued" +ERR_OPT_FLOW_WAIT = ( + f"--wait is not compatible with --flow={FLOW_NEW} or --flow={FLOW_NONE}" +) + + +def validate(func: Callable): + """Decorate scheduler commands with a callable .validate attribute. + + """ + # TODO: properly handle "Callable has no attribute validate"? + func.validate = globals()[ # type: ignore + func.__name__.replace("command", "validate") + ] + return func + + +def validate_flow_opts(flows: List[str], flow_wait: bool) -> None: + """Check validity of flow-related CLI options. + + Note the schema defaults flows to ["all"]. + + Examples: + Good: + >>> validate_flow_opts(["new"], False) + >>> validate_flow_opts(["1", "2"], False) + >>> validate_flow_opts(["1", "2"], True) + + Bad: + >>> validate_flow_opts(["none", "1"], False) + Traceback (most recent call last): + cylc.flow.exceptions.InputError: ... must all be integer valued + + >>> validate_flow_opts(["cheese", "2"], True) + Traceback (most recent call last): + cylc.flow.exceptions.InputError: ... or 'all', 'new', or 'none' + + >>> validate_flow_opts(["new"], True) + Traceback (most recent call last): + cylc.flow.exceptions.InputError: ... + + """ + for val in flows: + val = val.strip() + if val in [FLOW_NONE, FLOW_NEW, FLOW_ALL]: + if len(flows) != 1: + raise InputError(ERR_OPT_FLOW_INT) + else: + try: + int(val) + except ValueError: + raise InputError(ERR_OPT_FLOW_VAL.format(val)) + + if flow_wait and flows[0] in [FLOW_NEW, FLOW_NONE]: + raise InputError(ERR_OPT_FLOW_WAIT) + + +def validate_prereqs(prereqs: Optional[List[str]]): + """Validate a list of prerequisites, add implicit ":succeeded". + + Comma-separated lists should be split already, client-side. + + Examples: + # Set multiple at once: + >>> validate_prereqs(['1/foo:bar', '2/foo:baz']) + ['1/foo:bar', '2/foo:baz'] + + # --pre=all + >>> validate_prereqs(["all"]) + ['all'] + + # implicit ":succeeded" + >>> validate_prereqs(["1/foo"]) + ['1/foo:succeeded'] + + # Error: invalid format: + >>> validate_prereqs(["fish"]) + Traceback (most recent call last): + cylc.flow.exceptions.InputError: ... + + # Error: invalid format: + >>> validate_prereqs(["1/foo::bar"]) + Traceback (most recent call last): + cylc.flow.exceptions.InputError: ... + + # Error: "all" must be used alone: + >>> validate_prereqs(["all", "2/foo:baz"]) + Traceback (most recent call last): + cylc.flow.exceptions.InputError: ... + + """ + if prereqs is None: + return [] + + prereqs2 = [] + bad: List[str] = [] + for pre in prereqs: + p = validate_prereq(pre) + if p is not None: + prereqs2.append(p) + else: + bad.append(pre) + if bad: + raise InputError( + "Use prerequisite format /:output\n" + "\n ".join(bad) + ) + + if len(prereqs2) > 1: # noqa SIM102 (anticipates "cylc set --pre=cycle") + if "all" in prereqs: + raise InputError("--pre=all must be used alone") + + return prereqs2 + + +def validate_prereq(prereq: str) -> Optional[str]: + """Return prereq (with :succeeded) if valid, else None. + + Format: cycle/task[:output] + + Examples: + >>> validate_prereq('1/foo:succeeded') + '1/foo:succeeded' + + >>> validate_prereq('1/foo') + '1/foo:succeeded' + + >>> validate_prereq('all') + 'all' + + # Error: + >>> validate_prereq('fish') + + """ + try: + tokens = Tokens(prereq, relative=True) + except ValueError: + return None + if ( + tokens["cycle"] == prereq + and prereq != "all" + ): + # Error: --pre= other than "all" + return None + + if prereq != "all" and tokens["task_sel"] is None: + prereq += f":{TASK_OUTPUT_SUCCEEDED}" + + return prereq + + +def validate_outputs(outputs: Optional[List[str]]): + """Validate outputs. + + Comma-separated lists should be split already, client-side. + + Examples: + Good: + >>> validate_outputs(['a', 'b']) + ['a', 'b'] + + >>> validate_outputs(["required"]) # "required" is explicit default + [] + + Bad: + >>> validate_outputs(["required", "a"]) + Traceback (most recent call last): + cylc.flow.exceptions.InputError: --out=required must be used alone + + >>> validate_outputs(["waiting"]) + Traceback (most recent call last): + cylc.flow.exceptions.InputError: Tasks cannot be set to waiting... + + """ + # If "required" is explicit just ditch it (same as the default) + if not outputs or outputs == ["required"]: + return [] + + if "required" in outputs: + raise InputError("--out=required must be used alone") + + if "waiting" in outputs: + raise InputError( + "Tasks cannot be set to waiting. Use trigger to re-run tasks." + ) + + return outputs + + +def validate_consistency( + outputs: Optional[List[str]], + prereqs: Optional[List[str]] +) -> None: + """Check global option consistency + + Examples: + >>> validate_consistency(["a"], None) # OK + + >>> validate_consistency(None, ["1/a:failed"]) #OK + + >>> validate_consistency(["a"], ["1/a:failed"]) + Traceback (most recent call last): + cylc.flow.exceptions.InputError: ... + + """ + if outputs and prereqs: + raise InputError("Use --prerequisite or --output, not both.") + + +def validate_set( + tasks: List[str], + flow: List[str], + outputs: Optional[List[str]] = None, + prerequisites: Optional[List[str]] = None, + flow_wait: bool = False, + flow_descr: Optional[str] = None +) -> None: + """Validate args of the scheduler "command_set" method. + + Raise InputError if validation fails. + """ + validate_consistency(outputs, prerequisites) + outputs = validate_outputs(outputs) + prerequisites = validate_prereqs(prerequisites) + validate_flow_opts(flow, flow_wait) diff --git a/cylc/flow/flow_mgr.py b/cylc/flow/flow_mgr.py index 7bb293bfc85..1cd1c1e8c70 100644 --- a/cylc/flow/flow_mgr.py +++ b/cylc/flow/flow_mgr.py @@ -20,7 +20,6 @@ import datetime from cylc.flow import LOG -from cylc.flow.exceptions import InputError if TYPE_CHECKING: @@ -32,13 +31,6 @@ FLOW_NEW = "new" FLOW_NONE = "none" -# For flow-related CLI options: -ERR_OPT_FLOW_VAL = "Flow values must be an integer, or 'all', 'new', or 'none'" -ERR_OPT_FLOW_INT = "Multiple flow options must all be integer valued" -ERR_OPT_FLOW_WAIT = ( - f"--wait is not compatible with --flow={FLOW_NEW} or --flow={FLOW_NONE}" -) - def add_flow_opts(parser): parser.add_option( @@ -63,27 +55,6 @@ def add_flow_opts(parser): ) -def validate_flow_opts(options): - """Check validity of flow-related CLI options.""" - if options.flow is None: - # Default to all active flows - options.flow = [FLOW_ALL] - - for val in options.flow: - val = val.strip() - if val in [FLOW_NONE, FLOW_NEW, FLOW_ALL]: - if len(options.flow) != 1: - raise InputError(ERR_OPT_FLOW_INT) - else: - try: - int(val) - except ValueError: - raise InputError(ERR_OPT_FLOW_VAL.format(val)) - - if options.flow_wait and options.flow[0] in [FLOW_NEW, FLOW_NONE]: - raise InputError(ERR_OPT_FLOW_WAIT) - - def stringify_flow_nums(flow_nums: Set[int], full: bool = False) -> str: """Return a string representation of a set of flow numbers diff --git a/cylc/flow/network/multi.py b/cylc/flow/network/multi.py index a11842d6391..8baa9f119bf 100644 --- a/cylc/flow/network/multi.py +++ b/cylc/flow/network/multi.py @@ -19,6 +19,30 @@ from cylc.flow.async_util import unordered_map from cylc.flow.id_cli import parse_ids_async +from cylc.flow.exceptions import InputError + + +def print_response(multi_results): + """Print server mutation response to stdout. + + The response will be either: + - (False, argument-validation-error) + - (True, ID-of-queued-command) + + Raise InputError if validation failed. + + """ + for multi_result in multi_results: + for _cmd, results in multi_result.items(): + for result in results.values(): + for wf_res in result: + wf_id = wf_res["id"] + response = wf_res["response"] + if not response[0]: + # Validation failure + raise InputError(response[1]) + else: + print(f"{wf_id}: command {response[1]} queued") def call_multi(*args, **kwargs): @@ -107,4 +131,4 @@ def _report_single(report, workflow, result): def _report(_): - print('Command submitted; the scheduler will log any problems.') + pass diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index 5451ef501aa..95b33ad1f7e 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -42,6 +42,7 @@ EDGES, FAMILY_PROXIES, TASK_PROXIES, WORKFLOW, DELTA_ADDED, create_delta_store ) +from cylc.flow.exceptions import InputError from cylc.flow.id import Tokens from cylc.flow.network.schema import ( DEF_TYPES, @@ -740,10 +741,19 @@ async def _mutation_mapper( return method(**kwargs) try: - self.schd.get_command_method(command) + meth = self.schd.get_command_method(command) except AttributeError: raise ValueError(f"Command '{command}' not found") + # If meth has a command validation function, call it. + try: + # TODO: properly handle "Callable has no attribute validate"? + meth.validate(**kwargs) # type: ignore + except AttributeError: + LOG.debug(f"No command validation for {command}") + except InputError as exc: + return (False, str(exc)) + # Queue the command to the scheduler, with a unique command ID cmd_uuid = str(uuid4()) LOG.info(f"{log1} ID={cmd_uuid}\n{log2}") diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 566c4d977c1..508633bc306 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -53,6 +53,7 @@ ) from cylc.flow.broadcast_mgr import BroadcastMgr from cylc.flow.cfgspec.glbl_cfg import glbl_cfg +from cylc.flow import command_validation from cylc.flow.config import WorkflowConfig from cylc.flow.data_store_mgr import DataStoreMgr from cylc.flow.id import Tokens @@ -2146,6 +2147,7 @@ def command_force_trigger_tasks( return self.pool.force_trigger_tasks( tasks, flow, flow_wait, flow_descr) + @command_validation.validate def command_set( self, tasks: List[str], diff --git a/cylc/flow/scripts/set.py b/cylc/flow/scripts/set.py index 4f0cd19522f..4f00b458038 100755 --- a/cylc/flow/scripts/set.py +++ b/cylc/flow/scripts/set.py @@ -89,26 +89,28 @@ """ from functools import partial -from typing import TYPE_CHECKING, List, Optional +from typing import Tuple, TYPE_CHECKING from cylc.flow.exceptions import InputError from cylc.flow.network.client_factory import get_client -from cylc.flow.network.multi import call_multi +from cylc.flow.network.multi import ( + call_multi, + print_response +) from cylc.flow.option_parsers import ( FULL_ID_MULTI_ARG_DOC, CylcOptionParser as COP, ) -from cylc.flow.id import Tokens -from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED -from cylc.flow.terminal import cli_function -from cylc.flow.flow_mgr import ( - add_flow_opts, - validate_flow_opts +from cylc.flow.terminal import ( + cli_function, + flatten_cli_lists ) +from cylc.flow.flow_mgr import add_flow_opts if TYPE_CHECKING: from optparse import Values + from cylc.flow.id import Tokens MUTATION = ''' @@ -177,189 +179,14 @@ def get_option_parser() -> COP: return parser -def validate_prereq(prereq: str) -> Optional[str]: - """Return prereq (with :succeeded) if valid, else None. - - Format: cycle/task[:output] - - Examples: - >>> validate_prereq('1/foo:succeeded') - '1/foo:succeeded' - - >>> validate_prereq('1/foo') - '1/foo:succeeded' - - >>> validate_prereq('all') - 'all' - - # Error: - >>> validate_prereq('fish') - - """ - try: - tokens = Tokens(prereq, relative=True) - except ValueError: - return None - if ( - tokens["cycle"] == prereq - and prereq != "all" - ): - # Error: --pre= other than "all" - return None - - if prereq != "all" and tokens["task_sel"] is None: - prereq += f":{TASK_OUTPUT_SUCCEEDED}" - - return prereq - - -def split_opts(options: List[str]): - """Return list from multi-use and comma-separated options. - - Examples: - # --out='a,b,c' - >>> split_opts(['a,b,c']) - ['a', 'b', 'c'] - - # --out='a' --out='a,b' - >>> split_opts(['a', 'b,c']) - ['a', 'b', 'c'] - - # --out='a' --out='a,b' - >>> split_opts(['a', 'a,b']) - ['a', 'b'] - - # --out=' a ' - >>> split_opts([' a ']) - ['a'] - - # --out='a, b, c , d' - >>> split_opts(['a, b, c , d']) - ['a', 'b', 'c', 'd'] - - """ - return sorted({ - item.strip() - for option in (options or []) - for item in option.strip().split(',') - }) - - -def get_prereq_opts(prereq_options: List[str]): - """Convert prerequisites to a flat list with output selectors. - - Examples: - # Set multiple at once: - >>> get_prereq_opts(['1/foo:bar', '2/foo:baz,3/foo:qux']) - ['1/foo:bar', '2/foo:baz', '3/foo:qux'] - - # --pre=all - >>> get_prereq_opts(["all"]) - ['all'] - - # implicit ":succeeded" - >>> get_prereq_opts(["1/foo"]) - ['1/foo:succeeded'] - - # Error: invalid format: - >>> get_prereq_opts(["fish"]) - Traceback (most recent call last): - cylc.flow.exceptions.InputError: ... - - # Error: invalid format: - >>> get_prereq_opts(["1/foo::bar"]) - Traceback (most recent call last): - cylc.flow.exceptions.InputError: ... - - # Error: "all" must be used alone: - >>> get_prereq_opts(["all", "2/foo:baz"]) - Traceback (most recent call last): - cylc.flow.exceptions.InputError: ... - - """ - prereqs = split_opts(prereq_options) - if not prereqs: - return [] - - prereqs2 = [] - bad: List[str] = [] - for pre in prereqs: - p = validate_prereq(pre) - if p is not None: - prereqs2.append(p) - else: - bad.append(pre) - if bad: - raise InputError( - "Use prerequisite format /:output\n" - "\n ".join(bad) - ) - - if len(prereqs2) > 1: # noqa SIM102 (anticipates "cylc set --pre=cycle") - if "all" in prereqs: - raise InputError("--pre=all must be used alone") - - return prereqs2 - - -def get_output_opts(output_options: List[str]): - """Convert outputs options to a flat list, and validate. - - Examples: - Good: - >>> get_output_opts(['a', 'b,c']) - ['a', 'b', 'c'] - >>> get_output_opts(["required"]) # "required" is explicit default - [] - - Bad: - >>> get_output_opts(["required", "a"]) # "required" must be used alone - Traceback (most recent call last): - cylc.flow.exceptions.InputError: --out=required must be used alone - >>> get_output_opts(["waiting"]) # cannot "reset" to waiting - Traceback (most recent call last): - cylc.flow.exceptions.InputError: Tasks cannot be set to waiting... - - """ - outputs = split_opts(output_options) - - # If "required" is explicit just ditch it (same as the default) - if not outputs or outputs == ["required"]: - return [] - - if "required" in outputs: - raise InputError("--out=required must be used alone") - if "waiting" in outputs: - raise InputError( - "Tasks cannot be set to waiting, use a new flow to re-run" - ) - - return outputs - - -def validate_opts(output_opt: List[str], prereq_opt: List[str]): - """Check global option consistency - - Examples: - >>> validate_opts(["a"], None) # OK - - >>> validate_opts(None, ["1/a:failed"]) #OK - - >>> validate_opts(["a"], ["1/a:failed"]) - Traceback (most recent call last): - cylc.flow.exceptions.InputError: ... - - """ - if output_opt and prereq_opt: - raise InputError("Use --prerequisite or --output, not both.") - - -def validate_tokens(tokens_list): +def validate_tokens(tokens_list: Tuple['Tokens']) -> None: """Check the cycles/tasks provided. This checks that cycle/task selectors have not been provided in the IDs. Examples: + >>> from cylc.flow.id import Tokens + Good: >>> validate_tokens([Tokens('w//c')]) >>> validate_tokens([Tokens('w//c/t')]) @@ -391,6 +218,7 @@ async def run( workflow_id: str, *tokens_list ) -> None: + validate_tokens(tokens_list) pclient = get_client(workflow_id, timeout=options.comms_timeout) @@ -403,22 +231,19 @@ async def run( tokens.relative_id_with_selectors for tokens in tokens_list ], - 'outputs': get_output_opts(options.outputs), - 'prerequisites': get_prereq_opts(options.prerequisites), + 'outputs': flatten_cli_lists(options.outputs), + 'prerequisites': flatten_cli_lists(options.prerequisites), 'flow': options.flow, 'flowWait': options.flow_wait, 'flowDescr': options.flow_descr } } - await pclient.async_request('graphql', mutation_kwargs) + return await pclient.async_request('graphql', mutation_kwargs) @cli_function(get_option_parser) def main(parser: COP, options: 'Values', *ids) -> None: - validate_opts(options.outputs, options.prerequisites) - validate_flow_opts(options) - call_multi( - partial(run, options), - *ids, + print_response( + call_multi(partial(run, options), *ids) ) diff --git a/cylc/flow/scripts/trigger.py b/cylc/flow/scripts/trigger.py index 9d27fc38bd1..f49c0d37e75 100755 --- a/cylc/flow/scripts/trigger.py +++ b/cylc/flow/scripts/trigger.py @@ -51,10 +51,8 @@ CylcOptionParser as COP, ) from cylc.flow.terminal import cli_function -from cylc.flow.flow_mgr import ( - add_flow_opts, - validate_flow_opts -) +from cylc.flow.flow_mgr import add_flow_opts +from cylc.flow.command_validation import validate_flow_opts if TYPE_CHECKING: @@ -116,7 +114,7 @@ async def run(options: 'Values', workflow_id: str, *tokens_list): @cli_function(get_option_parser) def main(parser: COP, options: 'Values', *ids: str): """CLI for "cylc trigger".""" - validate_flow_opts(options) + validate_flow_opts(options.flow or ['all'], options.flow_wait) call_multi( partial(run, options), *ids, diff --git a/cylc/flow/terminal.py b/cylc/flow/terminal.py index e848691252e..4304fc32201 100644 --- a/cylc/flow/terminal.py +++ b/cylc/flow/terminal.py @@ -24,7 +24,7 @@ from subprocess import PIPE, Popen # nosec import sys from textwrap import wrap -from typing import Any, Callable, Optional, TYPE_CHECKING +from typing import Any, Callable, List, Optional, TYPE_CHECKING from ansimarkup import parse as cparse from colorama import init as color_init @@ -373,3 +373,35 @@ def prompt(message, options, default=None, process=None): if isinstance(options, dict): return options[usr] return usr + + +def flatten_cli_lists(lsts: List[str]) -> List[str]: + """Return a sorted flat list for multi-use CLI command options. + + Examples: + # --out='a,b,c' + >>> flatten_cli_lists(['a,b,c']) + ['a', 'b', 'c'] + + # --out='a' --out='a,b' + >>> flatten_cli_lists(['a', 'b,c']) + ['a', 'b', 'c'] + + # --out='a' --out='a,b' + >>> flatten_cli_lists(['a', 'a,b']) + ['a', 'b'] + + # --out=' a ' + >>> flatten_cli_lists([' a ']) + ['a'] + + # --out='a, b, c , d' + >>> flatten_cli_lists(['a, b, c , d']) + ['a', 'b', 'c', 'd'] + + """ + return sorted({ + item.strip() + for lst in (lsts or []) + for item in lst.strip().split(',') + }) diff --git a/tests/unit/scripts/test_trigger.py b/tests/unit/scripts/test_trigger.py deleted file mode 100644 index 87f392d73f8..00000000000 --- a/tests/unit/scripts/test_trigger.py +++ /dev/null @@ -1,124 +0,0 @@ -# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. -# Copyright (C) NIWA & British Crown (Met Office) & Contributors. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -"""Test logic in cylc-trigger script.""" - -from optparse import Values -import pytest -from typing import Optional, Tuple, Type - -from cylc.flow.exceptions import InputError -from cylc.flow.option_parsers import Options -from cylc.flow.flow_mgr import ( - FLOW_ALL, - FLOW_NEW, - FLOW_NONE, - validate_flow_opts -) -from cylc.flow.scripts.trigger import get_option_parser - - -Opts = Options(get_option_parser()) - - -@pytest.mark.parametrize( - 'opts, expected_err', - [ - ( - Opts( - flow=[FLOW_ALL], - flow_wait=False - ), - None - ), - ( - Opts( - flow=None, - flow_wait=False - ), - None - ), - ( - Opts( - flow=[FLOW_NEW], - flow_wait=False, - flow_descr="Denial is a deep river" - ), - None - ), - ( - Opts( - flow=[FLOW_ALL, "1"], - flow_wait=False - ), - ( - InputError, - "Multiple flow options must all be integer valued" - ) - ), - ( - Opts( - flow=["cheese"], - flow_wait=False - ), - ( - InputError, - "Flow values must be an integer, or 'all', 'new', or 'none'" - ) - ), - ( - Opts( - flow=[FLOW_NONE], - flow_wait=True - ), - ( - InputError, - "--wait is not compatible with --flow=new or --flow=none" - ) - ), - ( - Opts( - flow=[FLOW_ALL, "1"], - flow_wait=False - ), - ( - InputError, - "Multiple flow options must all be integer valued" - ) - ), - ( - Opts( - flow=[FLOW_ALL, "1"], - flow_wait=False - ), - ( - InputError, - "Multiple flow options must all be integer valued" - ) - ), - ] -) -def test_validate( - opts: Values, - expected_err: Optional[Tuple[Type[Exception], str]]): - - if expected_err: - err, msg = expected_err - with pytest.raises(err) as exc: - validate_flow_opts(opts) - assert msg in str(exc.value) - else: - validate_flow_opts(opts)