Skip to content

Commit

Permalink
Move handling of exiting exceptions to MPGraphExecutor (DM-45724)
Browse files Browse the repository at this point in the history
This avoids unexpected process exits from SingleGraphExecutors level.
MPGraphExecutor is a better place to handle application-level logic,
potentially CmdLineFwk would be even better location, but some context
is missing at that level.
  • Loading branch information
andy-slac committed Aug 14, 2024
1 parent ac6ec8c commit 0da26fd
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 32 deletions.
2 changes: 0 additions & 2 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,6 @@ def runPipeline(
skipExistingIn=args.skip_existing_in,
clobberOutputs=args.clobber_outputs,
enableLsstDebug=args.enableLsstDebug,
exitOnKnownError=args.fail_fast,
resources=resources,
)

Expand Down Expand Up @@ -991,7 +990,6 @@ def runGraphQBB(self, task_factory: TaskFactory, args: SimpleNamespace) -> None:
butler=None,
taskFactory=task_factory,
enableLsstDebug=args.enableLsstDebug,
exitOnKnownError=args.fail_fast,
limited_butler_factory=_butler_factory,
resources=resources,
assumeNoExistingOutputs=True,
Expand Down
68 changes: 60 additions & 8 deletions python/lsst/ctrl/mpexec/mpGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from typing import Literal

