Skip to content

Commit

Permalink
Test the simulation mode code. (#5712)
Browse files Browse the repository at this point in the history
* Test the simulation mode code.
- Refactor simulation mode functions.
- Improve documentation of sim mode settings.
- Ensure 100% test coverage of sim mode.
  • Loading branch information
wxtim authored Oct 26, 2023
1 parent 9f4d0fb commit 8606f93
Show file tree
Hide file tree
Showing 13 changed files with 628 additions and 127 deletions.
7 changes: 7 additions & 0 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1260,10 +1260,17 @@ def get_script_common_text(this: str, example: Optional[str] = None):
- ``all`` - all instance of the task will fail
- ``2017-08-12T06, 2017-08-12T18`` - these instances of
the task will fail
If you set :cylc:conf:`[..][..]execution retry delays`
the second attempt will succeed unless you set
:cylc:conf:`[..]fail try 1 only = False`.
''')
Conf('fail try 1 only', VDR.V_BOOLEAN, True, desc='''
If ``True`` only the first run of the task
instance will fail, otherwise retries will fail too.
Task instances must be set to fail by
:cylc:conf:`[..]fail cycle points`.
''')
Conf('disable task event handlers', VDR.V_BOOLEAN, True,
desc='''
Expand Down
67 changes: 3 additions & 64 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@
get_cylc_run_dir,
is_relative_to,
)
from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM
from cylc.flow.print_tree import print_tree
from cylc.flow.simulation import configure_sim_modes
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.task_events_mgr import (
EventData,
Expand Down Expand Up @@ -521,7 +521,8 @@ def __init__(
self.process_runahead_limit()

if self.run_mode('simulation', 'dummy'):
self.configure_sim_modes()
configure_sim_modes(
self.taskdefs.values(), self.run_mode())

self.configure_workflow_state_polling_tasks()

Expand Down Expand Up @@ -1340,68 +1341,6 @@ def configure_workflow_state_polling_tasks(self):
script = "echo " + comstr + "\n" + comstr
rtc['script'] = script

def configure_sim_modes(self):
"""Adjust task defs for simulation and dummy mode."""
for tdef in self.taskdefs.values():
# Compute simulated run time by scaling the execution limit.
rtc = tdef.rtconfig
limit = rtc['execution time limit']
speedup = rtc['simulation']['speedup factor']
if limit and speedup:
sleep_sec = (DurationParser().parse(
str(limit)).get_seconds() / speedup)
else:
sleep_sec = DurationParser().parse(
str(rtc['simulation']['default run length'])
).get_seconds()
rtc['execution time limit'] = (
sleep_sec + DurationParser().parse(str(
rtc['simulation']['time limit buffer'])).get_seconds()
)
rtc['job']['simulated run length'] = sleep_sec

# Generate dummy scripting.
rtc['init-script'] = ""
rtc['env-script'] = ""
rtc['pre-script'] = ""
rtc['post-script'] = ""
scr = "sleep %d" % sleep_sec
# Dummy message outputs.
for msg in rtc['outputs'].values():
scr += "\ncylc message '%s'" % msg
if rtc['simulation']['fail try 1 only']:
arg1 = "true"
else:
arg1 = "false"
arg2 = " ".join(rtc['simulation']['fail cycle points'])
scr += "\ncylc__job__dummy_result %s %s || exit 1" % (arg1, arg2)
rtc['script'] = scr

# Dummy mode jobs should run on platform localhost
# All Cylc 7 config items which conflict with platform are removed.
for section, keys in FORBIDDEN_WITH_PLATFORM.items():
if section in rtc:
for key in keys:
if key in rtc[section]:
rtc[section][key] = None

rtc['platform'] = 'localhost'

# Disable environment, in case it depends on env-script.
rtc['environment'] = {}

# Simulation mode tasks should fail in which cycle points?
f_pts = []
f_pts_orig = rtc['simulation']['fail cycle points']
if 'all' in f_pts_orig:
# None for "fail all points".
f_pts = None
else:
# (And [] for "fail no points".)
for point_str in f_pts_orig:
f_pts.append(get_point(point_str).standardise())
rtc['simulation']['fail cycle points'] = f_pts

def get_parent_lists(self):
return self.runtime['parents']

Expand Down
7 changes: 6 additions & 1 deletion cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
)
from cylc.flow.profiler import Profiler
from cylc.flow.resources import get_resources
from cylc.flow.simulation import sim_time_check
from cylc.flow.subprocpool import SubProcPool
from cylc.flow.templatevars import eval_var
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
Expand Down Expand Up @@ -1740,7 +1741,11 @@ async def main_loop(self) -> None:
self.pool.set_expired_tasks()
self.release_queued_tasks()

if self.pool.sim_time_check(self.message_queue):
if (
self.pool.config.run_mode('simulation')
and sim_time_check(
self.message_queue, self.pool.get_tasks())
):
# A simulated task state change occurred.
self.reset_inactivity_timer()

Expand Down
219 changes: 219 additions & 0 deletions cylc/flow/simulation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Utilities supporting simulation and skip modes
"""

from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from time import time

from cylc.flow.cycling.loader import get_point
from cylc.flow.network.resolvers import TaskMsg
from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM
from cylc.flow.task_state import (
TASK_STATUS_RUNNING,
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
)
from cylc.flow.wallclock import get_current_time_string

from metomi.isodatetime.parsers import DurationParser

if TYPE_CHECKING:
from queue import Queue
from cylc.flow.cycling import PointBase
from cylc.flow.task_proxy import TaskProxy


def configure_sim_modes(taskdefs, sim_mode):
"""Adjust task defs for simulation and dummy mode.
"""
dummy_mode = bool(sim_mode == 'dummy')

for tdef in taskdefs:
# Compute simulated run time by scaling the execution limit.
rtc = tdef.rtconfig
sleep_sec = get_simulated_run_len(rtc)

rtc['execution time limit'] = (
sleep_sec + DurationParser().parse(str(
rtc['simulation']['time limit buffer'])).get_seconds()
)

rtc['simulation']['simulated run length'] = sleep_sec
rtc['submission retry delays'] = [1]

# Generate dummy scripting.
rtc['init-script'] = ""
rtc['env-script'] = ""
rtc['pre-script'] = ""
rtc['post-script'] = ""
rtc['script'] = build_dummy_script(
rtc, sleep_sec) if dummy_mode else ""

disable_platforms(rtc)

# Disable environment, in case it depends on env-script.
rtc['environment'] = {}

rtc["simulation"][
"fail cycle points"
] = parse_fail_cycle_points(
rtc["simulation"]["fail cycle points"]
)


def get_simulated_run_len(rtc: Dict[str, Any]) -> int:
"""Get simulated run time.
rtc = run time config
"""
limit = rtc['execution time limit']
speedup = rtc['simulation']['speedup factor']
if limit and speedup:
sleep_sec = (DurationParser().parse(
str(limit)).get_seconds() / speedup)
else:
sleep_sec = DurationParser().parse(
str(rtc['simulation']['default run length'])
).get_seconds()

return sleep_sec


def build_dummy_script(rtc: Dict[str, Any], sleep_sec: int) -> str:
"""Create fake scripting for dummy mode.
This is for Dummy mode only.
"""
script = "sleep %d" % sleep_sec
# Dummy message outputs.
for msg in rtc['outputs'].values():
script += "\ncylc message '%s'" % msg
if rtc['simulation']['fail try 1 only']:
arg1 = "true"
else:
arg1 = "false"
arg2 = " ".join(rtc['simulation']['fail cycle points'])
script += "\ncylc__job__dummy_result %s %s || exit 1" % (arg1, arg2)
return script


def disable_platforms(
rtc: Dict[str, Any]
) -> None:
"""Force platform = localhost
Remove legacy sections [job] and [remote], which would conflict
with setting platforms.
This can be simplified when support for the FORBIDDEN_WITH_PLATFORM
configurations is dropped.
"""
for section, keys in FORBIDDEN_WITH_PLATFORM.items():
if section in rtc:
for key in keys:
if key in rtc[section]:
rtc[section][key] = None
rtc['platform'] = 'localhost'


def parse_fail_cycle_points(
f_pts_orig: List[str]
) -> 'Union[None, List[PointBase]]':
"""Parse `[simulation][fail cycle points]`.
- None for "fail all points".
- Else a list of cycle point objects.
Examples:
>>> this = parse_fail_cycle_points
>>> this(['all']) is None
True
>>> this([])
[]
"""
f_pts: 'Optional[List[PointBase]]'
if 'all' in f_pts_orig:
f_pts = None
else:
f_pts = []
for point_str in f_pts_orig:
f_pts.append(get_point(point_str).standardise())
return f_pts


def sim_time_check(
message_queue: 'Queue[TaskMsg]', itasks: 'List[TaskProxy]'
) -> bool:
"""Check if sim tasks have been "running" for as long as required.
If they have change the task state.
Returns:
True if _any_ simulated task state has changed.
"""
sim_task_state_changed = False
now = time()
for itask in itasks:
if itask.state.status != TASK_STATUS_RUNNING:
continue
# Started time is not set on restart
if itask.summary['started_time'] is None:
itask.summary['started_time'] = now
timeout = (
itask.summary['started_time'] +
itask.tdef.rtconfig['simulation']['simulated run length']
)
if now > timeout:
job_d = itask.tokens.duplicate(job=str(itask.submit_num))
now_str = get_current_time_string()
if sim_task_failed(
itask.tdef.rtconfig['simulation'],
itask.point,
itask.get_try_num()
):
message_queue.put(
TaskMsg(job_d, now_str, 'CRITICAL', TASK_STATUS_FAILED)
)
else:
# Simulate message outputs.
for msg in itask.tdef.rtconfig['outputs'].values():
message_queue.put(
TaskMsg(job_d, now_str, 'DEBUG', msg)
)
message_queue.put(
TaskMsg(job_d, now_str, 'DEBUG', TASK_STATUS_SUCCEEDED)
)
sim_task_state_changed = True
return sim_task_state_changed


def sim_task_failed(
sim_conf: Dict[str, Any],
point: 'PointBase',
try_num: int,
) -> bool:
"""Encapsulate logic for deciding whether a sim task has failed.
Allows Unit testing.
"""
return (
sim_conf['fail cycle points'] is None # i.e. "all"
or point in sim_conf['fail cycle points']
) and (
try_num == 1 or not sim_conf['fail try 1 only']
)
Loading

0 comments on commit 8606f93

Please sign in to comment.