Skip to content

Commit

Permalink
Merge pull request cylc#6175 from oliver-sanders/6157
Browse files Browse the repository at this point in the history
workflow_state: reject invalid configurations
  • Loading branch information
MetRonnie authored Sep 3, 2024
2 parents 3c001bb + 22c3b52 commit 8bb1f44
Show file tree
Hide file tree
Showing 13 changed files with 179 additions and 46 deletions.
1 change: 1 addition & 0 deletions changes.d/6175.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The workflow-state command and xtrigger will now reject invalid polling arguments.
28 changes: 28 additions & 0 deletions cylc/flow/dbstatecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
TASK_OUTPUT_FAILED,
TASK_OUTPUT_FINISHED,
)
from cylc.flow.task_state import (
TASK_STATE_MAP,
TASK_STATUSES_FINAL,
)
from cylc.flow.util import deserialise_set
from metomi.isodatetime.parsers import TimePointParser
from metomi.isodatetime.exceptions import ISO8601SyntaxError
Expand Down Expand Up @@ -244,6 +248,8 @@ def workflow_state_query(
stmt_args = []
stmt_wheres = []

check_polling_config(selector, is_trigger, is_message)

if is_trigger or is_message:
target_table = CylcWorkflowDAO.TABLE_TASK_OUTPUTS
mask = "name, cycle, outputs"
Expand Down Expand Up @@ -363,3 +369,25 @@ def _selector_in_outputs(selector: str, outputs: Iterable[str]) -> bool:
or TASK_OUTPUT_FAILED in outputs
)
)


def check_polling_config(selector, is_trigger, is_message):
    """Check for invalid or unreliable polling configurations.

    Only task status polling is checked, i.e. when a selector is given
    and neither ``is_trigger`` nor ``is_message`` is set. Trigger polling,
    message polling and selector-less queries pass through unchecked.

    Args:
        selector: The task status being polled for (falsy if none given).
        is_trigger: True if the selector is to be matched as a trigger.
        is_message: True if the selector is to be matched as a message.

    Raises:
        InputError: If the selector is not a known task state, if the
            state has no corresponding trigger, or if the state is not
            a final one (so could be missed between polls).

    """
    if not selector or is_trigger or is_message:
        # not task status polling => nothing to validate here
        return

    try:
        trigger = TASK_STATE_MAP[selector]
    except KeyError:
        # the KeyError is an internal lookup detail, suppress it so that
        # only the user-facing error is reported
        raise InputError(f'No such task state "{selector}"') from None

    if trigger is None:
        # this state has no trigger that could be polled for instead
        raise InputError(
            f'Cannot poll for the "{selector}" task state'
        )

    if selector not in TASK_STATUSES_FINAL:
        # non-final states are transient, point the user at the trigger
        raise InputError(
            f'Polling for the "{selector}" task status is not'
            ' reliable as it is a transient state.'
            f'\nPoll for the "{trigger}" trigger instead.'
        )
