From 8606f93ea8c8260fee19fa2dac14da9245999c20 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Thu, 26 Oct 2023 10:38:04 +0100 Subject: [PATCH] Test the simulation mode code. (#5712) * Test the simulation mode code. - Refactor simulation mode functions. - Improve documentation of sim mode settings. - Ensure 100% test coverage of sim mode. --- cylc/flow/cfgspec/workflow.py | 7 + cylc/flow/config.py | 67 +----- cylc/flow/scheduler.py | 7 +- cylc/flow/simulation.py | 219 ++++++++++++++++++ cylc/flow/task_events_mgr.py | 33 ++- cylc/flow/task_job_mgr.py | 8 +- cylc/flow/task_pool.py | 41 ---- tests/functional/modes/03-simulation.t | 32 +++ .../functional/modes/03-simulation/flow.cylc | 16 ++ .../modes/03-simulation/reference.log | 2 + tests/integration/test_simulation.py | 125 ++++++++++ tests/unit/test_config.py | 32 ++- tests/unit/test_simulation.py | 166 +++++++++++++ 13 files changed, 628 insertions(+), 127 deletions(-) create mode 100644 cylc/flow/simulation.py create mode 100644 tests/functional/modes/03-simulation.t create mode 100644 tests/functional/modes/03-simulation/flow.cylc create mode 100644 tests/functional/modes/03-simulation/reference.log create mode 100644 tests/integration/test_simulation.py create mode 100644 tests/unit/test_simulation.py diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index a6d16bbe76d..f521614c73a 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -1260,10 +1260,17 @@ def get_script_common_text(this: str, example: Optional[str] = None): - ``all`` - all instance of the task will fail - ``2017-08-12T06, 2017-08-12T18`` - these instances of the task will fail + + If you set :cylc:conf:`[..][..]execution retry delays` + the second attempt will succeed unless you set + :cylc:conf:`[..]fail try 1 only = False`. ''') Conf('fail try 1 only', VDR.V_BOOLEAN, True, desc=''' If ``True`` only the first run of the task instance will fail, otherwise retries will fail too. + + Task instances must be set to fail by + :cylc:conf:`[..]fail cycle points`. ''') Conf('disable task event handlers', VDR.V_BOOLEAN, True, desc=''' diff --git a/cylc/flow/config.py b/cylc/flow/config.py index 0096d924c8b..d80456266bf 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -79,8 +79,8 @@ get_cylc_run_dir, is_relative_to, ) -from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM from cylc.flow.print_tree import print_tree +from cylc.flow.simulation import configure_sim_modes from cylc.flow.subprocctx import SubFuncContext from cylc.flow.task_events_mgr import ( EventData, @@ -521,7 +521,8 @@ def __init__( self.process_runahead_limit() if self.run_mode('simulation', 'dummy'): - self.configure_sim_modes() + configure_sim_modes( + self.taskdefs.values(), self.run_mode()) self.configure_workflow_state_polling_tasks() @@ -1340,68 +1341,6 @@ def configure_workflow_state_polling_tasks(self): script = "echo " + comstr + "\n" + comstr rtc['script'] = script - def configure_sim_modes(self): - """Adjust task defs for simulation and dummy mode.""" - for tdef in self.taskdefs.values(): - # Compute simulated run time by scaling the execution limit. - rtc = tdef.rtconfig - limit = rtc['execution time limit'] - speedup = rtc['simulation']['speedup factor'] - if limit and speedup: - sleep_sec = (DurationParser().parse( - str(limit)).get_seconds() / speedup) - else: - sleep_sec = DurationParser().parse( - str(rtc['simulation']['default run length']) - ).get_seconds() - rtc['execution time limit'] = ( - sleep_sec + DurationParser().parse(str( - rtc['simulation']['time limit buffer'])).get_seconds() - ) - rtc['job']['simulated run length'] = sleep_sec - - # Generate dummy scripting. - rtc['init-script'] = "" - rtc['env-script'] = "" - rtc['pre-script'] = "" - rtc['post-script'] = "" - scr = "sleep %d" % sleep_sec - # Dummy message outputs. - for msg in rtc['outputs'].values(): - scr += "\ncylc message '%s'" % msg - if rtc['simulation']['fail try 1 only']: - arg1 = "true" - else: - arg1 = "false" - arg2 = " ".join(rtc['simulation']['fail cycle points']) - scr += "\ncylc__job__dummy_result %s %s || exit 1" % (arg1, arg2) - rtc['script'] = scr - - # Dummy mode jobs should run on platform localhost - # All Cylc 7 config items which conflict with platform are removed. - for section, keys in FORBIDDEN_WITH_PLATFORM.items(): - if section in rtc: - for key in keys: - if key in rtc[section]: - rtc[section][key] = None - - rtc['platform'] = 'localhost' - - # Disable environment, in case it depends on env-script. - rtc['environment'] = {} - - # Simulation mode tasks should fail in which cycle points? - f_pts = [] - f_pts_orig = rtc['simulation']['fail cycle points'] - if 'all' in f_pts_orig: - # None for "fail all points". - f_pts = None - else: - # (And [] for "fail no points".) - for point_str in f_pts_orig: - f_pts.append(get_point(point_str).standardise()) - rtc['simulation']['fail cycle points'] = f_pts - def get_parent_lists(self): return self.runtime['parents'] diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 1d9fc5c21dd..effe02e910d 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -113,6 +113,7 @@ ) from cylc.flow.profiler import Profiler from cylc.flow.resources import get_resources +from cylc.flow.simulation import sim_time_check from cylc.flow.subprocpool import SubProcPool from cylc.flow.templatevars import eval_var from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager @@ -1740,7 +1741,11 @@ async def main_loop(self) -> None: self.pool.set_expired_tasks() self.release_queued_tasks() - if self.pool.sim_time_check(self.message_queue): + if ( + self.pool.config.run_mode('simulation') + and sim_time_check( + self.message_queue, self.pool.get_tasks()) + ): # A simulated task state change occurred. self.reset_inactivity_timer() diff --git a/cylc/flow/simulation.py b/cylc/flow/simulation.py new file mode 100644 index 00000000000..15314f8e3e7 --- /dev/null +++ b/cylc/flow/simulation.py @@ -0,0 +1,219 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +"""Utilities supporting simulation and skip modes +""" + +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union +from time import time + +from cylc.flow.cycling.loader import get_point +from cylc.flow.network.resolvers import TaskMsg +from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM +from cylc.flow.task_state import ( + TASK_STATUS_RUNNING, + TASK_STATUS_FAILED, + TASK_STATUS_SUCCEEDED, +) +from cylc.flow.wallclock import get_current_time_string + +from metomi.isodatetime.parsers import DurationParser + +if TYPE_CHECKING: + from queue import Queue + from cylc.flow.cycling import PointBase + from cylc.flow.task_proxy import TaskProxy + + +def configure_sim_modes(taskdefs, sim_mode): + """Adjust task defs for simulation and dummy mode. + + """ + dummy_mode = bool(sim_mode == 'dummy') + + for tdef in taskdefs: + # Compute simulated run time by scaling the execution limit. + rtc = tdef.rtconfig + sleep_sec = get_simulated_run_len(rtc) + + rtc['execution time limit'] = ( + sleep_sec + DurationParser().parse(str( + rtc['simulation']['time limit buffer'])).get_seconds() + ) + + rtc['simulation']['simulated run length'] = sleep_sec + rtc['submission retry delays'] = [1] + + # Generate dummy scripting. + rtc['init-script'] = "" + rtc['env-script'] = "" + rtc['pre-script'] = "" + rtc['post-script'] = "" + rtc['script'] = build_dummy_script( + rtc, sleep_sec) if dummy_mode else "" + + disable_platforms(rtc) + + # Disable environment, in case it depends on env-script. + rtc['environment'] = {} + + rtc["simulation"][ + "fail cycle points" + ] = parse_fail_cycle_points( + rtc["simulation"]["fail cycle points"] + ) + + +def get_simulated_run_len(rtc: Dict[str, Any]) -> int: + """Get simulated run time. + + rtc = run time config + """ + limit = rtc['execution time limit'] + speedup = rtc['simulation']['speedup factor'] + if limit and speedup: + sleep_sec = (DurationParser().parse( + str(limit)).get_seconds() / speedup) + else: + sleep_sec = DurationParser().parse( + str(rtc['simulation']['default run length']) + ).get_seconds() + + return sleep_sec + + +def build_dummy_script(rtc: Dict[str, Any], sleep_sec: int) -> str: + """Create fake scripting for dummy mode. + + This is for Dummy mode only. + """ + script = "sleep %d" % sleep_sec + # Dummy message outputs. + for msg in rtc['outputs'].values(): + script += "\ncylc message '%s'" % msg + if rtc['simulation']['fail try 1 only']: + arg1 = "true" + else: + arg1 = "false" + arg2 = " ".join(rtc['simulation']['fail cycle points']) + script += "\ncylc__job__dummy_result %s %s || exit 1" % (arg1, arg2) + return script + + +def disable_platforms( + rtc: Dict[str, Any] +) -> None: + """Force platform = localhost + + Remove legacy sections [job] and [remote], which would conflict + with setting platforms. + + This can be simplified when support for the FORBIDDEN_WITH_PLATFORM + configurations is dropped. + """ + for section, keys in FORBIDDEN_WITH_PLATFORM.items(): + if section in rtc: + for key in keys: + if key in rtc[section]: + rtc[section][key] = None + rtc['platform'] = 'localhost' + + +def parse_fail_cycle_points( + f_pts_orig: List[str] +) -> 'Union[None, List[PointBase]]': + """Parse `[simulation][fail cycle points]`. + + - None for "fail all points". + - Else a list of cycle point objects. + + Examples: + >>> this = parse_fail_cycle_points + >>> this(['all']) is None + True + >>> this([]) + [] + """ + f_pts: 'Optional[List[PointBase]]' + if 'all' in f_pts_orig: + f_pts = None + else: + f_pts = [] + for point_str in f_pts_orig: + f_pts.append(get_point(point_str).standardise()) + return f_pts + + +def sim_time_check( + message_queue: 'Queue[TaskMsg]', itasks: 'List[TaskProxy]' +) -> bool: + """Check if sim tasks have been "running" for as long as required. + + If they have change the task state. + + Returns: + True if _any_ simulated task state has changed. + """ + sim_task_state_changed = False + now = time() + for itask in itasks: + if itask.state.status != TASK_STATUS_RUNNING: + continue + # Started time is not set on restart + if itask.summary['started_time'] is None: + itask.summary['started_time'] = now + timeout = ( + itask.summary['started_time'] + + itask.tdef.rtconfig['simulation']['simulated run length'] + ) + if now > timeout: + job_d = itask.tokens.duplicate(job=str(itask.submit_num)) + now_str = get_current_time_string() + if sim_task_failed( + itask.tdef.rtconfig['simulation'], + itask.point, + itask.get_try_num() + ): + message_queue.put( + TaskMsg(job_d, now_str, 'CRITICAL', TASK_STATUS_FAILED) + ) + else: + # Simulate message outputs. + for msg in itask.tdef.rtconfig['outputs'].values(): + message_queue.put( + TaskMsg(job_d, now_str, 'DEBUG', msg) + ) + message_queue.put( + TaskMsg(job_d, now_str, 'DEBUG', TASK_STATUS_SUCCEEDED) + ) + sim_task_state_changed = True + return sim_task_state_changed + + +def sim_task_failed( + sim_conf: Dict[str, Any], + point: 'PointBase', + try_num: int, +) -> bool: + """Encapsulate logic for deciding whether a sim task has failed. + + Allows Unit testing. + """ + return ( + sim_conf['fail cycle points'] is None # i.e. "all" + or point in sim_conf['fail cycle points'] + ) and ( + try_num == 1 or not sim_conf['fail try 1 only'] + ) diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index d4d9130ee27..ec5e467b8aa 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -575,6 +575,7 @@ def process_message( True: if polling is required to confirm a reversal of status. """ + # Log messages if event_time is None: event_time = get_current_time_string() @@ -617,7 +618,6 @@ def process_message( self.setup_event_handlers( itask, self.EVENT_STARTED, f'job {self.EVENT_STARTED}') self.spawn_func(itask, TASK_OUTPUT_STARTED) - if message == self.EVENT_STARTED: if ( flag == self.FLAG_RECEIVED @@ -777,24 +777,23 @@ def _process_message_check( return False if ( - itask.state(TASK_STATUS_WAITING) - and + itask.state(TASK_STATUS_WAITING) + and itask.tdef.run_mode == 'live' # Polling in live mode only. + and ( ( - ( - # task has a submit-retry lined up - TimerFlags.SUBMISSION_RETRY in itask.try_timers - and itask.try_timers[ - TimerFlags.SUBMISSION_RETRY].num > 0 - ) - or - ( - # task has an execution-retry lined up - TimerFlags.EXECUTION_RETRY in itask.try_timers - and itask.try_timers[ - TimerFlags.EXECUTION_RETRY].num > 0 - ) + # task has a submit-retry lined up + TimerFlags.SUBMISSION_RETRY in itask.try_timers + and itask.try_timers[ + TimerFlags.SUBMISSION_RETRY].num > 0 ) - + or + ( + # task has an execution-retry lined up + TimerFlags.EXECUTION_RETRY in itask.try_timers + and itask.try_timers[ + TimerFlags.EXECUTION_RETRY].num > 0 + ) + ) ): # Ignore messages if task has a retry lined up # (caused by polling overlapping with task failure) diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 412ba23b97a..d6bc59fb255 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -260,12 +260,13 @@ def submit_task_jobs(self, workflow, itasks, curve_auth, Return (list): list of tasks that attempted submission. """ - if is_simulation: return self._simulation_submit_task_jobs(itasks, workflow) + # Prepare tasks for job submission prepared_tasks, bad_tasks = self.prep_submit_task_jobs( workflow, itasks) + # Reset consumed host selection results self.task_remote_mgr.subshell_eval_reset() @@ -999,16 +1000,17 @@ def _simulation_submit_task_jobs(self, itasks, workflow): itask.waiting_on_job_prep = False itask.submit_num += 1 self._set_retry_timers(itask) + itask.platform = {'name': 'SIMULATION'} itask.summary['job_runner_name'] = 'SIMULATION' itask.summary[self.KEY_EXECUTE_TIME_LIMIT] = ( - itask.tdef.rtconfig['job']['simulated run length'] + itask.tdef.rtconfig['simulation']['simulated run length'] ) itask.jobs.append( self.get_simulation_job_conf(itask, workflow) ) self.task_events_mgr.process_message( - itask, INFO, TASK_OUTPUT_SUBMITTED + itask, INFO, TASK_OUTPUT_SUBMITTED, ) return itasks diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 2e8d8a634ac..f9cc94deaaf 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -40,7 +40,6 @@ from cylc.flow.id import Tokens, detokenise from cylc.flow.id_cli import contains_fnmatch from cylc.flow.id_match import filter_ids -from cylc.flow.network.resolvers import TaskMsg from cylc.flow.workflow_status import StopMode from cylc.flow.task_action_timer import TaskActionTimer, TimerFlags from cylc.flow.task_events_mgr import ( @@ -74,7 +73,6 @@ from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NONE, FLOW_NEW if TYPE_CHECKING: - from queue import Queue from cylc.flow.config import WorkflowConfig from cylc.flow.cycling import IntervalBase, PointBase from cylc.flow.data_store_mgr import DataStoreMgr @@ -1756,45 +1754,6 @@ def force_trigger_tasks( return len(unmatched) - def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool: - """Simulation mode: simulate task run times and set states.""" - if not self.config.run_mode('simulation'): - return False - sim_task_state_changed = False - now = time() - for itask in self.get_tasks(): - if itask.state.status != TASK_STATUS_RUNNING: - continue - # Started time is not set on restart - if itask.summary['started_time'] is None: - itask.summary['started_time'] = now - timeout = (itask.summary['started_time'] + - itask.tdef.rtconfig['job']['simulated run length']) - if now > timeout: - conf = itask.tdef.rtconfig['simulation'] - job_d = itask.tokens.duplicate(job=str(itask.submit_num)) - now_str = get_current_time_string() - if ( - conf['fail cycle points'] is None # i.e. "all" - or itask.point in conf['fail cycle points'] - ) and ( - itask.get_try_num() == 1 or not conf['fail try 1 only'] - ): - message_queue.put( - TaskMsg(job_d, now_str, 'CRITICAL', TASK_STATUS_FAILED) - ) - else: - # Simulate message outputs. - for msg in itask.tdef.rtconfig['outputs'].values(): - message_queue.put( - TaskMsg(job_d, now_str, 'DEBUG', msg) - ) - message_queue.put( - TaskMsg(job_d, now_str, 'DEBUG', TASK_STATUS_SUCCEEDED) - ) - sim_task_state_changed = True - return sim_task_state_changed - def set_expired_tasks(self): res = False for itask in self.get_tasks(): diff --git a/tests/functional/modes/03-simulation.t b/tests/functional/modes/03-simulation.t new file mode 100644 index 00000000000..87a7ca37a9b --- /dev/null +++ b/tests/functional/modes/03-simulation.t @@ -0,0 +1,32 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +# Test that simulation mode runs, and reruns a failed task successfully +# when execution retry delays is configured. + +. "$(dirname "$0")/test_header" +set_test_number 2 + +install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" +run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" +workflow_run_ok "${TEST_NAME_BASE}-run" \ + cylc play \ + --no-detach \ + --mode=simulation \ + --reference-test "${WORKFLOW_NAME}" +purge +exit diff --git a/tests/functional/modes/03-simulation/flow.cylc b/tests/functional/modes/03-simulation/flow.cylc new file mode 100644 index 00000000000..49300212d39 --- /dev/null +++ b/tests/functional/modes/03-simulation/flow.cylc @@ -0,0 +1,16 @@ +[scheduler] + [[events]] + workflow timeout = PT30S + +[scheduling] + initial cycle point = 2359 + [[graph]] + R1 = get_observations + +[runtime] + [[get_observations]] + execution retry delays = PT2S + [[[simulation]]] + fail cycle points = all + fail try 1 only = True + diff --git a/tests/functional/modes/03-simulation/reference.log b/tests/functional/modes/03-simulation/reference.log new file mode 100644 index 00000000000..2d14bc201fb --- /dev/null +++ b/tests/functional/modes/03-simulation/reference.log @@ -0,0 +1,2 @@ +23590101T0000Z/get_observations -triggered off [] in flow 1 +23590101T0000Z/get_observations -triggered off [] in flow 1 diff --git a/tests/integration/test_simulation.py b/tests/integration/test_simulation.py new file mode 100644 index 00000000000..e0ada2c3e49 --- /dev/null +++ b/tests/integration/test_simulation.py @@ -0,0 +1,125 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import pytest +from queue import Queue +from types import SimpleNamespace + +from cylc.flow.cycling.iso8601 import ISO8601Point +from cylc.flow.simulation import sim_time_check + + +def get_msg_queue_item(queue, id_): + for item in queue.queue: + if id_ in str(item.job_id): + return item + + +@pytest.fixture(scope='module') +async def sim_time_check_setup( + mod_flow, mod_scheduler, mod_start, mod_one_conf +): + schd = mod_scheduler(mod_flow({ + 'scheduler': {'cycle point format': '%Y'}, + 'scheduling': { + 'initial cycle point': '1066', + 'graph': { + 'R1': 'one & fail_all & fast_forward' + } + }, + 'runtime': { + 'one': {}, + 'fail_all': { + 'simulation': {'fail cycle points': 'all'}, + 'outputs': {'foo': 'bar'} + }, + # This task ought not be finished quickly, but for the speed up + 'fast_forward': { + 'execution time limit': 'PT1M', + 'simulation': {'speedup factor': 2} + } + } + })) + msg_q = Queue() + async with mod_start(schd): + itasks = schd.pool.get_tasks() + for i in itasks: + i.try_timers = {'execution-retry': SimpleNamespace(num=0)} + yield schd, itasks, msg_q + + +def test_false_if_not_running(sim_time_check_setup, monkeypatch): + schd, itasks, msg_q = sim_time_check_setup + + # False if task status not running: + assert sim_time_check(msg_q, itasks) is False + + +def test_sim_time_check_sets_started_time(sim_time_check_setup): + """But sim_time_check still returns False""" + schd, _, msg_q = sim_time_check_setup + one_1066 = schd.pool.get_task(ISO8601Point('1066'), 'one') + one_1066.state.status = 'running' + assert one_1066.summary['started_time'] is None + assert sim_time_check(msg_q, [one_1066]) is False + assert one_1066.summary['started_time'] is not None + + +def test_task_finishes(sim_time_check_setup, monkeypatch): + """...and an appropriate message sent. + + Checks all possible outcomes in sim_time_check where elapsed time is + greater than the simulation time. + + Does NOT check every possible cause on an outcome - this is done + in unit tests. + """ + schd, _, msg_q = sim_time_check_setup + monkeypatch.setattr('cylc.flow.simulation.time', lambda: 0) + + # Setup a task to fail + fail_all_1066 = schd.pool.get_task(ISO8601Point('1066'), 'fail_all') + fail_all_1066.state.status = 'running' + fail_all_1066.try_timers = {'execution-retry': SimpleNamespace(num=0)} + + # Before simulation time is up: + assert sim_time_check(msg_q, [fail_all_1066]) is False + + # After simulation time is up: + monkeypatch.setattr('cylc.flow.simulation.time', lambda: 12) + assert sim_time_check(msg_q, [fail_all_1066]) is True + assert get_msg_queue_item(msg_q, '1066/fail_all').message == 'failed' + + # Succeeds and records messages for all outputs: + fail_all_1066.try_timers = {'execution-retry': SimpleNamespace(num=1)} + msg_q = Queue() + assert sim_time_check(msg_q, [fail_all_1066]) is True + assert sorted(i.message for i in msg_q.queue) == ['bar', 'succeeded'] + + +def test_task_sped_up(sim_time_check_setup, monkeypatch): + """Task will speed up by a factor set in config.""" + schd, _, msg_q = sim_time_check_setup + fast_forward_1066 = schd.pool.get_task( + ISO8601Point('1066'), 'fast_forward') + fast_forward_1066.state.status = 'running' + + monkeypatch.setattr('cylc.flow.simulation.time', lambda: 0) + assert sim_time_check(msg_q, [fast_forward_1066]) is False + monkeypatch.setattr('cylc.flow.simulation.time', lambda: 29) + assert sim_time_check(msg_q, [fast_forward_1066]) is False + monkeypatch.setattr('cylc.flow.simulation.time', lambda: 31) + assert sim_time_check(msg_q, [fast_forward_1066]) is True diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 34c596a770c..bb55cbf295e 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +from copy import deepcopy import os import sys from optparse import Values @@ -38,12 +39,13 @@ from cylc.flow.parsec.exceptions import Jinja2Error, EmPyError from cylc.flow.scheduler_cli import RunOptions from cylc.flow.scripts.validate import ValidateOptions +from cylc.flow.simulation import configure_sim_modes from cylc.flow.workflow_files import WorkflowFiles from cylc.flow.wallclock import get_utc_mode, set_utc_mode from cylc.flow.xtrigger_mgr import XtriggerManager from cylc.flow.task_outputs import ( TASK_OUTPUT_SUBMITTED, - TASK_OUTPUT_SUCCEEDED + TASK_OUTPUT_SUCCEEDED, ) from cylc.flow.cycling.iso8601 import ISO8601Point @@ -1741,3 +1743,31 @@ def test_cylc_env_at_parsing( assert var in cylc_env else: assert var not in cylc_env + + +def test_configure_sim_mode(caplog): + job_section = {} + sim_section = { + 'speedup factor': '', + 'default run length': 'PT10S', + 'time limit buffer': 'PT0S', + 'fail try 1 only': False, + 'fail cycle points': '', + } + rtconfig_1 = { + 'execution time limit': '', + 'simulation': sim_section, + 'job': job_section, + 'outputs': {}, + } + rtconfig_2 = deepcopy(rtconfig_1) + rtconfig_2['simulation']['default run length'] = 'PT2S' + + taskdefs = [ + SimpleNamespace(rtconfig=rtconfig_1), + SimpleNamespace(rtconfig=rtconfig_2), + ] + configure_sim_modes(taskdefs, 'simulation') + results = [ + i.rtconfig['simulation']['simulated run length'] for i in taskdefs] + assert results == [10.0, 2.0] diff --git a/tests/unit/test_simulation.py b/tests/unit/test_simulation.py new file mode 100644 index 00000000000..1c490f35c16 --- /dev/null +++ b/tests/unit/test_simulation.py @@ -0,0 +1,166 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +"""Tests for utilities supporting simulation and skip modes +""" +import pytest +from pytest import param + +from cylc.flow.cycling.integer import IntegerPoint +from cylc.flow.cycling.iso8601 import ISO8601Point +from cylc.flow.simulation import ( + parse_fail_cycle_points, + build_dummy_script, + disable_platforms, + get_simulated_run_len, + sim_task_failed, +) + + +@pytest.mark.parametrize( + 'execution_time_limit, speedup_factor, default_run_length', + ( + param(None, None, 'PT1H', id='default-run-length'), + param(None, 10, 'PT1H', id='speedup-factor-alone'), + param('PT1H', None, 'PT1H', id='execution-time-limit-alone'), + param('P1D', 24, 'PT1M', id='speed-up-and-execution-tl'), + ) +) +def test_get_simulated_run_len( + execution_time_limit, speedup_factor, default_run_length +): + """Test the logic of the presence or absence of config items. + + Avoid testing the correct workign of DurationParser. + """ + rtc = { + 'execution time limit': execution_time_limit, + 'simulation': { + 'speedup factor': speedup_factor, + 'default run length': default_run_length + } + } + assert get_simulated_run_len(rtc) == 3600 + + +@pytest.mark.parametrize( + 'fail_one_time_only', (True, False) +) +def test_set_simulation_script(fail_one_time_only): + rtc = { + 'outputs': {'foo': '1', 'bar': '2'}, + 'simulation': { + 'fail try 1 only': fail_one_time_only, + 'fail cycle points': '1', + } + } + result = build_dummy_script(rtc, 60) + assert result.split('\n') == [ + 'sleep 60', + "cylc message '1'", + "cylc message '2'", + f"cylc__job__dummy_result {str(fail_one_time_only).lower()}" + " 1 || exit 1" + ] + + +@pytest.mark.parametrize( + 'rtc, expect', ( + ({'platform': 'skarloey'}, 'localhost'), + ({'remote': {'host': 'rheneas'}}, 'localhost'), + ({'job': {'batch system': 'loaf'}}, 'localhost'), + ) +) +def test_disable_platforms(rtc, expect): + """A sampling of items FORBIDDEN_WITH_PLATFORMS are removed from a + config passed to this method. + """ + disable_platforms(rtc) + assert rtc['platform'] == expect + subdicts = [v for v in rtc.values() if isinstance(v, dict)] + for subdict in subdicts: + for k, val in subdict.items(): + if k != 'platform': + assert val is None + + +def test_parse_fail_cycle_points(set_cycling_type): + before = ['2', '4'] + set_cycling_type() + assert parse_fail_cycle_points(before) == [ + IntegerPoint(i) for i in before + ] + + +@pytest.mark.parametrize( + 'conf, point, try_, expect', + ( + param( + {'fail cycle points': [], 'fail try 1 only': True}, + ISO8601Point('1'), + 1, + False, + id='defaults' + ), + param( + {'fail cycle points': None, 'fail try 1 only': True}, + ISO8601Point('1066'), + 1, + True, + id='fail-all' + ), + param( + { + 'fail cycle points': [ + ISO8601Point('1066'), ISO8601Point('1067')], + 'fail try 1 only': True + }, + ISO8601Point('1067'), + 1, + True, + id='point-in-failCP' + ), + param( + { + 'fail cycle points': [ + ISO8601Point('1066'), ISO8601Point('1067')], + 'fail try 1 only': True + }, + ISO8601Point('1000'), + 1, + False, + id='point-notin-failCP' + ), + param( + {'fail cycle points': None, 'fail try 1 only': True}, + ISO8601Point('1066'), + 2, + False, + id='succeed-attempt2' + ), + param( + {'fail cycle points': None, 'fail try 1 only': False}, + ISO8601Point('1066'), + 7, + True, + id='fail-attempt7' + ), + ) +) +def test_sim_task_failed( + conf, point, try_, expect, set_cycling_type +): + set_cycling_type('iso8601') + assert sim_task_failed(conf, point, try_) == expect