From 1c590b0db1b84633838d9c671033dccd06293159 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Mon, 16 Oct 2023 10:27:31 +0100 Subject: [PATCH] speed up import time * Don't import modules for type checking purposes. * Move some light weight functions out of a heavy weight module to reduce the import penalty. * Make a couple of heavy weight imports dynamic to avoid importing them when not necessary. * This reduces the import time of cylc.flow.scripts.message by ~66%. * This reduces the task runtime (`script=true`) by ~1s. --- cylc/flow/config.py | 2 +- cylc/flow/exceptions.py | 7 +- cylc/flow/flow_mgr.py | 7 +- cylc/flow/id_cli.py | 11 +- cylc/flow/job_file.py | 2 +- .../flow/job_runner_handlers/documentation.py | 9 +- cylc/flow/log_level.py | 116 ++++++++++++++++++ cylc/flow/network/server.py | 4 +- cylc/flow/option_parsers.py | 99 +-------------- cylc/flow/parsec/fileparse.py | 4 +- cylc/flow/remote.py | 2 +- cylc/flow/scheduler.py | 20 +-- cylc/flow/scripts/cat_log.py | 2 +- cylc/flow/scripts/kill.py | 8 +- cylc/flow/scripts/lint.py | 6 +- cylc/flow/scripts/set_outputs.py | 5 +- cylc/flow/scripts/set_verbosity.py | 6 +- cylc/flow/scripts/show.py | 7 +- cylc/flow/scripts/validate.py | 5 +- cylc/flow/task_events_mgr.py | 4 +- cylc/flow/task_id.py | 11 +- cylc/flow/task_proxy.py | 2 +- cylc/flow/task_queues/__init__.py | 13 +- cylc/flow/task_queues/independent.py | 28 +++-- cylc/flow/task_remote_mgr.py | 18 ++- cylc/flow/templatevars.py | 11 +- cylc/flow/terminal.py | 8 +- cylc/flow/workflow_files.py | 7 +- cylc/flow/xtrigger_mgr.py | 47 ++++--- setup.cfg | 1 + tests/unit/test_id_cli.py | 2 +- 31 files changed, 278 insertions(+), 196 deletions(-) create mode 100644 cylc/flow/log_level.py diff --git a/cylc/flow/config.py b/cylc/flow/config.py index 79d305da57a..0096d924c8b 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -68,7 +68,7 @@ import cylc.flow.flags from cylc.flow.graph_parser import GraphParser from cylc.flow.listify import listify -from cylc.flow.option_parsers import verbosity_to_env +from cylc.flow.log_level import verbosity_to_env from cylc.flow.graphnode import GraphNodeParser from cylc.flow.param_expand import NameExpander from cylc.flow.parsec.exceptions import ItemNotFoundError diff --git a/cylc/flow/exceptions.py b/cylc/flow/exceptions.py index 79a726d7bbe..d1a459f996b 100644 --- a/cylc/flow/exceptions.py +++ b/cylc/flow/exceptions.py @@ -26,11 +26,14 @@ Tuple, Type, Union, + TYPE_CHECKING, ) -from cylc.flow.subprocctx import SubFuncContext from cylc.flow.util import format_cmd +if TYPE_CHECKING: + from cylc.flow.subprocctx import SubFuncContext + class CylcError(Exception): """Generic exception for Cylc errors. @@ -198,7 +201,7 @@ def __init__( message: str, platform_name: str, *, - ctx: Optional[SubFuncContext] = None, + ctx: 'Optional[SubFuncContext]' = None, cmd: Optional[Union[str, Iterable]] = None, ret_code: Optional[int] = None, out: Optional[str] = None, diff --git a/cylc/flow/flow_mgr.py b/cylc/flow/flow_mgr.py index 148adb8213f..41aec947a80 100644 --- a/cylc/flow/flow_mgr.py +++ b/cylc/flow/flow_mgr.py @@ -16,11 +16,14 @@ """Manage flow counter and flow metadata.""" -from typing import Dict, Set, Optional +from typing import Dict, Set, Optional, TYPE_CHECKING import datetime from cylc.flow import LOG -from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager + + +if TYPE_CHECKING: + from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager FlowNums = Set[int] diff --git a/cylc/flow/id_cli.py b/cylc/flow/id_cli.py index 96cb438fded..803c8506439 100644 --- a/cylc/flow/id_cli.py +++ b/cylc/flow/id_cli.py @@ -31,11 +31,6 @@ upgrade_legacy_ids, ) from cylc.flow.pathutil import EXPLICIT_RELATIVE_PATH_REGEX -from cylc.flow.network.scan import ( - filter_name, - is_active, - scan, -) from cylc.flow.workflow_files import ( check_flow_file, detect_both_flow_and_suite, @@ -487,6 +482,12 @@ async def _expand_workflow_tokens_impl(tokens, match_active=True): 'currently supported.' ) + # import only when needed to avoid slowing CLI unnecessarily + from cylc.flow.network.scan import ( + filter_name, + is_active, + scan, + ) # construct the pipe pipe = scan | filter_name(fnmatch.translate(tokens['workflow'])) if match_active is not None: diff --git a/cylc/flow/job_file.py b/cylc/flow/job_file.py index cc8576dad37..86e65e15a8e 100644 --- a/cylc/flow/job_file.py +++ b/cylc/flow/job_file.py @@ -25,7 +25,7 @@ from cylc.flow import __version__ as CYLC_VERSION from cylc.flow.job_runner_mgr import JobRunnerManager import cylc.flow.flags -from cylc.flow.option_parsers import verbosity_to_env +from cylc.flow.log_level import verbosity_to_env from cylc.flow.config import interpolate_template, ParamExpandError # the maximum number of task dependencies which Cylc will list before diff --git a/cylc/flow/job_runner_handlers/documentation.py b/cylc/flow/job_runner_handlers/documentation.py index 735b1801b16..3df2b5a5093 100644 --- a/cylc/flow/job_runner_handlers/documentation.py +++ b/cylc/flow/job_runner_handlers/documentation.py @@ -22,13 +22,16 @@ not intended to be subclassed. """ -import re from typing import ( Iterable, List, Tuple, + TYPE_CHECKING, ) +if TYPE_CHECKING: + import re + class ExampleHandler(): """Documentation for writing job runner handlers. @@ -258,7 +261,7 @@ class QSUBHandler(PBSHandler): """ - REC_ID_FROM_SUBMIT_OUT: re.Pattern + REC_ID_FROM_SUBMIT_OUT: 're.Pattern' """Regular expression to extract job IDs from submission stderr. A regular expression (compiled) to extract the job "id" from the standard @@ -266,7 +269,7 @@ class QSUBHandler(PBSHandler): """ - REC_ID_FROM_SUBMIT_ERR: re.Pattern + REC_ID_FROM_SUBMIT_ERR: 're.Pattern' """Regular expression to extract job IDs from submission stderr. See :py:attr:`ExampleHandler.REC_ID_FROM_SUBMIT_OUT`. diff --git a/cylc/flow/log_level.py b/cylc/flow/log_level.py new file mode 100644 index 00000000000..4b091154893 --- /dev/null +++ b/cylc/flow/log_level.py @@ -0,0 +1,116 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Utilities for configuring logging level via the CLI.""" + +import logging +from typing import List, Dict, Union, TYPE_CHECKING + +if TYPE_CHECKING: + import os + + +def verbosity_to_log_level(verb: int) -> int: + """Convert Cylc verbosity to log severity level.""" + if verb < 0: + return logging.WARNING + if verb > 0: + return logging.DEBUG + return logging.INFO + + +def log_level_to_verbosity(lvl: int) -> int: + """Convert log severity level to Cylc verbosity. + + Examples: + >>> log_level_to_verbosity(logging.NOTSET) + 2 + >>> log_level_to_verbosity(logging.DEBUG) + 1 + >>> log_level_to_verbosity(logging.INFO) + 0 + >>> log_level_to_verbosity(logging.WARNING) + -1 + >>> log_level_to_verbosity(logging.ERROR) + -1 + """ + if lvl < logging.DEBUG: + return 2 + if lvl < logging.INFO: + return 1 + if lvl == logging.INFO: + return 0 + return -1 + + +def verbosity_to_opts(verb: int) -> List[str]: + """Convert Cylc verbosity to the CLI opts required to replicate it. + + Examples: + >>> verbosity_to_opts(0) + [] + >>> verbosity_to_opts(-2) + ['-q', '-q'] + >>> verbosity_to_opts(2) + ['-v', '-v'] + + """ + return [ + '-q' + for _ in range(verb, 0) + ] + [ + '-v' + for _ in range(0, verb) + ] + + +def verbosity_to_env(verb: int) -> Dict[str, str]: + """Convert Cylc verbosity to the env vars required to replicate it. + + Examples: + >>> verbosity_to_env(0) + {'CYLC_VERBOSE': 'false', 'CYLC_DEBUG': 'false'} + >>> verbosity_to_env(1) + {'CYLC_VERBOSE': 'true', 'CYLC_DEBUG': 'false'} + >>> verbosity_to_env(2) + {'CYLC_VERBOSE': 'true', 'CYLC_DEBUG': 'true'} + + """ + return { + 'CYLC_VERBOSE': str((verb > 0)).lower(), + 'CYLC_DEBUG': str((verb > 1)).lower(), + } + + +def env_to_verbosity(env: 'Union[Dict, os._Environ]') -> int: + """Extract verbosity from environment variables. + + Examples: + >>> env_to_verbosity({}) + 0 + >>> env_to_verbosity({'CYLC_VERBOSE': 'true'}) + 1 + >>> env_to_verbosity({'CYLC_DEBUG': 'true'}) + 2 + >>> env_to_verbosity({'CYLC_DEBUG': 'TRUE'}) + 2 + + """ + return ( + 2 if env.get('CYLC_DEBUG', '').lower() == 'true' + else 1 if env.get('CYLC_VERBOSE', '').lower() == 'true' + else 0 + ) diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py index f6847dcef59..5c070472025 100644 --- a/cylc/flow/network/server.py +++ b/cylc/flow/network/server.py @@ -21,7 +21,6 @@ from time import sleep from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Union -from graphql.execution import ExecutionResult from graphql.execution.executors.asyncio import AsyncioExecutor import zmq from zmq.auth.thread import ThreadAuthenticator @@ -41,6 +40,7 @@ if TYPE_CHECKING: from cylc.flow.scheduler import Scheduler + from graphql.execution import ExecutionResult # maps server methods to the protobuf message (for client/UIS import) @@ -368,7 +368,7 @@ def graphql( object: Execution result, or a list with errors. """ try: - executed: ExecutionResult = schema.execute( + executed: 'ExecutionResult' = schema.execute( request_string, variable_values=variables, context_value={ diff --git a/cylc/flow/option_parsers.py b/cylc/flow/option_parsers.py index cb21f5e8bcf..ee84a477da7 100644 --- a/cylc/flow/option_parsers.py +++ b/cylc/flow/option_parsers.py @@ -33,7 +33,7 @@ import sys from textwrap import dedent -from typing import Any, Dict, Optional, List, Tuple, Union +from typing import Any, Dict, Optional, List, Tuple from cylc.flow import LOG from cylc.flow.terminal import supports_color, DIM @@ -42,6 +42,10 @@ CylcLogFormatter, setup_segregated_log_streams, ) +from cylc.flow.log_level import ( + env_to_verbosity, + verbosity_to_log_level +) WORKFLOW_ID_ARG_DOC = ('WORKFLOW', 'Workflow ID') WORKFLOW_ID_MULTI_ARG_DOC = ('WORKFLOW ...', 'Workflow ID(s)') @@ -177,99 +181,6 @@ def format_help_headings(string): ) -def verbosity_to_log_level(verb: int) -> int: - """Convert Cylc verbosity to log severity level.""" - if verb < 0: - return logging.WARNING - if verb > 0: - return logging.DEBUG - return logging.INFO - - -def log_level_to_verbosity(lvl: int) -> int: - """Convert log severity level to Cylc verbosity. - - Examples: - >>> log_level_to_verbosity(logging.NOTSET) - 2 - >>> log_level_to_verbosity(logging.DEBUG) - 1 - >>> log_level_to_verbosity(logging.INFO) - 0 - >>> log_level_to_verbosity(logging.WARNING) - -1 - >>> log_level_to_verbosity(logging.ERROR) - -1 - """ - if lvl < logging.DEBUG: - return 2 - if lvl < logging.INFO: - return 1 - if lvl == logging.INFO: - return 0 - return -1 - - -def verbosity_to_opts(verb: int) -> List[str]: - """Convert Cylc verbosity to the CLI opts required to replicate it. - - Examples: - >>> verbosity_to_opts(0) - [] - >>> verbosity_to_opts(-2) - ['-q', '-q'] - >>> verbosity_to_opts(2) - ['-v', '-v'] - - """ - return [ - '-q' - for _ in range(verb, 0) - ] + [ - '-v' - for _ in range(0, verb) - ] - - -def verbosity_to_env(verb: int) -> Dict[str, str]: - """Convert Cylc verbosity to the env vars required to replicate it. - - Examples: - >>> verbosity_to_env(0) - {'CYLC_VERBOSE': 'false', 'CYLC_DEBUG': 'false'} - >>> verbosity_to_env(1) - {'CYLC_VERBOSE': 'true', 'CYLC_DEBUG': 'false'} - >>> verbosity_to_env(2) - {'CYLC_VERBOSE': 'true', 'CYLC_DEBUG': 'true'} - - """ - return { - 'CYLC_VERBOSE': str((verb > 0)).lower(), - 'CYLC_DEBUG': str((verb > 1)).lower(), - } - - -def env_to_verbosity(env: Union[Dict, os._Environ]) -> int: - """Extract verbosity from environment variables. - - Examples: - >>> env_to_verbosity({}) - 0 - >>> env_to_verbosity({'CYLC_VERBOSE': 'true'}) - 1 - >>> env_to_verbosity({'CYLC_DEBUG': 'true'}) - 2 - >>> env_to_verbosity({'CYLC_DEBUG': 'TRUE'}) - 2 - - """ - return ( - 2 if env.get('CYLC_DEBUG', '').lower() == 'true' - else 1 if env.get('CYLC_VERBOSE', '').lower() == 'true' - else 0 - ) - - class CylcOption(Option): """Optparse option which adds a decrement action.""" diff --git a/cylc/flow/parsec/fileparse.py b/cylc/flow/parsec/fileparse.py index 8ac0a20a327..f4f8d723f0b 100644 --- a/cylc/flow/parsec/fileparse.py +++ b/cylc/flow/parsec/fileparse.py @@ -31,7 +31,6 @@ """ import os -from optparse import Values from pathlib import Path import re import sys @@ -50,6 +49,9 @@ from cylc.flow.workflow_files import ( get_workflow_source_dir, check_flow_file) +if t.TYPE_CHECKING: + from optparse import Values + # heading/sections can contain commas (namespace name lists) and any # regex pattern characters (this was for pre cylc-6 satellite tasks). diff --git a/cylc/flow/remote.py b/cylc/flow/remote.py index 62cd77fd9af..fd51934af11 100644 --- a/cylc/flow/remote.py +++ b/cylc/flow/remote.py @@ -32,7 +32,7 @@ import cylc.flow.flags from cylc.flow import __version__ as CYLC_VERSION, LOG -from cylc.flow.option_parsers import verbosity_to_opts +from cylc.flow.log_level import verbosity_to_opts from cylc.flow.platforms import get_platform, get_host_from_platform from cylc.flow.util import format_cmd diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 7449e7d274a..e5c3fc7c01a 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -18,7 +18,6 @@ import asyncio from contextlib import suppress from collections import deque -from optparse import Values import os import inspect from pathlib import Path @@ -84,16 +83,15 @@ patch_log_level ) from cylc.flow.timer import Timer -from cylc.flow.network import API -from cylc.flow.network.authentication import key_housekeeping -from cylc.flow.network.resolvers import TaskMsg -from cylc.flow.network.schema import WorkflowStopMode -from cylc.flow.network.server import WorkflowRuntimeServer -from cylc.flow.option_parsers import ( +from cylc.flow.log_level import ( log_level_to_verbosity, verbosity_to_env, verbosity_to_opts, ) +from cylc.flow.network import API +from cylc.flow.network.authentication import key_housekeeping +from cylc.flow.network.schema import WorkflowStopMode +from cylc.flow.network.server import WorkflowRuntimeServer from cylc.flow.parsec.exceptions import ParsecError from cylc.flow.parsec.OrderedDict import DictTree from cylc.flow.parsec.validate import DurationFloat @@ -155,6 +153,8 @@ # FROM: Python 3.7 # TO: Python 3.8 from typing_extensions import Literal + from optparse import Values + from cylc.flow.network.resolvers import TaskMsg class SchedulerStop(CylcError): @@ -228,7 +228,7 @@ class Scheduler: # configuration config: WorkflowConfig # flow config - options: Values + options: 'Values' cylc_config: DictTree # [scheduler] config template_vars: Dict[str, Any] @@ -271,7 +271,7 @@ class Scheduler: time_next_kill: Optional[float] = None - def __init__(self, id_: str, options: Values) -> None: + def __init__(self, id_: str, options: 'Values') -> None: # flow information self.workflow = id_ self.workflow_name = get_workflow_name_from_id(self.workflow) @@ -862,7 +862,7 @@ def _load_task_run_times(self, row_idx, row): def process_queued_task_messages(self) -> None: """Handle incoming task messages for each task proxy.""" - messages: Dict[str, List[Tuple[Optional[int], TaskMsg]]] = {} + messages: 'Dict[str, List[Tuple[Optional[int], TaskMsg]]]' = {} while self.message_queue.qsize(): try: task_msg = self.message_queue.get(block=False) diff --git a/cylc/flow/scripts/cat_log.py b/cylc/flow/scripts/cat_log.py index e8003d66da9..59142beb897 100755 --- a/cylc/flow/scripts/cat_log.py +++ b/cylc/flow/scripts/cat_log.py @@ -74,10 +74,10 @@ import cylc.flow.flags from cylc.flow.hostuserutil import is_remote_platform from cylc.flow.id_cli import parse_id +from cylc.flow.log_level import verbosity_to_opts from cylc.flow.option_parsers import ( ID_MULTI_ARG_DOC, CylcOptionParser as COP, - verbosity_to_opts, ) from cylc.flow.pathutil import ( expand_path, diff --git a/cylc/flow/scripts/kill.py b/cylc/flow/scripts/kill.py index 40f0efd9b9b..2af4c34200e 100755 --- a/cylc/flow/scripts/kill.py +++ b/cylc/flow/scripts/kill.py @@ -34,7 +34,6 @@ from functools import partial from typing import TYPE_CHECKING -from cylc.flow.id import Tokens from cylc.flow.network.client_factory import get_client from cylc.flow.network.multi import call_multi from cylc.flow.option_parsers import ( @@ -45,6 +44,7 @@ if TYPE_CHECKING: from optparse import Values + from cylc.flow.id import Tokens MUTATION = ''' @@ -62,7 +62,7 @@ ''' -def get_option_parser() -> COP: +def get_option_parser() -> 'COP': parser = COP( __doc__, comms=True, @@ -74,7 +74,7 @@ def get_option_parser() -> COP: return parser -async def run(options: 'Values', workflow_id: str, *tokens_list: Tokens): +async def run(options: 'Values', workflow_id: str, *tokens_list: 'Tokens'): pclient = get_client(workflow_id, timeout=options.comms_timeout) mutation_kwargs = { @@ -92,7 +92,7 @@ async def run(options: 'Values', workflow_id: str, *tokens_list: Tokens): @cli_function(get_option_parser) -def main(parser: COP, options: 'Values', *ids: str): +def main(parser: 'COP', options: 'Values', *ids: str): """CLI of "cylc kill".""" call_multi( partial(run, options), diff --git a/cylc/flow/scripts/lint.py b/cylc/flow/scripts/lint.py index 681870dac76..951e6aa3643 100755 --- a/cylc/flow/scripts/lint.py +++ b/cylc/flow/scripts/lint.py @@ -39,7 +39,6 @@ """ from colorama import Fore import functools -from optparse import Values from pathlib import Path import re import sys @@ -59,7 +58,7 @@ loads as toml_loads, TOMLDecodeError, ) -from typing import Callable, Dict, Iterator, List, Union +from typing import Callable, Dict, Iterator, List, Union, TYPE_CHECKING from cylc.flow import LOG from cylc.flow.exceptions import CylcError @@ -73,6 +72,9 @@ from cylc.flow.scripts.cylc import DEAD_ENDS from cylc.flow.terminal import cli_function +if TYPE_CHECKING: + from optparse import Values + DEPRECATED_ENV_VARS = { 'CYLC_SUITE_HOST': 'CYLC_WORKFLOW_HOST', 'CYLC_SUITE_OWNER': 'CYLC_WORKFLOW_OWNER', diff --git a/cylc/flow/scripts/set_outputs.py b/cylc/flow/scripts/set_outputs.py index 9ca3b38a7ba..b45ddbff268 100755 --- a/cylc/flow/scripts/set_outputs.py +++ b/cylc/flow/scripts/set_outputs.py @@ -48,7 +48,7 @@ """ from functools import partial -from optparse import Values +from typing import TYPE_CHECKING from cylc.flow.network.client_factory import get_client from cylc.flow.network.multi import call_multi @@ -58,6 +58,9 @@ ) from cylc.flow.terminal import cli_function +if TYPE_CHECKING: + from optparse import Values + MUTATION = ''' mutation ( $wFlows: [WorkflowID]!, diff --git a/cylc/flow/scripts/set_verbosity.py b/cylc/flow/scripts/set_verbosity.py index a87655d2267..aa40d01345c 100755 --- a/cylc/flow/scripts/set_verbosity.py +++ b/cylc/flow/scripts/set_verbosity.py @@ -26,7 +26,7 @@ """ from functools import partial -from optparse import Values +from typing import TYPE_CHECKING from cylc.flow import LOG_LEVELS from cylc.flow.exceptions import InputError @@ -38,6 +38,10 @@ ) from cylc.flow.terminal import cli_function +if TYPE_CHECKING: + from optparse import Values + + MUTATION = ''' mutation ( $wFlows: [WorkflowID]!, diff --git a/cylc/flow/scripts/show.py b/cylc/flow/scripts/show.py index 2ff427d12fb..5a36eaf8e0d 100755 --- a/cylc/flow/scripts/show.py +++ b/cylc/flow/scripts/show.py @@ -38,9 +38,8 @@ import asyncio import json -from optparse import Values import sys -from typing import Dict +from typing import Dict, TYPE_CHECKING from ansimarkup import ansiprint @@ -59,6 +58,10 @@ from cylc.flow.terminal import cli_function +if TYPE_CHECKING: + from optparse import Values + + WORKFLOW_META_QUERY = ''' query ($wFlows: [ID]!) { workflows (ids: $wFlows, stripNull: false) { diff --git a/cylc/flow/scripts/validate.py b/cylc/flow/scripts/validate.py index 47a29897d60..7cfd42c2138 100755 --- a/cylc/flow/scripts/validate.py +++ b/cylc/flow/scripts/validate.py @@ -28,8 +28,8 @@ import asyncio from ansimarkup import parse as cparse from copy import deepcopy -from optparse import Values import sys +from typing import TYPE_CHECKING from cylc.flow import LOG, __version__ as CYLC_VERSION from cylc.flow.config import WorkflowConfig @@ -56,6 +56,9 @@ from cylc.flow.terminal import cli_function from cylc.flow.scheduler_cli import RUN_MODE +if TYPE_CHECKING: + from cylc.flow.option_parsers import Values + VALIDATE_RUN_MODE = deepcopy(RUN_MODE) VALIDATE_RUN_MODE.sources = {'validate'} diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index 9a8a19ba18a..d4d9130ee27 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -581,7 +581,7 @@ def process_message( if submit_num is None: submit_num = itask.submit_num if isinstance(severity, int): - severity = cast(str, getLevelName(severity)) + severity = cast('str', getLevelName(severity)) lseverity = str(severity).lower() # Any message represents activity. @@ -811,7 +811,7 @@ def _process_message_check( ) return False - severity = cast(int, LOG_LEVELS.get(severity, INFO)) + severity = cast('int', LOG_LEVELS.get(severity, INFO)) # Demote log level to DEBUG if this is a message that duplicates what # gets logged by itask state change anyway (and not manual poll) if severity > DEBUG and flag != self.FLAG_POLLED and message in { diff --git a/cylc/flow/task_id.py b/cylc/flow/task_id.py index 821d637c2ff..4a17a59548d 100644 --- a/cylc/flow/task_id.py +++ b/cylc/flow/task_id.py @@ -17,13 +17,15 @@ import re -from typing import Optional +from typing import Optional, TYPE_CHECKING -from cylc.flow.cycling import PointBase from cylc.flow.cycling.loader import get_point, standardise_point_string from cylc.flow.exceptions import PointParsingError from cylc.flow.id import Tokens +if TYPE_CHECKING: + from cylc.flow.cycling import PointBase + _TASK_NAME_PREFIX = r'\w' _TASK_NAME_CHARACTERS = [r'\w', r'\-', '+', '%', '@'] @@ -94,7 +96,10 @@ def get_standardised_point_string(cls, point_string): return point_string @classmethod - def get_standardised_point(cls, point_string: str) -> Optional[PointBase]: + def get_standardised_point( + cls, + point_string: str, + ) -> 'Optional[PointBase]': """Return a standardised point.""" return get_point(cls.get_standardised_point_string(point_string)) diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 271049126fd..9b2fc59f5c8 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -26,7 +26,6 @@ from metomi.isodatetime.timezone import get_local_time_zone from cylc.flow import LOG -from cylc.flow.id import Tokens from cylc.flow.platforms import get_platform from cylc.flow.task_action_timer import TimerFlags from cylc.flow.task_state import TaskState, TASK_STATUS_WAITING @@ -42,6 +41,7 @@ from cylc.flow.cycling import PointBase from cylc.flow.task_action_timer import TaskActionTimer from cylc.flow.taskdef import TaskDef + from cylc.flow.id import Tokens class TaskProxy: diff --git a/cylc/flow/task_queues/__init__.py b/cylc/flow/task_queues/__init__.py index 0e0c8bce90b..32983789632 100644 --- a/cylc/flow/task_queues/__init__.py +++ b/cylc/flow/task_queues/__init__.py @@ -16,10 +16,11 @@ """Define the Cylc task queue management API.""" -from typing import List, Dict, Counter, Any +from typing import List, Dict, Counter, Any, TYPE_CHECKING from abc import ABCMeta, abstractmethod -from cylc.flow.task_proxy import TaskProxy +if TYPE_CHECKING: + from cylc.flow.task_proxy import TaskProxy class TaskQueueManagerBase(metaclass=ABCMeta): @@ -42,22 +43,22 @@ def __init__(self, pass @abstractmethod - def push_task(self, itask: TaskProxy) -> None: + def push_task(self, itask: 'TaskProxy') -> None: """Queue the given task.""" pass @abstractmethod - def release_tasks(self, active: Counter[str]) -> List[TaskProxy]: + def release_tasks(self, active: Counter[str]) -> 'List[TaskProxy]': """Release tasks, given current active task counts.""" pass @abstractmethod - def remove_task(self, itask: TaskProxy) -> None: + def remove_task(self, itask: 'TaskProxy') -> None: """Remove a task from the queueing system.""" pass @abstractmethod - def force_release_task(self, itask: TaskProxy) -> None: + def force_release_task(self, itask: 'TaskProxy') -> None: """Remove a task from whichever queue it belongs to. To be returned when release_tasks() is next called. diff --git a/cylc/flow/task_queues/independent.py b/cylc/flow/task_queues/independent.py index d423a5f1702..d0e82995898 100644 --- a/cylc/flow/task_queues/independent.py +++ b/cylc/flow/task_queues/independent.py @@ -18,11 +18,13 @@ from collections import deque from contextlib import suppress -from typing import List, Set, Dict, Counter, Any +from typing import List, Set, Dict, Counter, Any, TYPE_CHECKING -from cylc.flow.task_proxy import TaskProxy from cylc.flow.task_queues import TaskQueueManagerBase +if TYPE_CHECKING: + from cylc.flow.task_proxy import TaskProxy + class LimitedTaskQueue: """One task queue with group members and active limit.""" @@ -33,16 +35,16 @@ def __init__(self, limit: int, members: Set[str]) -> None: self.members = members # member task names self.deque: deque = deque() - def push_task(self, itask: TaskProxy) -> None: + def push_task(self, itask: 'TaskProxy') -> None: """Queue task if in my membership list.""" if itask.tdef.name in self.members: self.deque.appendleft(itask) - def release(self, active: Counter[str]) -> List[TaskProxy]: + def release(self, active: Counter[str]) -> 'List[TaskProxy]': """Release tasks if below the active limit.""" # The "active" argument counts active tasks by name. - released: List[TaskProxy] = [] - held: List[TaskProxy] = [] + released: 'List[TaskProxy]' = [] + held: 'List[TaskProxy]' = [] n_active: int = 0 for mem in self.members: n_active += active[mem] @@ -62,7 +64,7 @@ def release(self, active: Counter[str]) -> List[TaskProxy]: self.deque.appendleft(itask) return released - def remove(self, itask: TaskProxy) -> bool: + def remove(self, itask: 'TaskProxy') -> bool: """Remove a single task from queue, return True if removed.""" try: self.deque.remove(itask) @@ -111,16 +113,16 @@ def __init__(self, config["limit"], config["members"] ) - self.force_released: Set[TaskProxy] = set() + self.force_released: 'Set[TaskProxy]' = set() - def push_task(self, itask: TaskProxy) -> None: + def push_task(self, itask: 'TaskProxy') -> None: """Push a task to the appropriate queue.""" for queue in self.queues.values(): queue.push_task(itask) - def release_tasks(self, active: Counter[str]) -> List[TaskProxy]: + def release_tasks(self, active: Counter[str]) -> 'List[TaskProxy]': """Release tasks up to the queue limits.""" - released: List[TaskProxy] = [] + released: 'List[TaskProxy]' = [] for queue in self.queues.values(): released += queue.release(active) if self.force_released: @@ -128,13 +130,13 @@ def release_tasks(self, active: Counter[str]) -> List[TaskProxy]: self.force_released = set() return released - def remove_task(self, itask: TaskProxy) -> None: + def remove_task(self, itask: 'TaskProxy') -> None: """Remove a task from whichever queue it belongs to.""" for queue in self.queues.values(): if queue.remove(itask): break - def force_release_task(self, itask: TaskProxy) -> None: + def force_release_task(self, itask: 'TaskProxy') -> None: """Remove a task from whichever queue it belongs to. To be returned when release_tasks() is next called. diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py index 541bc1265a0..9ef4d465bf2 100644 --- a/cylc/flow/task_remote_mgr.py +++ b/cylc/flow/task_remote_mgr.py @@ -25,7 +25,6 @@ from collections import deque from contextlib import suppress from pathlib import Path -from cylc.flow.option_parsers import verbosity_to_opts import os from shlex import quote import re @@ -33,8 +32,15 @@ import tarfile from time import sleep, time from typing import ( - Any, Deque, Dict, TYPE_CHECKING, List, - NamedTuple, Optional, Set, Tuple + Any, + Deque, + Dict, + List, + NamedTuple, + Optional, + Set, + TYPE_CHECKING, + Tuple, ) from cylc.flow import LOG @@ -42,6 +48,9 @@ PlatformError, PlatformLookupError, NoHostsError, NoPlatformsError ) import cylc.flow.flags +from cylc.flow.hostuserutil import is_remote_host +from cylc.flow.log_level import verbosity_to_opts +from cylc.flow.loggingutil import get_next_log_number, get_sorted_logs_by_time from cylc.flow.network.client_factory import CommsMeth from cylc.flow.pathutil import ( get_dirs_to_symlink, @@ -70,9 +79,6 @@ get_workflow_srv_dir, ) -from cylc.flow.loggingutil import get_next_log_number, get_sorted_logs_by_time -from cylc.flow.hostuserutil import is_remote_host - if TYPE_CHECKING: from zmq.auth.thread import ThreadAuthenticator diff --git a/cylc/flow/templatevars.py b/cylc/flow/templatevars.py index 7474761e226..e3d0d8f8e02 100644 --- a/cylc/flow/templatevars.py +++ b/cylc/flow/templatevars.py @@ -16,8 +16,7 @@ """Load custom variables for template processor.""" from ast import literal_eval -from optparse import Values -from typing import Any, Dict, List, Optional, Set, Union +from typing import Any, Dict, List, Optional, Set, Union, TYPE_CHECKING from cylc.flow import LOG from cylc.flow.exceptions import InputError @@ -25,7 +24,9 @@ from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager from cylc.flow.workflow_files import WorkflowFiles -from pathlib import Path +if TYPE_CHECKING: + from optparse import Values + from pathlib import Path OVERWRITE_WARNING = ( @@ -111,7 +112,7 @@ def parse_string_list(stringlist: str) -> List: def load_template_vars( template_vars: Optional[List[str]] = None, - template_vars_file: Union[Path, str, None] = None, + template_vars_file: 'Union[Path, str, None]' = None, templatevars_lists: Optional[List[str]] = None ) -> Dict[str, Any]: """Load template variables from key=value strings.""" @@ -182,7 +183,7 @@ def load_template_vars( return res -def get_template_vars(options: Values) -> Dict[str, Any]: +def get_template_vars(options: 'Values') -> Dict[str, Any]: """Convienence wrapper for ``load_template_vars``. Args: diff --git a/cylc/flow/terminal.py b/cylc/flow/terminal.py index 48e95f2df00..bc45f6d65d3 100644 --- a/cylc/flow/terminal.py +++ b/cylc/flow/terminal.py @@ -20,12 +20,11 @@ import inspect import json import logging -from optparse import OptionParser import os from subprocess import PIPE, Popen # nosec import sys from textwrap import wrap -from typing import Any, Callable, Optional +from typing import Any, Callable, Optional, TYPE_CHECKING from ansimarkup import parse as cparse from colorama import init as color_init @@ -36,6 +35,9 @@ from cylc.flow.loggingutil import CylcLogFormatter from cylc.flow.parsec.exceptions import ParsecError +if TYPE_CHECKING: + from optparse import OptionParser + # CLI exception message format EXC_EXIT = cparse('{name}: {exc}') @@ -184,7 +186,7 @@ def parse_dirty_json(stdout): def cli_function( - parser_function: Optional[Callable[..., OptionParser]] = None, + parser_function: 'Optional[Callable[..., OptionParser]]' = None, **parser_kwargs: Any ): """Decorator for CLI entry points. diff --git a/cylc/flow/workflow_files.py b/cylc/flow/workflow_files.py index 3f1d011a9da..de30bc1e7d2 100644 --- a/cylc/flow/workflow_files.py +++ b/cylc/flow/workflow_files.py @@ -60,10 +60,6 @@ get_workflow_run_dir, make_localhost_symlinks, ) -from cylc.flow.remote import ( - construct_cylc_server_ssh_cmd, -) -from cylc.flow.terminal import parse_dirty_json from cylc.flow.unicode_rules import WorkflowNameValidator from cylc.flow.util import cli_format @@ -382,6 +378,9 @@ def _is_process_running( False """ + from cylc.flow.remote import construct_cylc_server_ssh_cmd + from cylc.flow.terminal import parse_dirty_json + # See if the process is still running or not. metric = f'[["Process", {pid}]]' if is_remote_host(host): diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 4512d97b7c8..c89113641ef 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -20,20 +20,23 @@ import re from copy import deepcopy from time import time -from typing import Any, Dict, List, Optional, Tuple, Callable +from typing import Any, Dict, List, Optional, Tuple, Callable, TYPE_CHECKING from cylc.flow import LOG from cylc.flow.exceptions import XtriggerConfigError import cylc.flow.flags from cylc.flow.hostuserutil import get_user from cylc.flow.xtriggers.wall_clock import wall_clock +from cylc.flow.subprocpool import ( + SubProcPool, + get_func, +) -from cylc.flow.subprocctx import SubFuncContext -from cylc.flow.broadcast_mgr import BroadcastMgr -from cylc.flow.data_store_mgr import DataStoreMgr -from cylc.flow.subprocpool import SubProcPool -from cylc.flow.task_proxy import TaskProxy -from cylc.flow.subprocpool import get_func +if TYPE_CHECKING: + from cylc.flow.broadcast_mgr import BroadcastMgr + from cylc.flow.data_store_mgr import DataStoreMgr + from cylc.flow.subprocctx import SubFuncContext + from cylc.flow.task_proxy import TaskProxy class TemplateVariables(Enum): @@ -195,15 +198,15 @@ class XtriggerManager: def __init__( self, workflow: str, - broadcast_mgr: BroadcastMgr, - data_store_mgr: DataStoreMgr, + broadcast_mgr: 'BroadcastMgr', + data_store_mgr: 'DataStoreMgr', proc_pool: SubProcPool, user: Optional[str] = None, workflow_run_dir: Optional[str] = None, workflow_share_dir: Optional[str] = None, ): # Workflow function and clock triggers by label. - self.functx_map: Dict[str, SubFuncContext] = {} + self.functx_map: 'Dict[str, SubFuncContext]' = {} # When next to call a function, by signature. self.t_next_call: dict = {} # Satisfied triggers and their function results, by signature. @@ -234,7 +237,11 @@ def __init__( self.data_store_mgr = data_store_mgr @staticmethod - def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None: + def validate_xtrigger( + label: str, + fctx: 'SubFuncContext', + fdir: str, + ) -> None: """Validate an Xtrigger function. Args: @@ -305,7 +312,7 @@ def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None: f' {", ".join(t.value for t in deprecated_variables)}' ) - def add_trig(self, label: str, fctx: SubFuncContext, fdir: str) -> None: + def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None: """Add a new xtrigger function. Check the xtrigger function exists here (e.g. during validation). @@ -334,7 +341,7 @@ def load_xtrigger_for_restart(self, row_idx: int, row: Tuple[str, str]): sig, results = row self.sat_xtrig[sig] = json.loads(results) - def _get_xtrigs(self, itask: TaskProxy, unsat_only: bool = False, + def _get_xtrigs(self, itask: 'TaskProxy', unsat_only: bool = False, sigs_only: bool = False): """(Internal helper method.) @@ -361,7 +368,11 @@ def _get_xtrigs(self, itask: TaskProxy, unsat_only: bool = False, res.append((label, sig, ctx, satisfied)) return res - def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext: + def get_xtrig_ctx( + self, + itask: 'TaskProxy', + label: str, + ) -> 'SubFuncContext': """Get a real function context from the template. Args: @@ -411,7 +422,7 @@ def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext: ctx.update_command(self.workflow_run_dir) return ctx - def call_xtriggers_async(self, itask: TaskProxy): + def call_xtriggers_async(self, itask: 'TaskProxy'): """Call itask's xtrigger functions via the process pool... ...if previous call not still in-process and retry period is up. @@ -456,7 +467,7 @@ def call_xtriggers_async(self, itask: TaskProxy): self.active.append(sig) self.proc_pool.put_command(ctx, callback=self.callback) - def housekeep(self, itasks: List[TaskProxy]): + def housekeep(self, itasks: 'List[TaskProxy]'): """Delete satisfied xtriggers no longer needed by any task. Args: @@ -469,7 +480,7 @@ def housekeep(self, itasks: List[TaskProxy]): if sig not in all_xtrig: del self.sat_xtrig[sig] - def callback(self, ctx: SubFuncContext): + def callback(self, ctx: 'SubFuncContext'): """Callback for asynchronous xtrigger functions. Record satisfaction status and function results dict. @@ -494,7 +505,7 @@ def callback(self, ctx: SubFuncContext): def check_xtriggers( self, - itask: TaskProxy, + itask: 'TaskProxy', db_update_func: Callable[[dict], None]) -> bool: """Check if all of itasks' xtriggers have become satisfied. diff --git a/setup.cfg b/setup.cfg index fbac4552f57..6c17d2091ef 100644 --- a/setup.cfg +++ b/setup.cfg @@ -113,6 +113,7 @@ tests = flake8-debugger>=4.0.0 flake8-mutable>=1.2.0 flake8-simplify>=0.14.0 + flake8-type-checking; python_version > "3.7" flake8>=3.0.0 mypy>=0.910 pytest-asyncio>=0.17 diff --git a/tests/unit/test_id_cli.py b/tests/unit/test_id_cli.py index 4f1099b164b..149990a9bb0 100644 --- a/tests/unit/test_id_cli.py +++ b/tests/unit/test_id_cli.py @@ -570,7 +570,7 @@ async def _scan(): # something that looks like scan but doesn't do anything yield - monkeypatch.setattr('cylc.flow.id_cli.scan', _scan) + monkeypatch.setattr('cylc.flow.network.scan.scan', _scan) async def test_expand_workflow_tokens_impl_selector(no_scan):