Skip to content

Commit

Permalink
Log job failure even when it doesn't cause a change in task state
Browse files Browse the repository at this point in the history
(i.e. when there is a retry set up).

Add newline to changelog entry
  • Loading branch information
Tim Pillinger authored and wxtim committed Oct 9, 2024
1 parent e7742b1 commit ee0770f
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 8 deletions.
1 change: 1 addition & 0 deletions changes.d/6169.fix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ensure that job failure is logged, even when the presence of retries causes the task not to change state.
20 changes: 15 additions & 5 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ def process_message(
# Already failed.
return True
if self._process_message_failed(
itask, event_time, self.JOB_FAILED, forced
itask, event_time, self.JOB_FAILED, forced, message
):
self.spawn_children(itask, TASK_OUTPUT_FAILED)

Expand Down Expand Up @@ -795,7 +795,7 @@ def process_message(
self.workflow_db_mgr.put_update_task_jobs(
itask, {"run_signal": signal})
if self._process_message_failed(
itask, event_time, self.JOB_FAILED, forced
itask, event_time, self.JOB_FAILED, forced, message
):
self.spawn_children(itask, TASK_OUTPUT_FAILED)

Expand All @@ -812,7 +812,7 @@ def process_message(
self.workflow_db_mgr.put_update_task_jobs(
itask, {"run_signal": aborted_with})
if self._process_message_failed(
itask, event_time, aborted_with, forced
itask, event_time, aborted_with, forced, message
):
self.spawn_children(itask, TASK_OUTPUT_FAILED)

Expand Down Expand Up @@ -1297,10 +1297,17 @@ def _retry_task(self, itask, wallclock_time, submit_retry=False):
if itask.state_reset(TASK_STATUS_WAITING):
self.data_store_mgr.delta_task_state(itask)

def _process_message_failed(self, itask, event_time, message, forced):
def _process_message_failed(
self, itask, event_time, message, forced, full_message
):
"""Helper for process_message, handle a failed message.
Return True if no retries (hence go to the failed state).
Args:
full_message:
If we have retries lined up we still tell users what
happened to cause the this attempt to fail.
"""
no_retries = False
if event_time is None:
Expand Down Expand Up @@ -1332,7 +1339,10 @@ def _process_message_failed(self, itask, event_time, message, forced):
timer = itask.try_timers[TimerFlags.EXECUTION_RETRY]
self._retry_task(itask, timer.timeout)
delay_msg = f"retrying in {timer.delay_timeout_as_str()}"
LOG.warning(f"[{itask}] {delay_msg}")
LOG.warning(
f'[{itask}] {full_message or self.EVENT_FAILED} - '
f'{delay_msg}'
)
msg = f"{self.JOB_FAILED}, {delay_msg}"
self.setup_event_handlers(itask, self.EVENT_RETRY, msg)
self._reset_job_timers(itask)
Expand Down
27 changes: 24 additions & 3 deletions tests/integration/test_task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from itertools import product
import logging
from typing import Any as Fixture

from cylc.flow.task_events_mgr import TaskJobLogsRetrieveContext
from cylc.flow.scheduler import Scheduler
from cylc.flow.data_store_mgr import (
JOBS,
TASK_STATUSES_ORDERED,
TASK_STATUS_WAITING,
TASK_STATUS_SUBMIT_FAILED,
)


Expand Down Expand Up @@ -170,3 +167,27 @@ async def test__always_insert_task_job(
'1/broken/01': 'submit-failed',
'1/broken2/01': 'submit-failed'
}


async def test__process_message_failed_with_retry(one, start):
"""Log job failure, even if a retry is scheduled.
See: https://github.com/cylc/cylc-flow/pull/6169
"""

async with start(one) as LOG:
fail_once = one.pool.get_tasks()[0]
# Add retry timers:
one.task_job_mgr._set_retry_timers(
fail_once, {
'execution retry delays': [1],
'submission retry delays': [1]
})

# Process failed message:
one.task_events_mgr._process_message_failed(
fail_once, None, 'failed', False, 'failed/OOK')

# Check that failure reported:
assert 'failed/OOK' in LOG.messages[-1]

0 comments on commit ee0770f

Please sign in to comment.