Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log job failure even when there are retries configured #6169

Open
wants to merge 13 commits into
base: 8.4.x
Choose a base branch
from
1 change: 1 addition & 0 deletions changes.d/6169.fix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ensure that job submit/failure is logged, even when retries are planned.
29 changes: 22 additions & 7 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ def process_message(
# Already failed.
return True
if self._process_message_failed(
itask, event_time, self.JOB_FAILED, forced
itask, event_time, self.JOB_FAILED, forced, message
wxtim marked this conversation as resolved.
Show resolved Hide resolved
):
self.spawn_children(itask, TASK_OUTPUT_FAILED)

Expand Down Expand Up @@ -795,7 +795,7 @@ def process_message(
self.workflow_db_mgr.put_update_task_jobs(
itask, {"run_signal": signal})
if self._process_message_failed(
itask, event_time, self.JOB_FAILED, forced
itask, event_time, self.JOB_FAILED, forced, message
):
wxtim marked this conversation as resolved.
Show resolved Hide resolved
self.spawn_children(itask, TASK_OUTPUT_FAILED)

Expand All @@ -812,7 +812,7 @@ def process_message(
self.workflow_db_mgr.put_update_task_jobs(
itask, {"run_signal": aborted_with})
if self._process_message_failed(
itask, event_time, aborted_with, forced
itask, event_time, aborted_with, forced, message
):
self.spawn_children(itask, TASK_OUTPUT_FAILED)

Expand Down Expand Up @@ -928,11 +928,15 @@ def _process_message_check(
return False

severity_lvl: int = LOG_LEVELS.get(severity, INFO)
# Don't log submit/failure messages here:
if flag != self.FLAG_POLLED and message in {
self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR'
}:
return True
# Demote log level to DEBUG if this is a message that duplicates what
# gets logged by itask state change anyway (and not manual poll)
if severity_lvl > DEBUG and flag != self.FLAG_POLLED and message in {
self.EVENT_SUBMITTED, self.EVENT_STARTED, self.EVENT_SUCCEEDED,
self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR'
}:
severity_lvl = DEBUG
LOG.log(severity_lvl, f"[{itask}] {flag}{message}{timestamp}")
Expand Down Expand Up @@ -1297,10 +1301,17 @@ def _retry_task(self, itask, wallclock_time, submit_retry=False):
if itask.state_reset(TASK_STATUS_WAITING):
self.data_store_mgr.delta_task_state(itask)

def _process_message_failed(self, itask, event_time, message, forced):
def _process_message_failed(
self, itask, event_time, message, forced, full_message
):
"""Helper for process_message, handle a failed message.

Return True if no retries (hence go to the failed state).

Args:
full_message:
If we have retries lined up we still tell users what
            happened to cause this attempt to fail.
"""
no_retries = False
if event_time is None:
Expand All @@ -1313,6 +1324,7 @@ def _process_message_failed(self, itask, event_time, message, forced):
"run_status": 1,
"time_run_exit": event_time,
})
LOG.error(f'[{itask}] {full_message or self.EVENT_FAILED}')
if (
forced
or TimerFlags.EXECUTION_RETRY not in itask.try_timers
Expand All @@ -1332,7 +1344,10 @@ def _process_message_failed(self, itask, event_time, message, forced):
timer = itask.try_timers[TimerFlags.EXECUTION_RETRY]
self._retry_task(itask, timer.timeout)
delay_msg = f"retrying in {timer.delay_timeout_as_str()}"
LOG.warning(f"[{itask}] {delay_msg}")
LOG.warning(
f'[{itask}] {full_message or self.EVENT_FAILED} - '
f'{delay_msg}'
)
msg = f"{self.JOB_FAILED}, {delay_msg}"
self.setup_event_handlers(itask, self.EVENT_RETRY, msg)
self._reset_job_timers(itask)
Expand Down Expand Up @@ -1404,14 +1419,14 @@ def _process_message_submit_failed(
Return True if no retries (hence go to the submit-failed state).
"""
no_retries = False
LOG.critical(f"[{itask}] {self.EVENT_SUBMIT_FAILED}")
if event_time is None:
event_time = get_current_time_string()
self.workflow_db_mgr.put_update_task_jobs(itask, {
"time_submit_exit": event_time,
"submit_status": 1,
})
itask.summary['submit_method_id'] = None
LOG.error(f"[{itask}] {self.EVENT_SUBMIT_FAILED}")
if (
forced
or TimerFlags.SUBMISSION_RETRY not in itask.try_timers
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/cylc-remove/00-simple/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
script = false
[[cleaner]]
script = """
cylc__job__poll_grep_workflow_log -E '1/b/01:running.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '1/b/01.* failed'
# Remove the unhandled failed task
cylc remove "$CYLC_WORKFLOW_ID//1/b"
# Remove waiting 1/c
Expand Down
4 changes: 2 additions & 2 deletions tests/functional/cylc-remove/02-cycling/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
[runtime]
[[remover]]
script = """
cylc__job__poll_grep_workflow_log -E '2020/bar/01:running.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '2021/baz/01:running.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '2020/bar/01.* failed'
cylc__job__poll_grep_workflow_log -E '2021/baz/01.* failed'
# Remove the two unhandled failed tasks.
cylc remove "$CYLC_WORKFLOW_ID//*/ba*:failed"
# Remove the two unsatisfied waiting tasks.
Expand Down
6 changes: 3 additions & 3 deletions tests/functional/cylc-trigger/02-filter-failed/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
[[fixer]]
script = """
cylc__job__wait_cylc_message_started
cylc__job__poll_grep_workflow_log -E '1/fixable1/01:running.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '1/fixable2/01:running.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '1/fixable3/01:running.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '\[1/fixable1/01:running\] failed/ERR'
cylc__job__poll_grep_workflow_log -E '\[1/fixable2/01:running\] failed/ERR'
cylc__job__poll_grep_workflow_log -E '\[1/fixable3/01:running\] failed/ERR'
cylc trigger "${CYLC_WORKFLOW_ID}//1/fixable*"
"""
[[Z]]
Expand Down
10 changes: 5 additions & 5 deletions tests/functional/cylc-trigger/04-filter-names/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
[[fixer]]
script = """
cylc__job__wait_cylc_message_started
cylc__job__poll_grep_workflow_log -E '1/fixable-1a/01.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '1/fixable-1b/01.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '1/fixable-2a/01.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '1/fixable-2b/01.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '1/fixable-3/01.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '1/fixable-1a/01.* failed'
cylc__job__poll_grep_workflow_log -E '1/fixable-1b/01.* failed'
cylc__job__poll_grep_workflow_log -E '1/fixable-2a/01.* failed'
cylc__job__poll_grep_workflow_log -E '1/fixable-2b/01.* failed'
cylc__job__poll_grep_workflow_log -E '1/fixable-3/01.* failed'
cylc trigger "${CYLC_WORKFLOW_ID}//" \
'//1/FIXABLE-1' '//1/fixable-2*' '//1/fixable-3'
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/hold-release/11-retrying/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ t-retry-able => t-analyse
[[t-hold-release]]
script = """
cylc__job__poll_grep_workflow_log -E \
'1/t-retry-able/01:running.* \(received\)failed'
'\[1/t-retry-able:waiting\] failed/ERR'

cylc__job__poll_grep_workflow_log -E \
'1/t-retry-able/01:running.* => waiting'
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/reload/10-runahead.t
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ run_fail "${TEST_NAME}" cylc play --debug --no-detach "${WORKFLOW_NAME}"
#-------------------------------------------------------------------------------
TEST_NAME=${TEST_NAME_BASE}-check-fail
DB_FILE="$RUN_DIR/${WORKFLOW_NAME}/log/db"
QUERY='SELECT COUNT(*) FROM task_states WHERE status == "failed"'
QUERY="SELECT COUNT(*) FROM task_states WHERE status == 'failed'"
cmp_ok <(sqlite3 "$DB_FILE" "$QUERY") <<< "4"
#-------------------------------------------------------------------------------
purge
4 changes: 2 additions & 2 deletions tests/functional/reload/25-xtriggers.t
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
[[reload]]
script = """
# wait for "broken" to fail
cylc__job__poll_grep_workflow_log -E '1/broken/01.* \(received\)failed/ERR'
cylc__job__poll_grep_workflow_log -E '1/broken.*failed/ERR'
wxtim marked this conversation as resolved.
Show resolved Hide resolved
# fix "broken" to allow it to pass
sed -i 's/false/true/' "${CYLC_WORKFLOW_RUN_DIR}/flow.cylc"
# reload the workflow
Expand All @@ -63,7 +63,7 @@ workflow_run_ok "${TEST_NAME_BASE}-run" cylc play "${WORKFLOW_NAME}" --no-detach
log_scan "${TEST_NAME_BASE}-scan" \
"$(cylc cat-log -m p "${WORKFLOW_NAME}")" \
1 1 \
'1/broken.* (received)failed/ERR'
'1/broken.*failed/ERR'
wxtim marked this conversation as resolved.
Show resolved Hide resolved

log_scan "${TEST_NAME_BASE}-scan" \
"$(cylc cat-log -m p "${WORKFLOW_NAME}")" 1 1 \
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/reload/runahead/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
script = true
[[reloader]]
script = """
cylc__job__poll_grep_workflow_log -E "${CYLC_TASK_CYCLE_POINT}/foo/01:running.*\(received\)failed"
cylc__job__poll_grep_workflow_log -E "${CYLC_TASK_CYCLE_POINT}/foo/01:running.*failed"
perl -pi -e 's/(runahead limit = )P1( # marker)/\1 P3\2/' $CYLC_WORKFLOW_RUN_DIR/flow.cylc
cylc reload $CYLC_WORKFLOW_ID
"""
2 changes: 1 addition & 1 deletion tests/functional/spawn-on-demand/10-retrigger/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"""
[[triggerer]]
script = """
cylc__job__poll_grep_workflow_log -E '1/oops/01:running.* \(received\)failed'
cylc__job__poll_grep_workflow_log -E '1/oops/01.* failed'
cylc trigger "${CYLC_WORKFLOW_ID}//1/oops"
"""
[[foo, bar]]
2 changes: 1 addition & 1 deletion tests/functional/triggering/19-and-suicide/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
[[t0]]
# https://github.com/cylc/cylc-flow/issues/2655
# "1/t2" should not suicide on "1/t1:failed"
script = cylc__job__poll_grep_workflow_log -E '1/t1.* \(received\)failed'
script = cylc__job__poll_grep_workflow_log -E '1/t1.* failed'
[[t1]]
script = false
[[t2]]
Expand Down
63 changes: 53 additions & 10 deletions tests/integration/test_task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from itertools import product
import logging
from typing import Any as Fixture

from cylc.flow.task_events_mgr import TaskJobLogsRetrieveContext
from cylc.flow.scheduler import Scheduler
from cylc.flow.data_store_mgr import (
JOBS,
TASK_STATUSES_ORDERED,
TASK_STATUS_WAITING,
TASK_STATUS_SUBMIT_FAILED,
)


Expand Down Expand Up @@ -79,17 +76,22 @@ async def test__insert_task_job(flow, one_conf, scheduler, start, validate):
with correct submit number.
"""
conf = {
'scheduling': {'graph': {'R1': 'rhenas'}},
'runtime': {'rhenas': {'simulation': {
'fail cycle points': '1',
'fail try 1 only': False,
}}}}
"scheduling": {"graph": {"R1": "rhenas"}},
"runtime": {
"rhenas": {
"simulation": {
"fail cycle points": "1",
"fail try 1 only": False,
}
}
},
}
id_ = flow(conf)
schd = scheduler(id_)
async with start(schd):
# Set task to running:
itask = schd.pool.get_tasks()[0]
itask.state.status = 'running'
itask = schd.pool.get_tasks()[0]
itask.state.status = "running"
itask.submit_num += 1

# Not run _insert_task_job yet:
Expand Down Expand Up @@ -170,3 +172,44 @@ async def test__always_insert_task_job(
'1/broken/01': 'submit-failed',
'1/broken2/01': 'submit-failed'
}


async def test__process_message_failed_with_retry(one, start, log_filter):
"""Log job failure, even if a retry is scheduled.

See: https://github.com/cylc/cylc-flow/pull/6169

"""

async with start(one) as LOG:
fail_once = one.pool.get_tasks()[0]
# Add retry timers:
one.task_job_mgr._set_retry_timers(
fail_once, {
'execution retry delays': [1],
'submission retry delays': [1]
})
Comment on lines +187 to +191
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Less fragile if this was just set in the workflow config for the test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it really make any difference? - the aim was to avoid fiddling with one. Can do if you insist.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Surely creating a workflow config with these retry delays set is not any more involved than fiddling the internal retry timers?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not going to argue. Will change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Went away and had a look at it - essentially this is more unit-testy than integration test-like: If you set these timers in the config you still need to run this function here to set the retry timers. So I think that I'll leave it.


# Process submit failed message with and without retries:
one.task_events_mgr._process_message_submit_failed(
fail_once, None, 1, False)
last_record = LOG.records[-1]
assert last_record.levelno == logging.WARNING
assert '1/one:waiting(queued)' in last_record.message
MetRonnie marked this conversation as resolved.
Show resolved Hide resolved

one.task_events_mgr._process_message_submit_failed(
fail_once, None, 2, False)
failed_record = log_filter(LOG, level=logging.ERROR)[-1]
assert 'submission failed' in failed_record[2]

# Process failed message with and without retries:
one.task_events_mgr._process_message_failed(
fail_once, None, 'failed', False, 'failed/OOK')
last_record = LOG.records[-1]
assert last_record.levelno == logging.WARNING
assert 'failed/OOK' in last_record.message

one.task_events_mgr._process_message_failed(
fail_once, None, 'failed', False, 'failed/OOK')
failed_record = log_filter(LOG, level=logging.ERROR)[-1]
assert 'failed/OOK' in failed_record[2]
Loading