Skip to content

Commit

Permalink
Merge pull request #5706 from cylc/8.2.x-sync
Browse files Browse the repository at this point in the history
🤖 Merge 8.2.x-sync into master
  • Loading branch information
oliver-sanders authored Aug 29, 2023
2 parents 6120eda + d900bfc commit 801c1be
Show file tree
Hide file tree
Showing 14 changed files with 280 additions and 33 deletions.
3 changes: 3 additions & 0 deletions changes.d/5694.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Don't fail config file parsing if current working directory does not exist.
(Note however this may not be enough to prevent file parsing commands failing
elsewhere in the Python library).
1 change: 1 addition & 0 deletions changes.d/5704.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix off-by-one error in automatic upgrade of Cylc 7 "max active cycle points" to Cylc 8 "runahead limit".
1 change: 1 addition & 0 deletions changes.d/5708.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix runahead limit at start-up, with recurrences that start beyond the limit.
5 changes: 4 additions & 1 deletion cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1846,7 +1846,10 @@ def upg(cfg, descr):
'8.0.0',
['scheduling', 'max active cycle points'],
['scheduling', 'runahead limit'],
cvtr=converter(lambda x: f'P{x}' if x != '' else '', '"n" -> "Pn"'),
cvtr=converter(
lambda x: f'P{int(x)-1}' if x != '' else '',
'"{old}" -> "{new}"'
),
silent=cylc.flow.flags.cylc7_back_compat,
)
u.deprecate(
Expand Down
27 changes: 24 additions & 3 deletions cylc/flow/cycling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ class SequenceBase(metaclass=ABCMeta):
They should also provide get_async_expr, get_interval,
get_offset & set_offset (deprecated), is_on_sequence,
get_nearest_prev_point, get_next_point,
get_next_point_on_sequence, get_first_point, and
get_stop_point.
get_next_point_on_sequence, get_first_point,
get_start_point, and get_stop_point.
They should also provide a self.__eq__ implementation
which should return whether a SequenceBase-derived object
Expand Down Expand Up @@ -405,11 +405,32 @@ def get_first_point(self, point):
"""Return the first point >= to point, or None if out of bounds."""
pass

@abstractmethod
def get_start_point(self):
    """Return the first point of this sequence.

    Abstract: there is no implementation here, concrete sequence classes
    supply it.
    """

@abstractmethod
def get_stop_point(self):
"""Return the last point in this sequence, or None if unbounded."""
"""Return the last point of this sequence, or None if unbounded."""
pass

def get_first_n_points(self, n, point=None):
    """Return a list of up to the first n points of this sequence.

    Args:
        n: Number of points wanted.
        point: If given, begin at the first sequence point at or after
            this point (via get_first_point); otherwise begin at
            get_start_point().

    Returns:
        The points in sequence order. The list is shorter than n if the
        sequence runs out, and empty if there is no starting point. The
        starting point is always included when it exists, even if n < 1.
    """
    current = (
        self.get_start_point() if point is None
        else self.get_first_point(point)
    )
    collected = []
    if current is not None:
        collected.append(current)
        # The starting point is already in the list: walk n - 1 more steps.
        for _ in range(n - 1):
            current = self.get_next_point_on_sequence(current)
            if current is None:
                # Ran off the end of a bounded sequence.
                break
            collected.append(current)
    return collected

@abstractmethod
def __eq__(self, other) -> bool:
# Return True if other (sequence) is equal to self.
Expand Down
13 changes: 11 additions & 2 deletions cylc/flow/parsec/fileparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,14 @@ def read_and_proc(
fpath = _get_fpath_for_source(fpath, opts)
fdir = os.path.dirname(fpath)

odir = os.getcwd()
try:
original_cwd = os.getcwd()
except FileNotFoundError:
# User's current working directory does not actually exist, so we won't
# be able to change back to it later. (Note this might not be enough to
# prevent file parsing commands failing due to missing cwd elsewhere in
# the Python library).
original_cwd = None

# Move to the file location to give the template processor easy access to
# other files in the workflow directory (whether source or installed).
Expand Down Expand Up @@ -520,7 +527,9 @@ def read_and_proc(
if do_contin:
flines = _concatenate(flines)

os.chdir(odir)
# If the user's original working directory exists, change back to it.
if original_cwd is not None:
os.chdir(original_cwd)

# return rstripped lines
return [fl.rstrip() for fl in flines]
Expand Down
5 changes: 4 additions & 1 deletion cylc/flow/parsec/upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,10 @@ def upgrade(self):
if upg['new']:
msg += ' -> ' + self.show_keys(upg['new'],
upg['is_section'])
msg += " - " + upg['cvt'].describe()
msg += " - " + upg['cvt'].describe().format(
old=old,
new=upg['cvt'].convert(old)
)
if not upg['silent']:
warnings.setdefault(vn, [])
warnings[vn].append(msg)
Expand Down
78 changes: 54 additions & 24 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,18 +309,54 @@ def compute_runahead(self, force=False) -> bool:
With force=True we recompute the limit even if the base point has not
changed (needed if max_future_offset changed, or on reload).
"""

limit = self.config.runahead_limit # e.g. P2 or P2D
count_cycles = False
with suppress(TypeError):
# Count cycles (integer cycling, and optional for datetime too).
ilimit = int(limit) # type: ignore
count_cycles = True

base_point: 'PointBase'
points: List['PointBase'] = []
sequence_points: Set['PointBase']

if not self.main_pool:
# Start at first point in each sequence, after the initial point.
points = [
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
]
# No tasks yet, just consider sequence points.
if count_cycles:
# Get the first ilimit points in each sequence.
# (After workflow start point - sequence may begin earlier).
points = [
point
for plist in [
seq.get_first_n_points(
ilimit, self.config.start_point)
for seq in self.config.sequences
]
for point in plist
]
# Drop points beyond the limit.
points = sorted(points)[:ilimit + 1]
base_point = min(points)

else:
# Start at first point in each sequence.
# (After workflow start point - sequence may begin earlier).
points = [
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
]
base_point = min(points)
# Drop points beyond the limit.
points = [
point
for point in points
if point <= base_point + limit
]

else:
# Find the earliest point with unfinished tasks.
for point, itasks in sorted(self.get_tasks_by_point().items()):
Expand All @@ -344,9 +380,10 @@ def compute_runahead(self, force=False) -> bool:
)
):
points.append(point)
if not points:
return False
base_point = min(points)

if not points:
return False
base_point = min(points)

if self._prev_runahead_base_point is None:
self._prev_runahead_base_point = base_point
Expand All @@ -363,15 +400,8 @@ def compute_runahead(self, force=False) -> bool:
# change or the runahead limit is already at stop point.
return False

try:
limit = int(self.config.runahead_limit) # type: ignore
except TypeError:
count_cycles = False
limit = self.config.runahead_limit
else:
count_cycles = True

# Get all cycle points possible after the runahead base point.
# Get all cycle points possible after the base point.
sequence_points: Set['PointBase']
if (
not force
and self._prev_runahead_sequence_points
Expand All @@ -388,7 +418,7 @@ def compute_runahead(self, force=False) -> bool:
while seq_point is not None:
if count_cycles:
# P0 allows only the base cycle point to run.
if count > 1 + limit:
if count > 1 + ilimit:
break
else:
# PT0H allows only the base cycle point to run.
Expand All @@ -404,7 +434,7 @@ def compute_runahead(self, force=False) -> bool:

if count_cycles:
# Some sequences may have different intervals.
limit_point = sorted(points)[:(limit + 1)][-1]
limit_point = sorted(points)[:(ilimit + 1)][-1]
else:
# We already stopped at the runahead limit.
limit_point = sorted(points)[-1]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ WARNING - * (8.0.0) [runtime][foo, cat, dog][events]mail to -> [runtime][foo, c
WARNING - * (8.0.0) [runtime][foo, cat, dog][events]mail from -> [runtime][foo, cat, dog][mail]from - value unchanged
WARNING - * (8.0.0) [cylc][events]mail smtp - DELETED (OBSOLETE) - use "global.cylc[scheduler][mail]smtp" instead
WARNING - * (8.0.0) [runtime][foo, cat, dog][events]mail smtp - DELETED (OBSOLETE) - use "global.cylc[scheduler][mail]smtp" instead
WARNING - * (8.0.0) [scheduling]max active cycle points -> [scheduling]runahead limit - "n" -> "Pn"
WARNING - * (8.0.0) [scheduling]max active cycle points -> [scheduling]runahead limit - "2" -> "P1"
WARNING - * (8.0.0) [scheduling]hold after point -> [scheduling]hold after cycle point - value unchanged
WARNING - * (8.0.0) [runtime][foo, cat, dog][suite state polling] -> [runtime][foo, cat, dog][workflow state polling] - value unchanged
WARNING - * (8.0.0) [runtime][foo, cat, dog][job]execution polling intervals -> [runtime][foo, cat, dog]execution polling intervals - value unchanged
Expand Down
44 changes: 44 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from cylc.flow import CYLC_LOG
from cylc.flow.cycling import PointBase
from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.cycling.iso8601 import ISO8601Point
from cylc.flow.data_store_mgr import TASK_PROXIES
from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED
from cylc.flow.scheduler import Scheduler
Expand Down Expand Up @@ -62,6 +63,22 @@
}


# Datetime-cycling workflow config with a 'P3Y' runahead limit and two
# recurrences: 'P1Y' running from the initial cycle point (2001), and
# 'R/2025/P1Y', which only starts in 2025, i.e. later than 2001 + P3Y.
EXAMPLE_FLOW_2_CFG = {
'scheduler': {
'allow implicit tasks': True,
'UTC mode': True
},
'scheduling': {
'initial cycle point': '2001',
'runahead limit': 'P3Y',
'graph': {
'P1Y': 'foo',
'R/2025/P1Y': 'foo => bar',
}
},
}


def get_task_ids(
name_point_list: Iterable[Tuple[str, Union[PointBase, str, int]]]
) -> List[str]:
Expand Down Expand Up @@ -129,6 +146,22 @@ async def example_flow(
yield schd


@pytest.fixture(scope='module')
async def mod_example_flow_2(
    mod_flow: Callable, mod_scheduler: Callable, mod_run: Callable
) -> Scheduler:
    """Return a scheduler built from EXAMPLE_FLOW_2_CFG for pool inspection.

    The scheduler is created with paused_start=True and passed through the
    mod_run context before being returned.

    Being module-scoped, the same scheduler is shared between tests, so only
    use it where the test does not mutate the scheduler or task pool state.
    """
    workflow_id = mod_flow(EXAMPLE_FLOW_2_CFG)
    scheduler: Scheduler = mod_scheduler(workflow_id, paused_start=True)
    async with mod_run(scheduler):
        # Nothing to do inside the run context; the scheduler is handed to
        # the tests once the context has exited.
        pass
    return scheduler


@pytest.mark.parametrize(
'items, expected_task_ids, expected_bad_items, expected_warnings',
[
Expand Down Expand Up @@ -1157,3 +1190,14 @@ async def test_task_proxy_remove_from_queues(

assert queues_after['default'] == ['1/hidden_control']
assert queues_after['queue_two'] == ['1/control']


async def test_runahead_offset_start(
    mod_example_flow_2: Scheduler
) -> None:
    """Check the start-up runahead limit with a late-starting recurrence.

    A recurrence that begins beyond the runahead limit should not break the
    limit computed at start-up.

    See GitHub #5708
    """
    expected_limit_point = ISO8601Point('2004')
    assert (
        mod_example_flow_2.pool.runahead_limit_point == expected_limit_point
    )
Loading

0 comments on commit 801c1be

Please sign in to comment.