Skip to content

Commit

Permalink
Merge pull request #5959 from oliver-sanders/test-workflow-events
Browse files Browse the repository at this point in the history
workflow events: fix an issue where "timeout" events would not fire
  • Loading branch information
oliver-sanders authored Mar 25, 2024
2 parents 2678b53 + eb76ce6 commit ce25a81
Show file tree
Hide file tree
Showing 4 changed files with 303 additions and 115 deletions.
1 change: 1 addition & 0 deletions changes.d/5959.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix an issue where workflow "timeout" events were not fired in all situations when they should have been.
227 changes: 113 additions & 114 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,8 @@ async def run_scheduler(self) -> None:
# Non-async sleep - yield to other threads rather than event loop
sleep(0)
self.profiler.start()
await self.main_loop()
while True: # MAIN LOOP
await self._main_loop()

except SchedulerStop as exc:
# deliberate stop
Expand Down Expand Up @@ -1697,142 +1698,141 @@ def update_profiler_logs(self, tinit):
self.count, get_current_time_string()))
self.count += 1

async def main_loop(self) -> None:
"""The scheduler main loop."""
while True: # MAIN LOOP
tinit = time()
async def _main_loop(self) -> None:
"""A single iteration of the main loop."""
tinit = time()

# Useful for debugging core scheduler issues:
# self.pool.log_task_pool(logging.CRITICAL)
if self.incomplete_ri_map:
self.manage_remote_init()
# Useful for debugging core scheduler issues:
# self.pool.log_task_pool(logging.CRITICAL)
if self.incomplete_ri_map:
self.manage_remote_init()

Check warning on line 1708 in cylc/flow/scheduler.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scheduler.py#L1708

Added line #L1708 was not covered by tests

await self.process_command_queue()
self.proc_pool.process()
await self.process_command_queue()
self.proc_pool.process()

# Unqueued tasks with satisfied prerequisites must be waiting on
# xtriggers or ext_triggers. Check these and queue tasks if ready.
for itask in self.pool.get_tasks():
if (
not itask.state(TASK_STATUS_WAITING)
or itask.state.is_queued
or itask.state.is_runahead
):
continue
# Unqueued tasks with satisfied prerequisites must be waiting on
# xtriggers or ext_triggers. Check these and queue tasks if ready.
for itask in self.pool.get_tasks():
if (
not itask.state(TASK_STATUS_WAITING)
or itask.state.is_queued
or itask.state.is_runahead
):
continue

if (
itask.state.xtriggers
and not itask.state.xtriggers_all_satisfied()
):
self.xtrigger_mgr.call_xtriggers_async(itask)
if (
itask.state.xtriggers
and not itask.state.xtriggers_all_satisfied()
):
self.xtrigger_mgr.call_xtriggers_async(itask)

if (
itask.state.external_triggers
and not itask.state.external_triggers_all_satisfied()
):
self.broadcast_mgr.check_ext_triggers(
itask, self.ext_trigger_queue)
if (
itask.state.external_triggers
and not itask.state.external_triggers_all_satisfied()
):
self.broadcast_mgr.check_ext_triggers(

Check warning on line 1733 in cylc/flow/scheduler.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scheduler.py#L1733

Added line #L1733 was not covered by tests
itask, self.ext_trigger_queue)

if all(itask.is_ready_to_run()):
self.pool.queue_task(itask)
if all(itask.is_ready_to_run()):
self.pool.queue_task(itask)

if self.xtrigger_mgr.do_housekeeping:
self.xtrigger_mgr.housekeep(self.pool.get_tasks())
if self.xtrigger_mgr.do_housekeeping:
self.xtrigger_mgr.housekeep(self.pool.get_tasks())

self.pool.set_expired_tasks()
self.release_queued_tasks()
self.pool.set_expired_tasks()
self.release_queued_tasks()

if self.pool.sim_time_check(self.message_queue):
# A simulated task state change occurred.
self.reset_inactivity_timer()
if self.pool.sim_time_check(self.message_queue):
# A simulated task state change occurred.
self.reset_inactivity_timer()

self.broadcast_mgr.expire_broadcast(self.pool.get_min_point())
self.late_tasks_check()
self.broadcast_mgr.expire_broadcast(self.pool.get_min_point())
self.late_tasks_check()

self.process_queued_task_messages()
await self.process_command_queue()
self.task_events_mgr.process_events(self)
self.process_queued_task_messages()
await self.process_command_queue()
self.task_events_mgr.process_events(self)

# Update state summary, database, and uifeed
self.workflow_db_mgr.put_task_event_timers(self.task_events_mgr)
# Update state summary, database, and uifeed
self.workflow_db_mgr.put_task_event_timers(self.task_events_mgr)

# List of task whose states have changed.
updated_task_list = [
t for t in self.pool.get_tasks() if t.state.is_updated]
has_updated = updated_task_list or self.is_updated
# List of task whose states have changed.
updated_task_list = [
t for t in self.pool.get_tasks() if t.state.is_updated]
has_updated = updated_task_list or self.is_updated

if updated_task_list and self.is_restart_timeout_wait:
# Stop restart timeout if action has been triggered.
with suppress(KeyError):
self.timers[self.EVENT_RESTART_TIMEOUT].stop()
self.is_restart_timeout_wait = False

if has_updated or self.data_store_mgr.updates_pending:
# Update the datastore.
await self.update_data_structure()
if updated_task_list and self.is_restart_timeout_wait:
# Stop restart timeout if action has been triggered.
with suppress(KeyError):
self.timers[self.EVENT_RESTART_TIMEOUT].stop()
self.is_restart_timeout_wait = False