51 changes: 26 additions & 25 deletions cylc/flow/scripts/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
so you can start checking before the target workflow is started.
Legacy (pre-8.3.0) options are supported, but deprecated, for existing scripts:
cylc workflow-state --task=NAME --point=CYCLE --status=STATUS
--output=MESSAGE --message=MESSAGE --task-point WORKFLOW
cylc workflow-state --task=NAME --point=CYCLE --status=STATUS
--output=MESSAGE --message=MESSAGE --task-point WORKFLOW
(Note from 8.0 until 8.3.0 --output and --message both match task messages).
In "cycle/task:selector" the selector will match task statuses, unless:
Expand All @@ -55,24 +55,23 @@
Flow numbers are only printed for flow numbers > 1.
USE IN TASK SCRIPTING:
Use in task scripting:
- To poll a task at the same cycle point in another workflow, just use
$CYLC_TASK_CYCLE_POINT in the ID.
- To poll a task at an offset cycle point, use the --offset option to
have Cylc do the datetime arithmetic for you.
- However, see also the workflow_state xtrigger for this use case.
WARNINGS:
- Typos in the workflow or task ID will result in fruitless polling.
- To avoid missing transient states ("submitted", "running") poll for the
corresponding output trigger instead ("submitted", "started").
- Cycle points are auto-converted to the DB point format (and UTC mode).
- Task outputs manually completed by "cylc set" have "(force-completed)"
recorded as the task message in the DB, so it is best to query trigger
names, not messages, unless specifically interested in forced outputs.
Warnings:
- Typos in the workflow or task ID will result in fruitless polling.
- To avoid missing transient states ("submitted", "running") poll for the
corresponding output trigger instead ("submitted", "started").
- Cycle points are auto-converted to the DB point format (and UTC mode).
- Task outputs manually completed by "cylc set" have "(force-completed)"
recorded as the task message in the DB, so it is best to query trigger
names, not messages, unless specifically interested in forced outputs.
Examples:
# Print the status of all tasks in WORKFLOW:
$ cylc workflow-state WORKFLOW
Expand Down Expand Up @@ -115,7 +114,10 @@
from cylc.flow.dbstatecheck import CylcWorkflowDBChecker
from cylc.flow.terminal import cli_function
from cylc.flow.workflow_files import infer_latest_run_from_id
from cylc.flow.task_state import TASK_STATUSES_ORDERED
from cylc.flow.task_state import (
TASK_STATUSES_FINAL,
TASK_STATUSES_ALL,
)

