Skip to content

Commit

Permalink
Improve exception handling in quantum executors (DM-45340)
Browse files Browse the repository at this point in the history
This should solve an issue with exceptions raised at unpredictable
locations in the SingleQuantumExecutor class. MPGraphExecutor in single-process
mode should also behave reasonably when arbitrary exceptions occur. Multi-process
mode has more complex internal state. If a client of the code catches an exception
and tries to reuse the executor, the state may be inconsistent, depending
on where the exception happened.
  • Loading branch information
andy-slac committed Jul 24, 2024
1 parent efd0d94 commit b0c05c4
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 163 deletions.
27 changes: 20 additions & 7 deletions python/lsst/ctrl/mpexec/mpGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,19 @@ def _executeJob(

quantum = pickle.loads(quantum_pickle)
try:
quantumExecutor.execute(task_node, quantum)
_, report = quantumExecutor.execute(task_node, quantum)
except Exception as exc:
_LOG.debug("exception from task %s dataId %s: %s", task_node.label, quantum.dataId, exc)
report = QuantumReport.from_exception(
exception=exc,
dataId=quantum.dataId,
taskLabel=task_node.label,
)
raise
finally:
# If sending fails we do not want this new exception to be exposed.
try:
report = quantumExecutor.getReport()
_LOG.debug("sending report for task %s dataId %s", task_node.label, quantum.dataId)
snd_conn.send(report)
except Exception:
pass
Expand Down Expand Up @@ -480,9 +488,18 @@ def _executeQuantaInProcess(self, graph: QuantumGraph, report: Report) -> None:

_LOG.debug("Executing %s", qnode)
try:
self.quantumExecutor.execute(task_node, qnode.quantum)
_, quantum_report = self.quantumExecutor.execute(task_node, qnode.quantum)
if quantum_report:
report.quantaReports.append(quantum_report)
successCount += 1
except Exception as exc:
quantum_report = QuantumReport.from_exception(
exception=exc,
dataId=qnode.quantum.dataId,
taskLabel=task_node.label,
)
report.quantaReports.append(quantum_report)

if self.pdb and sys.stdin.isatty() and sys.stdout.isatty():
_LOG.error(
"Task <%s dataId=%s> failed; dropping into pdb.",
Expand Down Expand Up @@ -522,10 +539,6 @@ def _executeQuantaInProcess(self, graph: QuantumGraph, report: Report) -> None:
# times, run a collection loop here explicitly.
gc.collect()

quantum_report = self.quantumExecutor.getReport()
if quantum_report:
report.quantaReports.append(quantum_report)

_LOG.info(
"Executed %d quanta successfully, %d failed and %d remain out of total %d quanta.",
successCount,
Expand Down
25 changes: 7 additions & 18 deletions python/lsst/ctrl/mpexec/quantumGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ class QuantumExecutor(ABC):
"""

@abstractmethod
def execute(self, task_node: TaskNode | TaskDef, /, quantum: Quantum) -> Quantum:
def execute(
self, task_node: TaskNode | TaskDef, /, quantum: Quantum
) -> tuple[Quantum, QuantumReport | None]:
"""Execute single quantum.
Parameters
Expand All @@ -66,6 +68,10 @@ def execute(self, task_node: TaskNode | TaskDef, /, quantum: Quantum) -> Quantum
-------
quantum : `~lsst.daf.butler.Quantum`
The quantum actually executed.
report : `~lsst.ctrl.mpexec.QuantumReport`
Structure describing the status of the execution of a quantum.
`None` is returned if implementation does not support this
feature.
Notes
-----
Expand All @@ -74,23 +80,6 @@ def execute(self, task_node: TaskNode | TaskDef, /, quantum: Quantum) -> Quantum
"""
raise NotImplementedError()

def getReport(self) -> QuantumReport | None:
"""Return execution report from last call to `execute`.
Returns
-------
report : `~lsst.ctrl.mpexec.QuantumReport`
Structure describing the status of the execution of a quantum.
`None` is returned if implementation does not support this
feature.
Raises
------
RuntimeError
Raised if this method is called before `execute`.
"""
return None


class QuantumGraphExecutor(ABC):
"""Class which abstracts QuantumGraph execution.
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/simple_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,5 +414,5 @@ def as_generator(
# which might be useful for callers who want to check the state of the
# repo in between.
return (
single_quantum_executor.execute(qnode.task_node, qnode.quantum) for qnode in self.quantum_graph
single_quantum_executor.execute(qnode.task_node, qnode.quantum)[0] for qnode in self.quantum_graph
)
26 changes: 6 additions & 20 deletions python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ def __init__(
self.clobberOutputs = clobberOutputs
self.exitOnKnownError = exitOnKnownError
self.limited_butler_factory = limited_butler_factory
self.report: QuantumReport | None = None
self.resources = resources

if self.butler is None:
Expand All @@ -164,26 +163,19 @@ def __init__(
collectionTypes=CollectionType.RUN,
)

def execute(self, task_node: TaskDef | TaskNode, /, quantum: Quantum) -> Quantum:
def execute(
self, task_node: TaskNode | TaskDef, /, quantum: Quantum
) -> tuple[Quantum, QuantumReport | None]:
# Docstring inherited from QuantumExecutor.execute
assert quantum.dataId is not None, "Quantum DataId cannot be None"

task_node = self._conform_task_def(task_node)
if self.butler is not None:
self.butler.registry.refresh()

# Catch any exception and make a report based on that.
try:
result = self._execute(task_node, quantum)
self.report = QuantumReport(dataId=quantum.dataId, taskLabel=task_node.label)
return result
except Exception as exc:
self.report = QuantumReport.from_exception(
exception=exc,
dataId=quantum.dataId,
taskLabel=task_node.label,
)
raise
result = self._execute(task_node, quantum)
report = QuantumReport(dataId=quantum.dataId, taskLabel=task_node.label)
return result, report

def _conform_task_def(self, task_node: TaskDef | TaskNode) -> TaskNode:
"""Convert the given object to a TaskNode and emit a deprecation
Expand Down Expand Up @@ -589,9 +581,3 @@ def initGlobals(self, quantum: Quantum) -> None:
else:
oneInstrument = instrument
Instrument.fromName(instrument, self.butler.registry)

def getReport(self) -> QuantumReport | None:
# Docstring inherited from base class
if self.report is None:
raise RuntimeError("getReport() called before execute()")
return self.report
Loading

0 comments on commit b0c05c4

Please sign in to comment.