from lsst.daf.butler.cli.cliLog import CliLog
from lsst.pipe.base import InvalidQuantumError
from lsst.pipe.base import InvalidQuantumError, RepeatableQuantumError
from lsst.pipe.base.graph.graph import QuantumGraph, QuantumNode
from lsst.pipe.base.pipeline_graph import TaskNode
from lsst.utils.threads import disable_implicit_threading
Expand Down Expand Up @@ -74,8 +74,9 @@ class _Job:
Quantum and some associated information.
"""

def __init__(self, qnode: QuantumNode):
def __init__(self, qnode: QuantumNode, fail_fast: bool = False):
self.qnode = qnode
self._fail_fast = fail_fast
self.process: multiprocessing.process.BaseProcess | None = None
self._state = JobState.PENDING
self.started: float = 0.0
Expand Down Expand Up @@ -122,7 +123,7 @@ def start(
mp_ctx = multiprocessing.get_context(startMethod)
self.process = mp_ctx.Process( # type: ignore[attr-defined]
target=_Job._executeJob,
args=(quantumExecutor, task_node, quantum_pickle, logConfigState, snd_conn),
args=(quantumExecutor, task_node, quantum_pickle, logConfigState, snd_conn, self._fail_fast),
name=f"task-{self.qnode.quantum.dataId}",
)
# mypy is getting confused by multiprocessing.
Expand All @@ -138,6 +139,7 @@ def _executeJob(
quantum_pickle: bytes,
logConfigState: list,
snd_conn: multiprocessing.connection.Connection,
fail_fast: bool,
) -> None:
"""Execute a job with arguments.
Expand Down Expand Up @@ -168,8 +170,33 @@ def _executeJob(

quantum = pickle.loads(quantum_pickle)
report: QuantumReport | None = None
# Catch a few known failure modes and stop the process immediately,
# with exception-specific exit code.
try:
quantum, report = quantumExecutor.execute(task_node, quantum)
_, report = quantumExecutor.execute(task_node, quantum)
except RepeatableQuantumError as exc:
report = QuantumReport.from_exception(
exception=exc,
dataId=quantum.dataId,
taskLabel=task_node.label,
exitCode=exc.EXIT_CODE if fail_fast else None,
)
if fail_fast:
_LOG.warning("Caught repeatable quantum error for %s (%s):", task_node.label, quantum.dataId)
_LOG.warning(exc, exc_info=True)
sys.exit(exc.EXIT_CODE)
else:
raise
except InvalidQuantumError as exc:
_LOG.fatal("Invalid quantum error for %s (%s): %s", task_node.label, quantum.dataId)
_LOG.fatal(exc, exc_info=True)
report = QuantumReport.from_exception(
exception=exc,
dataId=quantum.dataId,
taskLabel=task_node.label,
exitCode=exc.EXIT_CODE,
)
sys.exit(exc.EXIT_CODE)
except Exception as exc:
_LOG.debug("exception from task %s dataId %s: %s", task_node.label, quantum.dataId, exc)
report = QuantumReport.from_exception(
Expand Down Expand Up @@ -490,11 +517,31 @@ def _executeQuantaInProcess(self, graph: QuantumGraph, report: Report) -> None:
continue

_LOG.debug("Executing %s", qnode)
fail_exit_code: int | None = None
try:
_, quantum_report = self.quantumExecutor.execute(task_node, qnode.quantum)
if quantum_report:
report.quantaReports.append(quantum_report)
successCount += 1
# For some exception types we want to exit immediately with
# exception-specific exit code, but we still want to start
# debugger before exiting if debugging is enabled.
try:
_, quantum_report = self.quantumExecutor.execute(task_node, qnode.quantum)
if quantum_report:
report.quantaReports.append(quantum_report)
successCount += 1
except RepeatableQuantumError as exc:
if self.failFast:
_LOG.warning(
"Caught repeatable quantum error for %s (%s):",
task_node.label,
qnode.quantum.dataId,
)
_LOG.warning(exc, exc_info=True)
fail_exit_code = exc.EXIT_CODE
raise
except InvalidQuantumError as exc:
_LOG.fatal("Invalid quantum error for %s (%s): %s", task_node.label, qnode.quantum.dataId)
_LOG.fatal(exc, exc_info=True)
fail_exit_code = exc.EXIT_CODE
raise
except Exception as exc:
quantum_report = QuantumReport.from_exception(
exception=exc,
Expand Down Expand Up @@ -523,6 +570,11 @@ def _executeQuantaInProcess(self, graph: QuantumGraph, report: Report) -> None:
pdb.post_mortem(exc.__traceback__)
failedNodes.add(qnode)
report.status = ExecutionStatus.FAILURE

# If exception specified an exit code then just exit with that
# code, otherwise crash if fail-fast option is enabled.
if fail_exit_code is not None:
sys.exit(fail_exit_code)
if self.failFast:
raise MPGraphExecutorError(
f"Task <{task_node.label} dataId={qnode.quantum.dataId}> failed."
Expand Down
6 changes: 6 additions & 0 deletions python/lsst/ctrl/mpexec/reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ def from_exception(
exception: Exception,
dataId: DataId,
taskLabel: str,
*,
exitCode: int | None = None,
) -> QuantumReport:
"""Construct report instance from an exception and other pieces of
data.
Expand All @@ -158,11 +160,15 @@ def from_exception(
Data ID of quantum.
taskLabel : `str`
Label of task.
exitCode : `int`, optional
Exit code for the process, used when it is known that the process
will exit with that exit code.
"""
return cls(
status=ExecutionStatus.FAILURE,
dataId=dataId,
taskLabel=taskLabel,
exitCode=exitCode,
exceptionInfo=ExceptionInfo.from_exception(exception),
)

Expand Down
23 changes: 1 addition & 22 deletions python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
# Imports of standard modules --
# -------------------------------
import logging
import sys
import time
from collections import defaultdict
from collections.abc import Callable
Expand All @@ -56,7 +55,6 @@
NoWorkFound,
PipelineTask,
QuantumContext,
RepeatableQuantumError,
TaskFactory,
)
from lsst.pipe.base.pipeline_graph import TaskNode
Expand Down Expand Up @@ -104,11 +102,6 @@ class SingleQuantumExecutor(QuantumExecutor):
a quantum will be removed. Only used when ``butler`` is not `None`.
enableLsstDebug : `bool`, optional
Enable debugging with ``lsstDebug`` facility for a task.
exitOnKnownError : `bool`, optional
If `True`, call `sys.exit` with the appropriate exit code for special
known exceptions, after printing a traceback, instead of letting the
exception propagate up to calling. This is always the behavior for
InvalidQuantumError.
limited_butler_factory : `Callable`, optional
A method that creates a `~lsst.daf.butler.LimitedButler` instance
for a given Quantum. This parameter must be defined if ``butler`` is
Expand All @@ -135,7 +128,6 @@ def __init__(
skipExistingIn: Any = None,
clobberOutputs: bool = False,
enableLsstDebug: bool = False,
exitOnKnownError: bool = False,
limited_butler_factory: Callable[[Quantum], LimitedButler] | None = None,
resources: ExecutionResources | None = None,
skipExisting: bool = False,
Expand All @@ -145,7 +137,6 @@ def __init__(
self.taskFactory = taskFactory
self.enableLsstDebug = enableLsstDebug
self.clobberOutputs = clobberOutputs
self.exitOnKnownError = exitOnKnownError
self.limited_butler_factory = limited_butler_factory
self.resources = resources
self.assumeNoExistingOutputs = assumeNoExistingOutputs
Expand Down Expand Up @@ -480,25 +471,13 @@ def runQuantum(
# Get the input and output references for the task
inputRefs, outputRefs = task_node.get_connections().buildDatasetRefs(quantum)

# Call task runQuantum() method. Catch a few known failure modes and
# translate them into specific
# Call task runQuantum() method.
try:
task.runQuantum(butlerQC, inputRefs, outputRefs)
except NoWorkFound as err:
# Not an error, just an early exit.
_LOG.info("Task '%s' on quantum %s exited early: %s", task_node.label, quantum.dataId, str(err))
pass
except RepeatableQuantumError as err:
if self.exitOnKnownError:
_LOG.warning("Caught repeatable quantum error for %s (%s):", task_node.label, quantum.dataId)
_LOG.warning(err, exc_info=True)
sys.exit(err.EXIT_CODE)
else:
raise
except InvalidQuantumError as err:
_LOG.fatal("Invalid quantum error for %s (%s): %s", task_node.label, quantum.dataId)
_LOG.fatal(err, exc_info=True)
sys.exit(err.EXIT_CODE)

def writeMetadata(
self, quantum: Quantum, metadata: Any, task_node: TaskNode, /, limited_butler: LimitedButler
Expand Down

0 comments on commit 0da26fd

Please sign in to comment.