diff --git a/changes.d/6169.fix b/changes.d/6169.fix new file mode 100644 index 00000000000..d116ea1ba75 --- /dev/null +++ b/changes.d/6169.fix @@ -0,0 +1 @@ +Ensure that job failure is logged, even when the presence of retries causes the task not to change state. diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index 0a65baea1a9..7c3567448ef 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -744,7 +744,7 @@ def process_message( # Already failed. return True if self._process_message_failed( - itask, event_time, self.JOB_FAILED, forced + itask, event_time, self.JOB_FAILED, forced, message ): self.spawn_children(itask, TASK_OUTPUT_FAILED) @@ -795,7 +795,7 @@ def process_message( self.workflow_db_mgr.put_update_task_jobs( itask, {"run_signal": signal}) if self._process_message_failed( - itask, event_time, self.JOB_FAILED, forced + itask, event_time, self.JOB_FAILED, forced, message ): self.spawn_children(itask, TASK_OUTPUT_FAILED) @@ -812,7 +812,7 @@ def process_message( self.workflow_db_mgr.put_update_task_jobs( itask, {"run_signal": aborted_with}) if self._process_message_failed( - itask, event_time, aborted_with, forced + itask, event_time, aborted_with, forced, message ): self.spawn_children(itask, TASK_OUTPUT_FAILED) @@ -1297,10 +1297,17 @@ def _retry_task(self, itask, wallclock_time, submit_retry=False): if itask.state_reset(TASK_STATUS_WAITING): self.data_store_mgr.delta_task_state(itask) - def _process_message_failed(self, itask, event_time, message, forced): + def _process_message_failed( + self, itask, event_time, message, forced, full_message + ): """Helper for process_message, handle a failed message. Return True if no retries (hence go to the failed state). + + Args: + full_message: + If we have retries lined up we still tell users what + happened to cause this attempt to fail. 
""" no_retries = False if event_time is None: @@ -1332,7 +1339,10 @@ def _process_message_failed(self, itask, event_time, message, forced): timer = itask.try_timers[TimerFlags.EXECUTION_RETRY] self._retry_task(itask, timer.timeout) delay_msg = f"retrying in {timer.delay_timeout_as_str()}" - LOG.warning(f"[{itask}] {delay_msg}") + LOG.warning( + f'[{itask}] {full_message or self.EVENT_FAILED} - ' + f'{delay_msg}' + ) msg = f"{self.JOB_FAILED}, {delay_msg}" self.setup_event_handlers(itask, self.EVENT_RETRY, msg) self._reset_job_timers(itask) diff --git a/tests/integration/test_task_events_mgr.py b/tests/integration/test_task_events_mgr.py index 7ac12274d7b..6e996bced6d 100644 --- a/tests/integration/test_task_events_mgr.py +++ b/tests/integration/test_task_events_mgr.py @@ -14,7 +14,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from itertools import product import logging from typing import Any as Fixture @@ -22,9 +21,7 @@ from cylc.flow.scheduler import Scheduler from cylc.flow.data_store_mgr import ( JOBS, - TASK_STATUSES_ORDERED, TASK_STATUS_WAITING, - TASK_STATUS_SUBMIT_FAILED, ) @@ -170,3 +167,27 @@ async def test__always_insert_task_job( '1/broken/01': 'submit-failed', '1/broken2/01': 'submit-failed' } + + +async def test__process_message_failed_with_retry(one, start): + """Log job failure, even if a retry is scheduled. + + See: https://github.com/cylc/cylc-flow/pull/6169 + + """ + + async with start(one) as LOG: + fail_once = one.pool.get_tasks()[0] + # Add retry timers: + one.task_job_mgr._set_retry_timers( + fail_once, { + 'execution retry delays': [1], + 'submission retry delays': [1] + }) + + # Process failed message: + one.task_events_mgr._process_message_failed( + fail_once, None, 'failed', False, 'failed/OOK') + + # Check that failure reported: + assert 'failed/OOK' in LOG.messages[-1]