From 13413550ee66dd1921cc528a53fa8867114d34e8 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Tue, 25 Jun 2024 14:53:34 +0100 Subject: [PATCH] Log job failure even when it does not cause a change in task state. Added fuller message Added test --- changes.d/fix.6169.md | 1 + cylc/flow/task_events_mgr.py | 21 ++++-- tests/integration/test_task_events_mgr.py | 79 ++++++----------------- 3 files changed, 35 insertions(+), 66 deletions(-) create mode 100644 changes.d/fix.6169.md diff --git a/changes.d/fix.6169.md b/changes.d/fix.6169.md new file mode 100644 index 00000000000..d116ea1ba75 --- /dev/null +++ b/changes.d/fix.6169.md @@ -0,0 +1 @@ +Ensure that job failure is logged, even when the presence of retries causes the task not to change state. diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index bf9c2ba3a9b..07a7ffa67cb 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -742,7 +742,7 @@ def process_message( # Already failed. 
return True if self._process_message_failed( - itask, event_time, self.JOB_FAILED, forced + itask, event_time, self.JOB_FAILED, forced, message ): self.spawn_children(itask, TASK_OUTPUT_FAILED) @@ -790,7 +790,7 @@ def process_message( self.workflow_db_mgr.put_update_task_jobs( itask, {"run_signal": signal}) if self._process_message_failed( - itask, event_time, self.JOB_FAILED, forced + itask, event_time, self.JOB_FAILED, forced, message ): self.spawn_children(itask, TASK_OUTPUT_FAILED) @@ -807,7 +807,7 @@ def process_message( self.workflow_db_mgr.put_update_task_jobs( itask, {"run_signal": aborted_with}) if self._process_message_failed( - itask, event_time, aborted_with, forced + itask, event_time, aborted_with, forced, message ): self.spawn_children(itask, TASK_OUTPUT_FAILED) @@ -1292,10 +1292,17 @@ def _retry_task(self, itask, wallclock_time, submit_retry=False): if itask.state_reset(TASK_STATUS_WAITING): self.data_store_mgr.delta_task_state(itask) - def _process_message_failed(self, itask, event_time, message, forced): + def _process_message_failed( + self, itask, event_time, message, forced, full_message + ): """Helper for process_message, handle a failed message. Return True if no retries (hence go to the failed state). + + Args: + full_message: + If we have retries lined up we still tell users what + happened to cause this attempt to fail. """ no_retries = False if event_time is None: @@ -1325,9 +1332,11 @@ def _process_message_failed(self, itask, event_time, message, forced): else: # There is an execution retry lined up. 
timer = itask.try_timers[TimerFlags.EXECUTION_RETRY] - self._retry_task(itask, timer.timeout) delay_msg = f"retrying in {timer.delay_timeout_as_str()}" - LOG.warning(f"[{itask}] {delay_msg}") + LOG.warning( + f'[{itask}] => {TASK_OUTPUT_FAILED} with {full_message}' + f' \n({delay_msg})') + self._retry_task(itask, timer.timeout) msg = f"{self.JOB_FAILED}, {delay_msg}" self.setup_event_handlers(itask, self.EVENT_RETRY, msg) self._reset_job_timers(itask) diff --git a/tests/integration/test_task_events_mgr.py b/tests/integration/test_task_events_mgr.py index 7ac12274d7b..f07476fed86 100644 --- a/tests/integration/test_task_events_mgr.py +++ b/tests/integration/test_task_events_mgr.py @@ -108,65 +108,24 @@ async def test__insert_task_job(flow, one_conf, scheduler, start, validate): ] == [1, 2] -async def test__always_insert_task_job( - flow, scheduler, mock_glbl_cfg, start, run -): - """Insert Task Job _Always_ inserts a task into the data store. - - Bug https://github.com/cylc/cylc-flow/issues/6172 was caused - by passing task state to data_store_mgr.insert_job: Where - a submission retry was in progress the task state would be - "waiting" which caused the data_store_mgr.insert_job - to return without adding the task to the data store. - This is testing two different cases: +async def test__process_message_failed_with_retry(one, start): + """Log job failure, even if a retry is scheduled. 
- * Could not select host from platform - * Could not select host from platform group - """ - global_config = """ - [platforms] - [[broken1]] - hosts = no-such-host-1 - [[broken2]] - hosts = no-such-host-2 - [platform groups] - [[broken]] - platforms = broken1 + See: https://github.com/cylc/cylc-flow/pull/6169 """ - mock_glbl_cfg('cylc.flow.platforms.glbl_cfg', global_config) - - id_ = flow({ - 'scheduling': {'graph': {'R1': 'broken & broken2'}}, - 'runtime': { - 'root': {'submission retry delays': 'PT10M'}, - 'broken': {'platform': 'broken'}, - 'broken2': {'platform': 'broken2'} - } - }) - - schd = scheduler(id_, run_mode='live') - schd.bad_hosts = {'no-such-host-1', 'no-such-host-2'} - async with start(schd): - schd.task_job_mgr.submit_task_jobs( - schd.workflow, - schd.pool.get_tasks(), - schd.server.curve_auth, - schd.server.client_pub_key_dir, - is_simulation=False - ) - - # Both tasks are in a waiting state: - assert all( - i.state.status == TASK_STATUS_WAITING - for i in schd.pool.get_tasks()) - - # Both tasks have updated the data store with info - # about a failed job: - updates = { - k.split('//')[-1]: v.state - for k, v in schd.data_store_mgr.updated[JOBS].items() - } - assert updates == { - '1/broken/01': 'submit-failed', - '1/broken2/01': 'submit-failed' - } + async with start(one) as LOG: + fail_once = one.pool.get_tasks()[0] + + # Add retry timers: + one.task_job_mgr._set_retry_timers( + fail_once, { + 'execution retry delays': [1], + 'submission retry delays': [1] + }) + + # Process failed message: + one.task_events_mgr._process_message_failed( + fail_once, None, 'failed', False, 'failed/OOK') + + # Check that failure reported: + assert 'failed with failed/OOK' in LOG.messages[-1]