diff --git a/CHANGES.md b/CHANGES.md index 4dfe61f42db..6ac7e4d345c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,35 @@ $ towncrier create ..md --content "Short description" +## __cylc-8.2.4 (Released 2024-01-11)__ + +### 🚀 Enhancements + +[#5772](https://github.com/cylc/cylc-flow/pull/5772) - `cylc lint`: added a check for indentation being 4N spaces. + +[#5838](https://github.com/cylc/cylc-flow/pull/5838) - `cylc lint`: added rule to check for `rose date` usage (should be replaced with `isodatetime`). + +### 🔧 Fixes + +[#5789](https://github.com/cylc/cylc-flow/pull/5789) - Prevent the run mode from being changed on restart. + +[#5801](https://github.com/cylc/cylc-flow/pull/5801) - Fix traceback when using parentheses on right hand side of graph trigger. + +[#5821](https://github.com/cylc/cylc-flow/pull/5821) - Fixed issue where large uncommitted changes could cause `cylc install` to hang. + +[#5841](https://github.com/cylc/cylc-flow/pull/5841) - `cylc lint`: improved handling of S011 to not warn if the `#` is `#$` (e.g. shell base arithmetic). + +[#5885](https://github.com/cylc/cylc-flow/pull/5885) - Fixed bug in using a final cycle point with chained offsets e.g. 'final cycle point = +PT6H+PT1S'. + +[#5893](https://github.com/cylc/cylc-flow/pull/5893) - Fixed bug in computing a time interval-based runahead limit when future triggers are present. + +[#5902](https://github.com/cylc/cylc-flow/pull/5902) - Fixed a bug that prevented unsetting `execution time limit` by broadcast or reload. + +[#5908](https://github.com/cylc/cylc-flow/pull/5908) - Fixed bug causing redundant DB updates when many tasks depend on the same xtrigger. + +[#5909](https://github.com/cylc/cylc-flow/pull/5909) - Fix a bug where Cylc VIP did not remove --workflow-name= from + Cylc play arguments. + ## __cylc-8.2.3 (Released 2023-11-02)__ ### 🔧 Fixes diff --git a/changes.d/5772.feat.md b/changes.d/5772.feat.md deleted file mode 100644 index da0984a82ec..00000000000 --- a/changes.d/5772.feat.md +++ /dev/null @@ -1 +0,0 @@ -`cylc lint`: added a check for indentation being 4N spaces. diff --git a/changes.d/5789.fix.md b/changes.d/5789.fix.md deleted file mode 100644 index 7eda67036e0..00000000000 --- a/changes.d/5789.fix.md +++ /dev/null @@ -1 +0,0 @@ -Stop users changing run modes on restart. diff --git a/changes.d/5801.fix.md b/changes.d/5801.fix.md deleted file mode 100644 index e7fd0584090..00000000000 --- a/changes.d/5801.fix.md +++ /dev/null @@ -1 +0,0 @@ -Fix traceback when using parentheses on right hand side of graph trigger. diff --git a/changes.d/5821.fix.md b/changes.d/5821.fix.md deleted file mode 100644 index 0c6c8b7918d..00000000000 --- a/changes.d/5821.fix.md +++ /dev/null @@ -1 +0,0 @@ -Fixed issue where large uncommitted changes could cause `cylc install` to hang. diff --git a/changes.d/5838.feat.md b/changes.d/5838.feat.md deleted file mode 100644 index 8e9919d3a0f..00000000000 --- a/changes.d/5838.feat.md +++ /dev/null @@ -1 +0,0 @@ -`cylc lint`: added rule to check for `rose date` usage (should be replaced with `isodatetime`). diff --git a/changes.d/5841.fix.md b/changes.d/5841.fix.md deleted file mode 100644 index 4bc41462fca..00000000000 --- a/changes.d/5841.fix.md +++ /dev/null @@ -1 +0,0 @@ -`cylc lint`: improved handling of S011 to not warn if the `#` is `#$` (e.g. shell base arithmetic). 
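The `cylc vip`/`cylc play` argument bug noted in the changelog above (#5909, issue #5905 in the tests further down) is addressed below by a new `filter_sysargv` helper in `option_parsers.py`, which strips unwanted options from `sys.argv` in both their `--opt value` and `--opt=value` forms. A minimal standalone sketch of that filtering idea, assuming boolean flags and value-taking options arrive as separate lists; the name `filter_argv` here is illustrative, not the real helper:

```python
from typing import List


def filter_argv(
    argv: List[str],
    unwanted_simple: List[str],
    unwanted_compound: List[str],
) -> List[str]:
    """Return a copy of argv with the unwanted options removed.

    "Simple" options are boolean flags; "compound" options expect a value,
    given either as "--opt value" or "--opt=value".
    """
    skip_next = False
    kept: List[str] = []
    for arg in argv:
        if skip_next:
            # This token is the value of a compound option we just dropped.
            skip_next = False
            continue
        name = arg.split('=', 1)[0]
        if name in unwanted_compound:
            # "--opt=value" is one token; "--opt value" spans two, so the
            # following token must be dropped as well.
            if '=' not in arg:
                skip_next = True
            continue
        if name in unwanted_simple:
            # Boolean flag: drop just this token.
            continue
        kept.append(arg)
    return kept


# The case from issue #5905: "--workflow-name=name" takes a value,
# "--no-run-name" is a flag; both are stripped before play is re-invoked.
assert filter_argv(
    ['--no-run-name', '--workflow-name=name'],
    ['--no-run-name'],
    ['--workflow-name'],
) == []
```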
diff --git a/changes.d/5885.fix.md b/changes.d/5885.fix.md deleted file mode 100644 index b9071bae612..00000000000 --- a/changes.d/5885.fix.md +++ /dev/null @@ -1 +0,0 @@ -Fixed bug in using a final cycle point with chained offsets e.g. 'final cycle point = +PT6H+PT1S'. \ No newline at end of file diff --git a/changes.d/5893.fix b/changes.d/5893.fix deleted file mode 100644 index 504cd6a649e..00000000000 --- a/changes.d/5893.fix +++ /dev/null @@ -1 +0,0 @@ -Fixed bug in computing a time interval-based runahead limit when future triggers are present. diff --git a/cylc/flow/option_parsers.py b/cylc/flow/option_parsers.py index b83ff45aab9..b04b226c171 100644 --- a/cylc/flow/option_parsers.py +++ b/cylc/flow/option_parsers.py @@ -33,7 +33,7 @@ import sys from textwrap import dedent -from typing import Any, Dict, Optional, List, Tuple +from typing import Any, Dict, Iterable, Optional, List, Tuple from cylc.flow import LOG from cylc.flow.terminal import supports_color, DIM @@ -732,17 +732,33 @@ def combine_options(*args, modify=None): def cleanup_sysargv( - script_name, - workflow_id, - options, - compound_script_opts, - script_opts, - source, -): + script_name: str, + workflow_id: str, + options: 'Values', + compound_script_opts: Iterable['OptionSettings'], + script_opts: Iterable['OptionSettings'], + source: str, +) -> None: """Remove unwanted options from sys.argv Some cylc scripts (notably Cylc Play when it is re-invoked on a scheduler - server) require the correct content in sys.argv. + server) require the correct content in sys.argv: This function + subtracts the unwanted options from sys.argv. + + Args: + script_name: + Name of the target script. For example if we are + using this for the play step of cylc vip then this + will be "play". + workflow_id: + options: + Actual options provided to the compound script. + compound_script_options: + Options available in compound script. + script_options: + Options available in target script. + source: + Source directory. """ # Organize Options by dest. script_opts_by_dest = { @@ -753,30 +769,67 @@ def cleanup_sysargv( x.kwargs.get('dest', x.args[0].strip(DOUBLEDASH)): x for x in compound_script_opts } - # Filter out non-cylc-play options. - args = [i.split('=')[0] for i in sys.argv] - for unwanted_opt in (set(options.__dict__)) - set(script_opts_by_dest): - for arg in compound_opts_by_dest[unwanted_opt].args: - if arg in sys.argv: - index = sys.argv.index(arg) - sys.argv.pop(index) - if ( - compound_opts_by_dest[unwanted_opt].kwargs['action'] - not in ['store_true', 'store_false'] - ): - sys.argv.pop(index) - elif arg in args: - index = args.index(arg) - sys.argv.pop(index) + + # Get a list of unwanted args: + unwanted_compound: List[str] = [] + unwanted_simple: List[str] = [] + for unwanted_dest in (set(options.__dict__)) - set(script_opts_by_dest): + for unwanted_arg in compound_opts_by_dest[unwanted_dest].args: + if ( + compound_opts_by_dest[unwanted_dest].kwargs.get('action', None) + in ['store_true', 'store_false'] + ): + unwanted_simple.append(unwanted_arg) + else: + unwanted_compound.append(unwanted_arg) + + new_args = filter_sysargv(sys.argv, unwanted_simple, unwanted_compound) # replace compound script name: - sys.argv[1] = script_name + new_args[1] = script_name # replace source path with workflow ID. 
if str(source) in sys.argv: - sys.argv.remove(str(source)) + new_args.remove(str(source)) if workflow_id not in sys.argv: - sys.argv.append(workflow_id) + new_args.append(workflow_id) + + sys.argv = new_args + + +def filter_sysargv( + sysargs, unwanted_simple: List, unwanted_compound: List +) -> List: + """Create a copy of sys.argv without unwanted arguments: + + Cases: + >>> this = filter_sysargv + >>> this(['--foo', 'expects-a-value', '--bar'], [], ['--foo']) + ['--bar'] + >>> this(['--foo=expects-a-value', '--bar'], [], ['--foo']) + ['--bar'] + >>> this(['--foo', '--bar'], ['--foo'], []) + ['--bar'] + """ + pop_next: bool = False + new_args: List = [] + for this_arg in sysargs: + parts = this_arg.split('=', 1) + if pop_next: + pop_next = False + continue + elif parts[0] in unwanted_compound: + # Case --foo=value or --foo value + if len(parts) == 1: + # --foo value + pop_next = True + continue + elif parts[0] in unwanted_simple: + # Case --foo does not expect a value: + continue + else: + new_args.append(this_arg) + return new_args def log_subcommand(*args): diff --git a/cylc/flow/rundb.py b/cylc/flow/rundb.py index 84360083c5c..256da3b3129 100644 --- a/cylc/flow/rundb.py +++ b/cylc/flow/rundb.py @@ -298,6 +298,8 @@ class CylcWorkflowDAO: ["prereq_output", {"is_primary_key": True}], ["satisfied"], ], + # The xtriggers table holds the function signature and result of + # already-satisfied (the scheduler no longer needs to call them). TABLE_XTRIGGERS: [ ["signature", {"is_primary_key": True}], ["results"], diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index fb5d858d7e0..0e02f9384d9 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -378,6 +378,7 @@ async def initialise(self): self.workflow, user=self.owner, broadcast_mgr=self.broadcast_mgr, + workflow_db_mgr=self.workflow_db_mgr, data_store_mgr=self.data_store_mgr, proc_pool=self.proc_pool, workflow_run_dir=self.workflow_run_dir, @@ -1706,14 +1707,8 @@ async def main_loop(self) -> None: await self.process_command_queue() self.proc_pool.process() - # Tasks in the main pool that are waiting but not queued must be - # waiting on external dependencies, i.e. xtriggers or ext_triggers. - # For these tasks, call any unsatisfied xtrigger functions, and - # queue tasks that have become ready. (Tasks do not appear in the - # main pool at all until all other-task deps are satisfied, and are - # queued immediately on release from runahead limiting if they are - # not waiting on external deps). - housekeep_xtriggers = False + # Unqueued tasks with satisfied prerequisites must be waiting on + # xtriggers or ext_triggers. Check these and queue tasks if ready. for itask in self.pool.get_tasks(): if ( not itask.state(TASK_STATUS_WAITING) @@ -1726,28 +1721,19 @@ async def main_loop(self) -> None: itask.state.xtriggers and not itask.state.xtriggers_all_satisfied() ): - # Call unsatisfied xtriggers if not already in-process. - # Results are returned asynchronously. self.xtrigger_mgr.call_xtriggers_async(itask) - # Check for satisfied xtriggers, and queue if ready. - if self.xtrigger_mgr.check_xtriggers( - itask, self.workflow_db_mgr.put_xtriggers): - housekeep_xtriggers = True - if all(itask.is_ready_to_run()): - self.pool.queue_task(itask) - - # Check for satisfied ext_triggers, and queue if ready. 
+ if ( itask.state.external_triggers and not itask.state.external_triggers_all_satisfied() - and self.broadcast_mgr.check_ext_triggers( - itask, self.ext_trigger_queue) - and all(itask.is_ready_to_run()) ): + self.broadcast_mgr.check_ext_triggers( + itask, self.ext_trigger_queue) + + if all(itask.is_ready_to_run()): self.pool.queue_task(itask) - if housekeep_xtriggers: - # (Could do this periodically?) + if self.xtrigger_mgr.do_housekeeping: self.xtrigger_mgr.housekeep(self.pool.get_tasks()) self.pool.set_expired_tasks() diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index d6bc59fb255..6651488b4b9 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -35,7 +35,7 @@ ) from shutil import rmtree from time import time -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Any, Union, Optional from cylc.flow import LOG from cylc.flow.job_runner_mgr import JobPollContext @@ -1262,10 +1262,11 @@ def _prep_submit_task_job_impl(self, workflow, itask, rtconfig): itask.submit_num] = itask.platform['name'] itask.summary['job_runner_name'] = itask.platform['job runner'] - with suppress(TypeError): - itask.summary[self.KEY_EXECUTE_TIME_LIMIT] = float( - rtconfig['execution time limit'] - ) + + # None is an allowed non-float number for Execution time limit. + itask.summary[ + self.KEY_EXECUTE_TIME_LIMIT + ] = self.get_execution_time_limit(rtconfig['execution time limit']) # Location of job file, etc self._create_job_log_path(workflow, itask) @@ -1281,6 +1282,30 @@ def _prep_submit_task_job_impl(self, workflow, itask, rtconfig): job_d=job_d ) + @staticmethod + def get_execution_time_limit( + config_execution_time_limit: Any + ) -> Union[None, float]: + """Get execution time limit from config and process it. + + If the etl from the config is a Falsy then return None. + Otherwise try and parse value as float. + + Examples: + >>> from pytest import raises + >>> this = TaskJobManager.get_execution_time_limit + + >>> this(None) + >>> this("54") + 54.0 + >>> this({}) + >>> with raises(ValueError): + ... 
this('🇳🇿') + """ + if config_execution_time_limit: + return float(config_execution_time_limit) + return None + def get_job_conf( self, workflow, diff --git a/cylc/flow/workflow_db_mgr.py b/cylc/flow/workflow_db_mgr.py index e9402ba058b..de61d0de733 100644 --- a/cylc/flow/workflow_db_mgr.py +++ b/cylc/flow/workflow_db_mgr.py @@ -405,7 +405,6 @@ def put_task_event_timers(self, task_events_mgr): def put_xtriggers(self, sat_xtrig): """Put statements to update external triggers table.""" - self.db_deletes_map[self.TABLE_XTRIGGERS].append({}) for sig, res in sat_xtrig.items(): self.db_inserts_map[self.TABLE_XTRIGGERS].append({ "signature": sig, diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 50eede72a83..6ced0023a56 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -20,23 +20,22 @@ import re from copy import deepcopy from time import time -from typing import Any, Dict, List, Optional, Tuple, Callable, TYPE_CHECKING +from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING from cylc.flow import LOG from cylc.flow.exceptions import XtriggerConfigError import cylc.flow.flags from cylc.flow.hostuserutil import get_user +from cylc.flow.subprocpool import get_func from cylc.flow.xtriggers.wall_clock import wall_clock -from cylc.flow.subprocpool import ( - SubProcPool, - get_func, -) if TYPE_CHECKING: from cylc.flow.broadcast_mgr import BroadcastMgr from cylc.flow.data_store_mgr import DataStoreMgr from cylc.flow.subprocctx import SubFuncContext + from cylc.flow.subprocpool import SubProcPool from cylc.flow.task_proxy import TaskProxy + from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager class TemplateVariables(Enum): @@ -188,6 +187,7 @@ class XtriggerManager: Args: workflow: workflow name user: workflow owner + workflow_db_mgr: the DB Manager broadcast_mgr: the Broadcast Manager proc_pool: pool of Subprocesses workflow_run_dir: workflow run directory @@ -199,8 +199,9 @@ def __init__( self, workflow: str, broadcast_mgr: 'BroadcastMgr', + workflow_db_mgr: 'WorkflowDatabaseManager', data_store_mgr: 'DataStoreMgr', - proc_pool: SubProcPool, + proc_pool: 'SubProcPool', user: Optional[str] = None, workflow_run_dir: Optional[str] = None, workflow_share_dir: Optional[str] = None, @@ -233,8 +234,10 @@ def __init__( } self.proc_pool = proc_pool + self.workflow_db_mgr = workflow_db_mgr self.broadcast_mgr = broadcast_mgr self.data_store_mgr = data_store_mgr + self.do_housekeeping = False @staticmethod def validate_xtrigger( @@ -432,16 +435,23 @@ def call_xtriggers_async(self, itask: 'TaskProxy'): itask: task proxy to check. """ for label, sig, ctx, _ in self._get_xtrigs(itask, unsat_only=True): + # Special case: quick synchronous clock check: if sig.startswith("wall_clock"): - # Special case: quick synchronous clock check. - if wall_clock(*ctx.func_args, **ctx.func_kwargs): + if sig in self.sat_xtrig: + # Already satisfied, just update the task + itask.state.xtriggers[label] = True + elif wall_clock(*ctx.func_args, **ctx.func_kwargs): + # Newly satisfied itask.state.xtriggers[label] = True self.sat_xtrig[sig] = {} self.data_store_mgr.delta_task_xtrigger(sig, True) + self.workflow_db_mgr.put_xtriggers({sig: {}}) LOG.info('xtrigger satisfied: %s = %s', label, sig) + self.do_housekeeping = True continue # General case: potentially slow asynchronous function call. 
if sig in self.sat_xtrig: + # Already satisfied, just update the task if not itask.state.xtriggers[label]: itask.state.xtriggers[label] = True res = {} @@ -456,6 +466,8 @@ def call_xtriggers_async(self, itask: 'TaskProxy'): xtrigger_env ) continue + + # Call the function to check the unsatisfied xtrigger. if sig in self.active: # Already waiting on this result. continue @@ -468,8 +480,10 @@ def call_xtriggers_async(self, itask: 'TaskProxy'): self.active.append(sig) self.proc_pool.put_command(ctx, callback=self.callback) - def housekeep(self, itasks: 'List[TaskProxy]'): - """Delete satisfied xtriggers no longer needed by any task. + def housekeep(self, itasks): + """Forget satisfied xtriggers no longer needed by any task. + + Check self.do_housekeeping before calling this method. Args: itasks: list of all task proxies. @@ -480,6 +494,7 @@ def housekeep(self, itasks: 'List[TaskProxy]'): for sig in list(self.sat_xtrig): if sig not in all_xtrig: del self.sat_xtrig[sig] + self.do_housekeeping = False def callback(self, ctx: 'SubFuncContext'): """Callback for asynchronous xtrigger functions. @@ -500,23 +515,9 @@ def callback(self, ctx: 'SubFuncContext'): return LOG.debug('%s: returned %s', sig, results) if satisfied: + # Newly satisfied self.data_store_mgr.delta_task_xtrigger(sig, True) + self.workflow_db_mgr.put_xtriggers({sig: results}) LOG.info('xtrigger satisfied: %s = %s', ctx.label, sig) self.sat_xtrig[sig] = results - - def check_xtriggers( - self, - itask: 'TaskProxy', - db_update_func: Callable[[dict], None]) -> bool: - """Check if all of itasks' xtriggers have become satisfied. - - Return True if satisfied, else False - - Args: - itasks: task proxies to check - db_update_func: method to update xtriggers in the DB - """ - if itask.state.xtriggers_all_satisfied(): - db_update_func(self.sat_xtrig) - return True - return False + self.do_housekeeping = True diff --git a/setup.cfg b/setup.cfg index d723d2ed361..7db9eb8bf4e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -121,6 +121,7 @@ tests = pytest-asyncio>=0.17,!=0.23.* pytest-cov>=2.8.0 pytest-xdist>=2 + pytest-mock>=3.7 pytest>=6 testfixtures>=6.11.0 towncrier>=23 diff --git a/tests/functional/xtriggers/02-persistence/flow.cylc b/tests/functional/xtriggers/02-persistence/flow.cylc index 29ba08712a9..f6d004316fb 100644 --- a/tests/functional/xtriggers/02-persistence/flow.cylc +++ b/tests/functional/xtriggers/02-persistence/flow.cylc @@ -3,6 +3,7 @@ [scheduling] initial cycle point = 2010 final cycle point = 2011 + runahead limit = P0 [[xtriggers]] x1 = faker(name="bob") [[graph]] diff --git a/tests/integration/test_task_job_mgr.py b/tests/integration/test_task_job_mgr.py index 297ad889df5..b085162a1da 100644 --- a/tests/integration/test_task_job_mgr.py +++ b/tests/integration/test_task_job_mgr.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +from contextlib import suppress import logging from typing import Any as Fixture @@ -128,3 +129,60 @@ async def test__run_job_cmd_logs_platform_lookup_fail( warning = caplog.records[-1] assert warning.levelname == 'ERROR' assert 'Unable to run command jobs-poll' in warning.msg + + +async def test__prep_submit_task_job_impl_handles_execution_time_limit( + flow: Fixture, + scheduler: Fixture, + start: Fixture, +): + """Ensure that emptying the execution time limit unsets it. + + Previously unsetting the etl by either broadcast or reload + would not unset a previous etl. 
+ + See https://github.com/cylc/cylc-flow/issues/5891 + """ + id_ = flow({ + "scheduling": { + "cycling mode": "integer", + "graph": {"R1": "a"} + }, + "runtime": { + "root": {}, + "a": { + "script": "sleep 10", + "execution time limit": 'PT5S' + } + } + }) + + # Run in live mode - function not called in sim mode. + schd = scheduler(id_, run_mode='live') + async with start(schd): + task_a = schd.pool.get_tasks()[0] + # We're not interested in the job file stuff, just + # in the summary state. + with suppress(FileExistsError): + schd.task_job_mgr._prep_submit_task_job_impl( + schd.workflow, task_a, task_a.tdef.rtconfig) + assert task_a.summary['execution_time_limit'] == 5.0 + + # If we delete the etl it gets deleted in the summary: + task_a.tdef.rtconfig['execution time limit'] = None + with suppress(FileExistsError): + schd.task_job_mgr._prep_submit_task_job_impl( + schd.workflow, task_a, task_a.tdef.rtconfig) + assert not task_a.summary.get('execution_time_limit', '') + + # put everything back and test broadcast too. + task_a.tdef.rtconfig['execution time limit'] = 5.0 + task_a.summary['execution_time_limit'] = 5.0 + schd.broadcast_mgr.broadcasts = { + '1': {'a': {'execution time limit': None}}} + with suppress(FileExistsError): + # We run a higher level function here to ensure + # that the broadcast is applied. + schd.task_job_mgr._prep_submit_task_job( + schd.workflow, task_a) + assert not task_a.summary.get('execution_time_limit', '') diff --git a/tests/integration/test_xtrigger_mgr.py b/tests/integration/test_xtrigger_mgr.py index 07abbdac24d..b4d2d503799 100644 --- a/tests/integration/test_xtrigger_mgr.py +++ b/tests/integration/test_xtrigger_mgr.py @@ -16,6 +16,7 @@ """Tests for the behaviour of xtrigger manager. """ +from pytest_mock import mocker async def test_2_xtriggers(flow, start, scheduler, monkeypatch): """Test that if an itask has 2 wall_clock triggers with different @@ -65,3 +66,56 @@ async def test_2_xtriggers(flow, start, scheduler, monkeypatch): 'clock_2': False, 'clock_3': False, } + + +async def test_1_xtrigger_2_tasks(flow, start, scheduler, monkeypatch, mocker): + """ + If multiple tasks depend on the same satisfied xtrigger, the DB mgr method + put_xtriggers should only be called once - when the xtrigger gets satisfied. + + See [GitHub #5908](https://github.com/cylc/cylc-flow/pull/5908) + + """ + task_point = 1588636800 # 2020-05-05 + ten_years_ahead = 1904169600 # 2030-05-05 + monkeypatch.setattr( + 'cylc.flow.xtriggers.wall_clock.time', + lambda: ten_years_ahead - 1 + ) + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': True + }, + 'scheduling': { + 'initial cycle point': '2020-05-05', + 'xtriggers': { + 'clock_1': 'wall_clock()', + }, + 'graph': { + 'R1': '@clock_1 => foo & bar' + } + } + }) + + schd = scheduler(id_) + spy = mocker.spy(schd.workflow_db_mgr, 'put_xtriggers') + + async with start(schd): + + # Call the clock trigger via its dependent tasks, to get it satisfied. + for task in schd.pool.get_tasks(): + # (For clock triggers this is synchronous) + schd.xtrigger_mgr.call_xtriggers_async(task) + + # It should now be satisfied. + assert task.state.xtriggers == {'clock_1': True} + + # Check one put_xtriggers call only, not two. + assert spy.call_count == 1 + + # Note on master prior to GH #5908 the call is made from the + # scheduler main loop when the two tasks become satisified, + # resulting in two calls to put_xtriggers. This test fails + # on master, but with call count 0 (not 2) because the main + # loop doesn't run in this test. 
+ diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 68095a5795e..09dcbcd40ad 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -199,6 +199,7 @@ def xtrigger_mgr() -> XtriggerManager: workflow=workflow_name, user=user, proc_pool=Mock(put_command=lambda *a, **k: True), + workflow_db_mgr=Mock(housekeep=lambda *a, **k: True), broadcast_mgr=Mock(put_broadcast=lambda *a, **k: True), data_store_mgr=DataStoreMgr( create_autospec(Scheduler, workflow=workflow_name, owner=user) diff --git a/tests/unit/test_option_parsers.py b/tests/unit/test_option_parsers.py index e3831a1daea..05b52badfbe 100644 --- a/tests/unit/test_option_parsers.py +++ b/tests/unit/test_option_parsers.py @@ -26,7 +26,7 @@ import cylc.flow.flags from cylc.flow.option_parsers import ( CylcOptionParser as COP, Options, combine_options, combine_options_pair, - OptionSettings, cleanup_sysargv + OptionSettings, cleanup_sysargv, filter_sysargv ) @@ -321,20 +321,6 @@ def test_combine_options(inputs, expect): @pytest.mark.parametrize( 'argv_before, kwargs, expect', [ - param( - 'vip myworkflow --foo something'.split(), - { - 'script_name': 'play', - 'workflow_id': 'myworkflow', - 'compound_script_opts': [ - OptionSettings(['--foo', '-f']), - ], - 'script_opts': [ - OptionSettings(['--foo', '-f'])] - }, - 'play myworkflow --foo something'.split(), - id='no opts to remove' - ), param( 'vip myworkflow -f something -b something_else --baz'.split(), { @@ -397,19 +383,6 @@ def test_combine_options(inputs, expect): 'play --foo something myworkflow'.split(), id='no path given' ), - param( - 'vip --bar=something'.split(), - { - 'script_name': 'play', - 'workflow_id': 'myworkflow', - 'compound_script_opts': [ - OptionSettings(['--bar', '-b'])], - 'script_opts': [], - 'source': './myworkflow', - }, - 'play myworkflow'.split(), - id='removes --key=value' - ), ] ) def test_cleanup_sysargv(monkeypatch, argv_before, kwargs, expect): @@ -432,6 +405,53 @@ def test_cleanup_sysargv(monkeypatch, argv_before, kwargs, expect): assert sys.argv == dummy_cylc_path + expect +@pytest.mark.parametrize( + 'sysargs, simple, compound, expect', ( + param( + # Test for https://github.com/cylc/cylc-flow/issues/5905 + '--no-run-name --workflow-name=name'.split(), + ['--no-run-name'], + ['--workflow-name'], + [], + id='--workflow-name=name' + ), + param( + '--foo something'.split(), + [], [], '--foo something'.split(), + id='no-opts-removed' + ), + param( + [], ['--foo'], ['--bar'], [], + id='Null-check' + ), + param( + '''--keep1 --keep2 42 --keep3=Hi + --throw1 --throw2 84 --throw3=There + '''.split(), + ['--throw1'], + '--throw2 --throw3'.split(), + '--keep1 --keep2 42 --keep3=Hi'.split(), + id='complex' + ), + param( + "--foo 'foo=42' --bar='foo=94'".split(), + [], ['--foo'], + ['--bar=\'foo=94\''], + id='--bar=\'foo=94\'' + ) + ) +) +def test_filter_sysargv( + sysargs, simple, compound, expect +): + """It returns the subset of sys.argv that we ask for. + + n.b. The three most basic cases for this function are stored in + its own docstring. 
+ """ + assert filter_sysargv(sysargs, simple, compound) == expect + + class TestOptionSettings(): @staticmethod def test_init(): diff --git a/tests/unit/test_xtrigger_mgr.py b/tests/unit/test_xtrigger_mgr.py index 89cc0024ddf..bba77c1f7f1 100644 --- a/tests/unit/test_xtrigger_mgr.py +++ b/tests/unit/test_xtrigger_mgr.py @@ -281,58 +281,3 @@ def test_callback(xtrigger_mgr): xtrigger_mgr.callback(get_name) # this means that the xtrigger was satisfied assert xtrigger_mgr.sat_xtrig - - -def test_check_xtriggers(xtrigger_mgr): - """Test process_xtriggers call.""" - - xtrigger_mgr.validate_xtrigger = lambda *a, **k: True # Ignore validation - # add an xtrigger - get_name = SubFuncContext( - label="get_name", - func_name="get_name", - func_args=[], - func_kwargs={} - ) - xtrigger_mgr.add_trig("get_name", get_name, 'fdir') - get_name.out = "[\"True\", {\"name\": \"Yossarian\"}]" - tdef1 = TaskDef( - name="foo", - rtcfg=None, - run_mode="live", - start_point=1, - initial_point=1 - ) - init() - sequence = ISO8601Sequence('P1D', '2019') - tdef1.xtrig_labels[sequence] = ["get_name"] - start_point = ISO8601Point('2019') - itask1 = TaskProxy(Tokens('~user/workflow'), tdef1, start_point) - itask1.state.xtriggers["get_name"] = False # satisfied? - - # add a clock xtrigger - wall_clock = SubFuncContext( - label="wall_clock", - func_name="wall_clock", - func_args=[], - func_kwargs={} - ) - wall_clock.out = "[\"True\", \"1\"]" - xtrigger_mgr.add_trig("wall_clock", wall_clock, "fdir") - # create a task - tdef2 = TaskDef( - name="foo", - rtcfg=None, - run_mode="live", - start_point=1, - initial_point=1 - ) - tdef2.xtrig_labels[sequence] = ["wall_clock"] - init() - start_point = ISO8601Point('20000101T0000+05') - # create task proxy - TaskProxy(Tokens('~user/workflow'), tdef2, start_point) - - xtrigger_mgr.check_xtriggers(itask1, lambda foo: None) - # won't be satisfied, as it is async, we are are not calling callback - assert not xtrigger_mgr.sat_xtrig