Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Isolate ICP and FCP graphs using the runahead limit #5036

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ ones in. -->

### Enhancements

[#5036](https://github.com/cylc/cylc-flow/pull/5036) - optional isolation of
Copy link
Member

@dwsutherland dwsutherland Aug 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capitalise "optional" (and 5032 below it)?

initial and final cycle point graphs, to simplify use of workflow preparation
and clean-up tasks.

[#5032](https://github.com/cylc/cylc-flow/pull/5032) - set a default limit of
100 for the "default" queue.


-------------------------------------------------------------------------------
## __cylc-8.0.0 (<span actions:bind='release-date'>Released 2022-07-28</span>)__

Expand Down
16 changes: 16 additions & 0 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,22 @@ def get_script_common_text(this: str, example: Optional[str] = None):
:ref:`Recurrence tutorial <tutorial-inferred-recurrence>`.

''')
Conf('isolate initial cycle point', VDR.V_BOOLEAN, default=False,
desc='''
Don't run any later-cycle tasks until the first cycle is finished.
If True, tasks in later cycles do not need to depend explitly on
initial-cycle workflow preparation tasks.

.. versionadded:: 8.0.0
''')
Conf('isolate final cycle point', VDR.V_BOOLEAN, default=False,
desc='''
Don't run any final-cycle tasks until all previous cycles have
finished. If True, final cycle clean-up tasks do not need to
depend explicitly on tasks in earlier cycles.

.. versionadded:: 8.0.0
''')
Conf('hold after cycle point', VDR.V_CYCLE_POINT, desc=f'''
Hold all tasks that pass this cycle point.

Expand Down
27 changes: 27 additions & 0 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ def __init__(
self.task_param_vars = {} # type: ignore # TODO figure out type
self.runahead_limit: Optional['IntervalBase'] = None

self.second_to_last_point = None

# runtime hierarchy dicts keyed by namespace name:
self.runtime: Dict[str, dict] = { # TODO figure out type
# lists of parent namespaces
Expand Down Expand Up @@ -462,6 +464,18 @@ def __init__(

self.process_runahead_limit()

if self.cfg['scheduling']['isolate final cycle point']:
if self.final_point is None:
LOG.warning(
"Ignoring 'isolate final cycle point = True'"
" (this workflow has no final cycle point)."
)
else:
self.second_to_last_point = (
self.get_second_to_last_point(
self.sequences, self.final_point)
)

if self.run_mode('simulation', 'dummy'):
self.configure_sim_modes()

Expand Down Expand Up @@ -767,6 +781,19 @@ def process_stop_cycle_point(self) -> None:
stopcp_str = str(self.stop_point) if self.stop_point else None
self.cfg['scheduling']['stop after cycle point'] = stopcp_str

@staticmethod
def get_second_to_last_point(
sequences: List['SequenceBase'],
final_point: 'PointBase'
) -> 'PointBase':
"""Check each sequence to find the second-to-last cycle point."""
prev_points = set()
for sequence in sequences:
seq_point = sequence.get_prev_point(final_point)
if seq_point is not None:
prev_points.add(seq_point)
return max(prev_points)

def _check_implicit_tasks(self) -> None:
"""Raise WorkflowConfigError if implicit tasks are found in graph or
queue config, unless allowed by config."""
Expand Down
29 changes: 24 additions & 5 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,6 @@ def compute_runahead(self, force=False) -> bool:
if not points:
return False
base_point = min(points)

if self._prev_runahead_base_point is None:
self._prev_runahead_base_point = base_point

Expand Down Expand Up @@ -387,16 +386,36 @@ def compute_runahead(self, force=False) -> bool:
limit_point = sorted(points)[-1]

# Adjust for future offset and stop point, if necessary.
pre_adj_limit = limit_point
orig_limit = limit_point
if self.max_future_offset is not None:
limit_point += self.max_future_offset
LOG.debug(f"{pre_adj_limit} -> {limit_point} (future offset)")
LOG.debug(f"{orig_limit} -> {limit_point} (future offset)")
if self.stop_point and limit_point > self.stop_point:
limit_point = self.stop_point
LOG.debug(f"{pre_adj_limit} -> {limit_point} (stop point)")
LOG.info(f"Runahead limit: {limit_point}")
LOG.debug(f"{orig_limit} -> {limit_point} (stop point)")

orig_limit = limit_point
if (
self.config.cfg["scheduling"]["isolate initial cycle point"]
and base_point <= self.config.initial_point
and limit_point > self.config.initial_point
):
limit_point = self.config.initial_point
LOG.debug(
f"{orig_limit} -> {limit_point} (isolate initial point)")

orig_limit = limit_point
if (
self.config.second_to_last_point is not None
and base_point <= self.config.second_to_last_point
and limit_point > self.config.second_to_last_point
):
limit_point = self.config.second_to_last_point
LOG.debug(f"{orig_limit} -> {limit_point} (isolate final point)")

LOG.info(f"Runahead limit: {limit_point}")
self.runahead_limit_point = limit_point

return True

def update_flow_mgr(self):
Expand Down
2 changes: 2 additions & 0 deletions tests/functional/cylc-config/00-simple/section1.stdout
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ initial cycle point = 1
final cycle point = 1
initial cycle point constraints =
final cycle point constraints =
isolate initial cycle point = False
isolate final cycle point = False
hold after cycle point =
stop after cycle point =
cycling mode = integer
Expand Down
22 changes: 22 additions & 0 deletions tests/functional/isolate-icp-fcp/00-simple.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
# Test optinoal isolation of initial and final cycle point graphs.
. "$(dirname "$0")/test_header"
set_test_number 2
reftest
exit
30 changes: 30 additions & 0 deletions tests/functional/isolate-icp-fcp/00-simple/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Test optional isolation of initial and final cycle point graphs.
[scheduler]
[[events]]
stall timeout = PT0S
abort on stall timeout = True
[scheduling]
cycling mode = integer
final cycle point = 3
isolate initial cycle point = True
isolate final cycle point = True
[[graph]]
R1 = "prep => foo"
P1 = "foo"
R1/$ = "foo => clean"
[runtime]
[[prep]]
script = "sleep 5"
[[foo]]
script = """
# No foo instance should start before prep is finished.
grep -s "1/prep running .* succeeded" "${CYLC_WORKFLOW_LOG_DIR}/log"
if ((CYLC_TASK_CYCLE_POINT == 1)); then
sleep 5
fi
"""
[[clean]]
script = """
# Clean should not run before 1/foo is finished.
grep -s "1/foo running .* succeeded" "${CYLC_WORKFLOW_LOG_DIR}/log"
"""
5 changes: 5 additions & 0 deletions tests/functional/isolate-icp-fcp/00-simple/reference.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
1/prep -triggered off [] in flow 1
1/foo -triggered off ['1/prep'] in flow 1
2/foo -triggered off [] in flow 1
3/foo -triggered off [] in flow 1
3/clean -triggered off ['3/foo'] in flow 1
1 change: 1 addition & 0 deletions tests/functional/isolate-icp-fcp/test_header
47 changes: 45 additions & 2 deletions tests/unit/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from optparse import Values
from typing import Any, Callable, Dict, Optional, Tuple, Type
from typing import Any, Callable, Dict, Optional, Tuple, Type, List
from pathlib import Path
import pytest
import logging
Expand All @@ -24,8 +24,9 @@

from cylc.flow import CYLC_LOG
from cylc.flow.config import WorkflowConfig
from cylc.flow.cycling import loader
from cylc.flow.cycling import loader, PointBase, SequenceBase
from cylc.flow.cycling.loader import INTEGER_CYCLING_TYPE, ISO8601_CYCLING_TYPE
from cylc.flow.cycling.integer import IntegerSequence, IntegerPoint
from cylc.flow.exceptions import (
PointParsingError,
InputError,
Expand Down Expand Up @@ -1439,3 +1440,45 @@ def test_check_for_owner(runtime_cfg):
else:
# Assert function doesn't raise if no owner set:
assert WorkflowConfig.check_for_owner(runtime_cfg) is None


@pytest.mark.parametrize(
('sequences', 'final_point', 'expected_point'),
[
pytest.param(
[
IntegerSequence('R1', 1, 5), # 1
IntegerSequence('P1', 1, 5), # 1,2,3,4,5
IntegerSequence('R/P1!4', 1, 5) # 1,2,3,5
],
IntegerPoint(5),
IntegerPoint(4)
),
pytest.param(
[
IntegerSequence('R1', 1, 5), # 1
IntegerSequence('R/P1!4', 1, 5) # 1,2,3,5
],
IntegerPoint(5),
IntegerPoint(3)
),
pytest.param(
[
IntegerSequence('R//P2', 1, 5), # 1,3,5
IntegerSequence('R2/P1/5', 1, 5), # 4,5
],
IntegerPoint(5),
IntegerPoint(4)
)
]
)
def test_get_second_to_last_point(
sequences: List[SequenceBase],
final_point: PointBase,
expected_point: PointBase
) -> None:
assert (
WorkflowConfig.get_second_to_last_point(
sequences, final_point
) == expected_point
)