if TYPE_CHECKING:
from optparse import Values
Expand Down Expand Up @@ -175,6 +177,8 @@ def __init__(
self.alt_cylc_run_dir = alt_cylc_run_dir
self.old_format = old_format
self.pretty_print = pretty_print
self.is_message = is_message
self.is_trigger = is_trigger

try:
tokens = Tokens(self.id_)
Expand All @@ -197,17 +201,6 @@ def __init__(
self.result: Optional[List[List[str]]] = None
self._db_checker: Optional[CylcWorkflowDBChecker] = None

self.is_message = is_message
if is_message:
self.is_trigger = False
else:
self.is_trigger = (
is_trigger or
(
self.selector is not None and
self.selector not in TASK_STATUSES_ORDERED
)
)
super().__init__(**kwargs)

def _find_workflow(self) -> bool:
Expand Down Expand Up @@ -363,7 +356,6 @@ def get_option_parser() -> COP:

@cli_function(get_option_parser, remove_opts=["--db"])
def main(parser: COP, options: 'Values', *ids: str) -> None:

# Note it would be cleaner to use 'id_cli.parse_ids()' here to get the
# workflow ID and tokens, but that function infers run number and fails
# if the workflow is not installed yet. We want to be able to start polling
Expand Down Expand Up @@ -427,6 +419,15 @@ def main(parser: COP, options: 'Values', *ids: str) -> None:
msg += id_
else:
msg += id_.replace(options.depr_point, "$CYLC_TASK_CYCLE_POINT")

if (
options.depr_status
and options.depr_status in TASK_STATUSES_ALL
and options.depr_status not in TASK_STATUSES_FINAL
):
# polling for non-final task statuses is flaky
msg += ' and the --triggers option'

LOG.warning(msg)

poller = WorkflowPoller(
Expand Down
28 changes: 20 additions & 8 deletions cylc/flow/task_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,15 @@

from typing import List, Iterable, Set, TYPE_CHECKING
from cylc.flow.prerequisite import Prerequisite
from cylc.flow.task_outputs import TaskOutputs
from cylc.flow.task_outputs import (
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_FAILED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_SUBMIT_FAILED,
TASK_OUTPUT_SUCCEEDED,
TaskOutputs,
)
from cylc.flow.wallclock import get_current_time_string

if TYPE_CHECKING:
Expand Down Expand Up @@ -144,13 +152,17 @@
TASK_STATUS_RUNNING,
}

# Task statuses that can be manually triggered.
TASK_STATUSES_TRIGGERABLE = {
TASK_STATUS_WAITING,
TASK_STATUS_EXPIRED,
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED,
# Mapping from each task status to its corresponding task output (trigger).
# Statuses mapped to None have no corresponding output.
TASK_STATE_MAP = {
    # status: trigger
    TASK_STATUS_WAITING: None,
    TASK_STATUS_EXPIRED: TASK_OUTPUT_EXPIRED,
    TASK_STATUS_PREPARING: None,
    TASK_STATUS_SUBMIT_FAILED: TASK_OUTPUT_SUBMIT_FAILED,
    TASK_STATUS_SUBMITTED: TASK_OUTPUT_SUBMITTED,
    TASK_STATUS_RUNNING: TASK_OUTPUT_STARTED,
    TASK_STATUS_FAILED: TASK_OUTPUT_FAILED,
    TASK_STATUS_SUCCEEDED: TASK_OUTPUT_SUCCEEDED,
}


Expand Down
17 changes: 15 additions & 2 deletions cylc/flow/xtriggers/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@

from cylc.flow.scripts.workflow_state import WorkflowPoller
from cylc.flow.id import tokenise
from cylc.flow.exceptions import WorkflowConfigError
from cylc.flow.exceptions import WorkflowConfigError, InputError
from cylc.flow.task_state import TASK_STATUS_SUCCEEDED
from cylc.flow.dbstatecheck import check_polling_config


# Task status assumed when the polled ID has no cycle or task selector.
DEFAULT_STATUS = TASK_STATUS_SUCCEEDED


def workflow_state(
Expand Down Expand Up @@ -84,7 +88,7 @@ def workflow_state(
offset,
flow_num,
alt_cylc_run_dir,
TASK_STATUS_SUCCEEDED,
DEFAULT_STATUS,
is_trigger, is_message,
old_format=False,
condition=workflow_task_id,
Expand Down Expand Up @@ -151,6 +155,15 @@ def validate(args: Dict[str, Any]):
):
raise WorkflowConfigError("flow_num must be an integer if given.")

try:
check_polling_config(
tokens['cycle_sel'] or tokens['task_sel'] or DEFAULT_STATUS,
args['is_trigger'],
args['is_message'],
)
except InputError as exc:
raise WorkflowConfigError(str(exc)) from None


# BACK COMPAT: workflow_state_backcompat
# from: 8.0.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
initial cycle point = 2011
final cycle point = 2016
[[xtriggers]]
upstream = workflow_state("{{UPSTREAM}}//%(point)s/foo:data_ready"):PT1S
upstream = workflow_state("{{UPSTREAM}}//%(point)s/foo:data_ready", is_trigger=True):PT1S
[[graph]]
P1Y = """
foo
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/shutdown/08-now1/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
[[[events]]]
# wait for the stopping message, sleep a bit, then echo some stuff
started handlers = """
cylc workflow-state %(workflow)s//%(point)s/%(name)s:stopping >/dev/null && sleep 1 && echo 'Hello %(id)s %(event)s'
cylc workflow-state %(workflow)s//%(point)s/%(name)s:stopping --triggers >/dev/null && sleep 1 && echo 'Hello %(id)s %(event)s'
"""
[[[outputs]]]
stopping = stopping
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/workflow-state/05-output.t
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ workflow_run_ok "${TEST_NAME}" \
cylc play --reference-test --debug --no-detach "${WORKFLOW_NAME}"

TEST_NAME=${TEST_NAME_BASE}-cli-check
run_ok "${TEST_NAME}" cylc workflow-state "${WORKFLOW_NAME}//20100101T0000Z/t1:out1" --max-polls=1
run_ok "${TEST_NAME}" cylc workflow-state "${WORKFLOW_NAME}//20100101T0000Z/t1:out1" --triggers --max-polls=1

purge
2 changes: 1 addition & 1 deletion tests/functional/workflow-state/07-message2.t
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ workflow_run_ok "${TEST_NAME_BASE}-run" \
cylc play --debug --no-detach "${WORKFLOW_NAME}"

TEST_NAME=${TEST_NAME_BASE}-query
run_fail "${TEST_NAME}" cylc workflow-state "${WORKFLOW_NAME}//2013/foo:x" --max-polls=1
run_fail "${TEST_NAME}" cylc workflow-state "${WORKFLOW_NAME}//2013/foo:x" --triggers --max-polls=1

grep_ok "failed after 1 polls" "${TEST_NAME}.stderr"

Expand Down
6 changes: 3 additions & 3 deletions tests/functional/workflow-state/11-multi.t
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ CMD="cylc workflow-state --run-dir=$DBDIR --max-polls=1"
# foo|1|[1]|2024-06-05T16:34:02+12:00|2024-06-05T16:34:04+12:00|1|succeeded|0|0

#---------------
# Test the new-format command line (pre-8.3.0).
# Test the new-format command line (8.3.0+).
T=${TEST_NAME_BASE}-cli-c8b
run_ok "${T}-1" $CMD c8b
run_ok "${T}-2" $CMD c8b//1
run_ok "${T}-3" $CMD c8b//1/foo
run_fail "${T}-4" $CMD c8b//1/foo:waiting
run_ok "${T}-4" $CMD c8b//1/foo:succeeded
run_ok "${T}-5" $CMD "c8b//1/foo:the quick brown" --messages
run_ok "${T}-6" $CMD "c8b//1/foo:x" --triggers
run_ok "${T}-7" $CMD "c8b//1/foo:x" # default to trigger if not a status
run_ok "${T}-8" $CMD c8b//1
run_ok "${T}-9" $CMD c8b//1:succeeded

Expand All @@ -86,7 +86,7 @@ run_fail "${T}-2" $CMD "c7//1/foo:the quick brown" --triggers
run_ok "${T}-3" $CMD "c7//1/foo:x" --triggers

#---------------
# Test the old-format command line (8.3.0+).
# Test the old-format command line (pre-8.3.0).
T=${TEST_NAME_BASE}-cli-8b-compat
run_ok "${T}-1" $CMD c8b
run_ok "${T}-2" $CMD c8b --point=1
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/workflow-state/11-multi/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
# Cylc 8 new (from 8.3.0)
c1 = workflow_state(c8b//1/foo, offset=P0, alt_cylc_run_dir={{ALT}}):PT1S
c2 = workflow_state(c8b//1/foo:succeeded, offset=P0, alt_cylc_run_dir={{ALT}}):PT1S
c3 = workflow_state(c8b//1/foo:x, offset=P0, alt_cylc_run_dir={{ALT}}):PT1S
c3 = workflow_state(c8b//1/foo:x, offset=P0, alt_cylc_run_dir={{ALT}}, is_trigger=True):PT1S
c4 = workflow_state(c8b//1/foo:"the quick brown", offset=P0, is_message=True, alt_cylc_run_dir={{ALT}}):PT1S

[[graph]]
Expand Down
44 changes: 44 additions & 0 deletions tests/unit/test_dbstatecheck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from cylc.flow.dbstatecheck import check_polling_config
from cylc.flow.exceptions import InputError

import pytest


def test_check_polling_config():
    """Invalid or unreliable polling configurations should be rejected.

    See https://github.com/cylc/cylc-flow/issues/6157
    """
    # status polling that must be refused: (selector, expected error text)
    rejected = (
        ('elephant', 'No such task state'),
        ('waiting', 'Cannot poll for'),
        ('running', 'is not reliable'),
    )
    for status, pattern in rejected:
        with pytest.raises(InputError, match=pattern):
            check_polling_config(status, False, False)

    # accepted without error: (selector, is_trigger, is_message)
    accepted = (
        # polling by trigger or by message
        ('started', True, False),
        ('started', False, True),
        # plain queries with no selector
        (None, False, True),
        (None, False, False),
    )
    for selector, is_trigger, is_message in accepted:
        check_polling_config(selector, is_trigger, is_message)
Loading

0 comments on commit 8bb1f44

Please sign in to comment.