if has_updated:
if not self.is_reloaded:
# (A reload cannot un-stall workflow by itself)
self.is_stalled = False
self.is_reloaded = False
if has_updated or self.data_store_mgr.updates_pending:
# Update the datastore.
await self.update_data_structure()

# Reset workflow and task updated flags.
self.is_updated = False
for itask in updated_task_list:
itask.state.is_updated = False
if has_updated:
if not self.is_reloaded:
# (A reload cannot un-stall workflow by itself)
self.is_stalled = False
self.is_reloaded = False

if not self.is_stalled:
# Stop the stalled timer.
with suppress(KeyError):
self.timers[self.EVENT_STALL_TIMEOUT].stop()
# Reset workflow and task updated flags.
self.is_updated = False
for itask in updated_task_list:
itask.state.is_updated = False

self.process_workflow_db_queue()
if not self.is_stalled:
# Stop the stalled timer.
with suppress(KeyError):
self.timers[self.EVENT_STALL_TIMEOUT].stop()

# If public database is stuck, blast it away by copying the content
# of the private database into it.
self.database_health_check()
self.process_workflow_db_queue()

# Shutdown workflow if timeouts have occurred
self.timeout_check()
# If public database is stuck, blast it away by copying the content
# of the private database into it.
self.database_health_check()

# Does the workflow need to shutdown on task failure?
await self.workflow_shutdown()
# Shutdown workflow if timeouts have occurred
self.timeout_check()

if self.options.profile_mode:
self.update_profiler_logs(tinit)
# Does the workflow need to shutdown on task failure?
await self.workflow_shutdown()

# Run plugin functions
await asyncio.gather(
*main_loop.get_runners(
self.main_loop_plugins,
main_loop.CoroTypes.Periodic,
self
)
if self.options.profile_mode:
self.update_profiler_logs(tinit)

Check warning on line 1803 in cylc/flow/scheduler.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scheduler.py#L1803

Added line #L1803 was not covered by tests

# Run plugin functions
await asyncio.gather(
*main_loop.get_runners(
self.main_loop_plugins,
main_loop.CoroTypes.Periodic,
self
)
)

if not has_updated and not self.stop_mode:
# Has the workflow stalled?
self.check_workflow_stalled()

# Sleep a bit for things to catch up.
# Quick sleep if there are items pending in process pool.
# (Should probably use quick sleep logic for other queues?)
elapsed = time() - tinit
quick_mode = self.proc_pool.is_not_done()
if (elapsed >= self.INTERVAL_MAIN_LOOP or
quick_mode and elapsed >= self.INTERVAL_MAIN_LOOP_QUICK):
# Main loop has taken quite a bit to get through
# Still yield control to other threads by sleep(0.0)
duration: float = 0
elif quick_mode:
duration = self.INTERVAL_MAIN_LOOP_QUICK - elapsed
else:
duration = self.INTERVAL_MAIN_LOOP - elapsed
await asyncio.sleep(duration)
# Record latest main loop interval
self.main_loop_intervals.append(time() - tinit)
# END MAIN LOOP
if not has_updated and not self.stop_mode:
# Has the workflow stalled?
self.check_workflow_stalled()

# Sleep a bit for things to catch up.
# Quick sleep if there are items pending in process pool.
# (Should probably use quick sleep logic for other queues?)
elapsed = time() - tinit
quick_mode = self.proc_pool.is_not_done()
if (elapsed >= self.INTERVAL_MAIN_LOOP or
quick_mode and elapsed >= self.INTERVAL_MAIN_LOOP_QUICK):
# Main loop has taken quite a bit to get through
# Still yield control to other threads by sleep(0.0)
duration: float = 0

Check warning on line 1827 in cylc/flow/scheduler.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scheduler.py#L1827

Added line #L1827 was not covered by tests
elif quick_mode:
duration = self.INTERVAL_MAIN_LOOP_QUICK - elapsed
else:
duration = self.INTERVAL_MAIN_LOOP - elapsed
await asyncio.sleep(duration)
# Record latest main loop interval
self.main_loop_intervals.append(time() - tinit)
# END MAIN LOOP

def _update_workflow_state(self):
"""Update workflow state in the data store and push out any deltas.
Expand Down Expand Up @@ -1871,12 +1871,11 @@ def check_workflow_timers(self):
for event, timer in self.timers.items():
if not timer.timed_out():
continue
self.run_event_handlers(event)
abort_conf = f"abort on {event}"
if self._get_events_conf(abort_conf):
# "cylc play" needs to exit with error status here.
raise SchedulerError(f'"{abort_conf}" is set')
if self._get_events_conf(f"{event} handlers") is not None:
self.run_event_handlers(event)
if event == self.EVENT_RESTART_TIMEOUT:
# Unset wait flag to allow normal shutdown.
self.is_restart_timeout_wait = False
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class MyException(Exception):
def killer():
    """Stand-in main-loop that immediately raises ``MyException``.

    Assigned over ``one._main_loop`` in the test below so that the error
    propagates out of the scheduler's run loop; the test then checks (via
    ``pytest.raises(MyException)``) that this causes the workflow to shut
    down rather than being swallowed.
    """
    raise MyException('mess')

one.main_loop = killer
one._main_loop = killer

# make sure that this error causes the flow to shutdown
with pytest.raises(MyException):
Expand Down
Loading

0 comments on commit ce25a81

Please sign in to comment.