diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index b3bbbd23d7d..90c9f287669 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -16,19 +16,20 @@ """Cylc scheduler server.""" import asyncio +import os +import sys +import traceback from collections import deque from contextlib import suppress -import os from pathlib import Path from queue import Empty, Queue from shlex import quote from socket import gaierror from subprocess import DEVNULL, PIPE, Popen -import sys from threading import Barrier, Thread from time import sleep, time -import traceback from typing import ( + TYPE_CHECKING, Any, AsyncGenerator, Callable, @@ -38,7 +39,6 @@ NoReturn, Optional, Set, - TYPE_CHECKING, Tuple, Union, ) @@ -46,33 +46,23 @@ import psutil +import cylc.flow.flags from cylc.flow import ( LOG, __version__ as CYLC_VERSION, + commands, main_loop, + workflow_files, ) -from cylc.flow import workflow_files from cylc.flow.broadcast_mgr import BroadcastMgr from cylc.flow.cfgspec.glbl_cfg import glbl_cfg from cylc.flow.config import WorkflowConfig -from cylc.flow import commands from cylc.flow.data_store_mgr import DataStoreMgr -from cylc.flow.exceptions import ( - CommandFailedError, - CylcError, - InputError, -) -import cylc.flow.flags +from cylc.flow.exceptions import CommandFailedError, CylcError, InputError from cylc.flow.flow_mgr import FLOW_NEW, FLOW_NONE, FlowMgr -from cylc.flow.host_select import ( - HostSelectException, - select_workflow_host, -) -from cylc.flow.hostuserutil import ( - get_host, - get_user, - is_remote_platform, -) +from cylc.flow.graphnode import GraphNodeParser +from cylc.flow.host_select import HostSelectException, select_workflow_host +from cylc.flow.hostuserutil import get_host, get_user, is_remote_platform from cylc.flow.id import Tokens from cylc.flow.log_level import verbosity_to_env, verbosity_to_opts from cylc.flow.loggingutil import ( @@ -86,8 +76,8 @@ from cylc.flow.network import API from cylc.flow.network.authentication import key_housekeeping from cylc.flow.network.server import WorkflowRuntimeServer -from cylc.flow.parsec.OrderedDict import DictTree from cylc.flow.parsec.exceptions import ParsecError +from cylc.flow.parsec.OrderedDict import DictTree from cylc.flow.parsec.validate import DurationFloat from cylc.flow.pathutil import ( get_workflow_name_from_id, @@ -121,16 +111,15 @@ REMOTE_INIT_FAILED, ) from cylc.flow.task_state import ( - TASK_STATUSES_ACTIVE, - TASK_STATUSES_NEVER_ACTIVE, TASK_STATUS_PREPARING, TASK_STATUS_RUNNING, TASK_STATUS_SUBMITTED, TASK_STATUS_WAITING, + TASK_STATUSES_ACTIVE, + TASK_STATUSES_NEVER_ACTIVE, ) from cylc.flow.taskdef import TaskDef -from cylc.flow.templatevars import eval_var -from cylc.flow.templatevars import get_template_vars +from cylc.flow.templatevars import eval_var, get_template_vars from cylc.flow.timer import Timer from cylc.flow.util import cli_format from cylc.flow.wallclock import ( @@ -143,12 +132,15 @@ from cylc.flow.workflow_status import AutoRestartMode, RunMode, StopMode from cylc.flow.xtrigger_mgr import XtriggerManager + if TYPE_CHECKING: # BACK COMPAT: typing_extensions.Literal # FROM: Python 3.7 # TO: Python 3.8 - from typing_extensions import Literal from optparse import Values + + from typing_extensions import Literal + from cylc.flow.network.resolvers import TaskMsg @@ -408,6 +400,10 @@ async def initialise(self): self.profiler = Profiler(self, self.options.profile_mode) + # Reset graph node parser singleton + # (mainly to prevent state leakage between integration tests): + GraphNodeParser.get_inst().clear() + async def configure(self, params): """Configure the scheduler.