diff --git a/python/lsst/ctrl/mpexec/mpGraphExecutor.py b/python/lsst/ctrl/mpexec/mpGraphExecutor.py index bb9d8282..304fd9b2 100644 --- a/python/lsst/ctrl/mpexec/mpGraphExecutor.py +++ b/python/lsst/ctrl/mpexec/mpGraphExecutor.py @@ -167,15 +167,26 @@ def _executeJob( CliLog.replayConfigState(logConfigState) quantum = pickle.loads(quantum_pickle) + report: QuantumReport | None = None try: - quantumExecutor.execute(task_node, quantum) + _, report = quantumExecutor.execute(task_node, quantum) + except Exception as exc: + _LOG.debug("exception from task %s dataId %s: %s", task_node.label, quantum.dataId, exc) + report = QuantumReport.from_exception( + exception=exc, + dataId=quantum.dataId, + taskLabel=task_node.label, + ) + raise finally: - # If sending fails we do not want this new exception to be exposed. - try: - report = quantumExecutor.getReport() - snd_conn.send(report) - except Exception: - pass + if report is not None: + # If sending fails we do not want this new exception to be + # exposed. + try: + _LOG.debug("sending report for task %s dataId %s", task_node.label, quantum.dataId) + snd_conn.send(report) + except Exception: + pass def stop(self) -> None: """Stop the process.""" @@ -480,9 +491,18 @@ def _executeQuantaInProcess(self, graph: QuantumGraph, report: Report) -> None: _LOG.debug("Executing %s", qnode) try: - self.quantumExecutor.execute(task_node, qnode.quantum) + _, quantum_report = self.quantumExecutor.execute(task_node, qnode.quantum) + if quantum_report: + report.quantaReports.append(quantum_report) successCount += 1 except Exception as exc: + quantum_report = QuantumReport.from_exception( + exception=exc, + dataId=qnode.quantum.dataId, + taskLabel=task_node.label, + ) + report.quantaReports.append(quantum_report) + if self.pdb and sys.stdin.isatty() and sys.stdout.isatty(): _LOG.error( "Task <%s dataId=%s> failed; dropping into pdb.", @@ -522,10 +542,6 @@ def _executeQuantaInProcess(self, graph: QuantumGraph, report: Report) -> None: # times, run a collection loop here explicitly. gc.collect() - quantum_report = self.quantumExecutor.getReport() - if quantum_report: - report.quantaReports.append(quantum_report) - _LOG.info( "Executed %d quanta successfully, %d failed and %d remain out of total %d quanta.", successCount, diff --git a/python/lsst/ctrl/mpexec/quantumGraphExecutor.py b/python/lsst/ctrl/mpexec/quantumGraphExecutor.py index 859bde1c..cd2d2db1 100644 --- a/python/lsst/ctrl/mpexec/quantumGraphExecutor.py +++ b/python/lsst/ctrl/mpexec/quantumGraphExecutor.py @@ -50,7 +50,9 @@ class QuantumExecutor(ABC): """ @abstractmethod - def execute(self, task_node: TaskNode | TaskDef, /, quantum: Quantum) -> Quantum: + def execute( + self, task_node: TaskNode | TaskDef, /, quantum: Quantum + ) -> tuple[Quantum, QuantumReport | None]: """Execute single quantum. Parameters @@ -66,6 +68,10 @@ def execute(self, task_node: TaskNode | TaskDef, /, quantum: Quantum) -> Quantum ------- quantum : `~lsst.daf.butler.Quantum` The quantum actually executed. + report : `~lsst.ctrl.mpexec.QuantumReport` + Structure describing the status of the execution of a quantum. + `None` is returned if implementation does not support this + feature. Notes ----- @@ -74,23 +80,6 @@ def execute(self, task_node: TaskNode | TaskDef, /, quantum: Quantum) -> Quantum """ raise NotImplementedError() - def getReport(self) -> QuantumReport | None: - """Return execution report from last call to `execute`. - - Returns - ------- - report : `~lsst.ctrl.mpexec.QuantumReport` - Structure describing the status of the execution of a quantum. - `None` is returned if implementation does not support this - feature. - - Raises - ------ - RuntimeError - Raised if this method is called before `execute`. - """ - return None - class QuantumGraphExecutor(ABC): """Class which abstracts QuantumGraph execution. diff --git a/python/lsst/ctrl/mpexec/simple_pipeline_executor.py b/python/lsst/ctrl/mpexec/simple_pipeline_executor.py index 45fa3b11..ffbaf2b1 100644 --- a/python/lsst/ctrl/mpexec/simple_pipeline_executor.py +++ b/python/lsst/ctrl/mpexec/simple_pipeline_executor.py @@ -414,5 +414,5 @@ def as_generator( # which might be useful for callers who want to check the state of the # repo in between. return ( - single_quantum_executor.execute(qnode.task_node, qnode.quantum) for qnode in self.quantum_graph + single_quantum_executor.execute(qnode.task_node, qnode.quantum)[0] for qnode in self.quantum_graph ) diff --git a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py index 0de4d55a..28abef30 100644 --- a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py +++ b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py @@ -143,7 +143,6 @@ def __init__( self.clobberOutputs = clobberOutputs self.exitOnKnownError = exitOnKnownError self.limited_butler_factory = limited_butler_factory - self.report: QuantumReport | None = None self.resources = resources if self.butler is None: @@ -164,7 +163,9 @@ def __init__( collectionTypes=CollectionType.RUN, ) - def execute(self, task_node: TaskDef | TaskNode, /, quantum: Quantum) -> Quantum: + def execute( + self, task_node: TaskNode | TaskDef, /, quantum: Quantum + ) -> tuple[Quantum, QuantumReport | None]: # Docstring inherited from QuantumExecutor.execute assert quantum.dataId is not None, "Quantum DataId cannot be None" @@ -172,18 +173,9 @@ def execute(self, task_node: TaskDef | TaskNode, /, quantum: Quantum) -> Quantum if self.butler is not None: self.butler.registry.refresh() - # Catch any exception and make a report based on that. - try: - result = self._execute(task_node, quantum) - self.report = QuantumReport(dataId=quantum.dataId, taskLabel=task_node.label) - return result - except Exception as exc: - self.report = QuantumReport.from_exception( - exception=exc, - dataId=quantum.dataId, - taskLabel=task_node.label, - ) - raise + result = self._execute(task_node, quantum) + report = QuantumReport(dataId=quantum.dataId, taskLabel=task_node.label) + return result, report def _conform_task_def(self, task_node: TaskDef | TaskNode) -> TaskNode: """Convert the given object to a TaskNode and emit a deprecation @@ -589,9 +581,3 @@ def initGlobals(self, quantum: Quantum) -> None: else: oneInstrument = instrument Instrument.fromName(instrument, self.butler.registry) - - def getReport(self) -> QuantumReport | None: - # Docstring inherited from base class - if self.report is None: - raise RuntimeError("getReport() called before execute()") - return self.report diff --git a/tests/test_executors.py b/tests/test_executors.py index 0c96cb40..f301286b 100644 --- a/tests/test_executors.py +++ b/tests/test_executors.py @@ -25,12 +25,12 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -"""Simple unit test for cmdLineFwk module. -""" +from __future__ import annotations import faulthandler import logging import multiprocessing +import multiprocessing.context import os import signal import sys @@ -38,6 +38,7 @@ import unittest import warnings from multiprocessing import Manager +from typing import TYPE_CHECKING, Any, Literal import networkx as nx import psutil @@ -48,13 +49,21 @@ MPTimeoutError, QuantumExecutor, QuantumReport, + Report, SingleQuantumExecutor, ) from lsst.ctrl.mpexec.execFixupDataId import ExecFixupDataId from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir from lsst.pipe.base import NodeId, QgraphSummary, QgraphTaskSummary +from lsst.pipe.base.graph import BuildId from lsst.pipe.base.tests.simpleQGraph import AddTaskFactoryMock, makeSimpleQGraph +if TYPE_CHECKING: + from collections.abc import Iterator + from multiprocessing.managers import ListProxy + + from lsst.daf.butler.dimensions import DataIdValue + logging.basicConfig(level=logging.DEBUG) _LOG = logging.getLogger(__name__) @@ -71,39 +80,27 @@ class QuantumExecutorMock(QuantumExecutor): Whether the mock should use multiprocessing or not. """ - def __init__(self, mp=False): - self.quanta = [] + def __init__(self, mp: bool = False): + self.quanta: list[QuantumMock] | ListProxy = [] if mp: # in multiprocess mode use shared list manager = Manager() self.quanta = manager.list() - self.report = None - self._execute_called = False - def execute(self, task_node, /, quantum): + def execute( # type: ignore[override] + self, task_node: TaskNodeMock, /, quantum: QuantumMock # type: ignore[override] + ) -> tuple[QuantumMock, QuantumReport | None]: _LOG.debug("QuantumExecutorMock.execute: task_node=%s dataId=%s", task_node, quantum.dataId) self._execute_called = True if task_node.task_class: - try: - # only works for one of the TaskMock classes below - task_node.task_class().runQuantum() - self.report = QuantumReport(dataId=quantum.dataId, taskLabel=task_node.label) - except Exception as exc: - self.report = QuantumReport.from_exception( - exception=exc, - dataId=quantum.dataId, - taskLabel=task_node.label, - ) - raise + # only works for one of the TaskMock classes below + task_node.task_class().runQuantum() + assert quantum.dataId is not None + report = QuantumReport(dataId=quantum.dataId, taskLabel=task_node.label) self.quanta.append(quantum) - return quantum - - def getReport(self): - if not self._execute_called: - raise RuntimeError("getReport called before execute") - return self.report + return quantum, report - def getDataIds(self, field): + def getDataIds(self, field: str) -> list[DataIdValue]: """Return values for dataId field for each visited quanta. Parameters @@ -111,7 +108,7 @@ def getDataIds(self, field): field : `str` Field to select. """ - return [quantum.dataId[field] for quantum in self.quanta] + return [quantum.dataId[field] for quantum in self.quanta if quantum.dataId is not None] class QuantumMock: @@ -123,19 +120,21 @@ class QuantumMock: The Data ID of this quantum. """ - def __init__(self, dataId): + def __init__(self, dataId: dict[str, DataIdValue]): self.dataId = dataId - def __eq__(self, other): + def __eq__(self, other: object) -> bool: + if not isinstance(other, QuantumMock): + return NotImplemented return self.dataId == other.dataId - def __hash__(self): + def __hash__(self) -> int: # dict.__eq__ is order-insensitive return hash(tuple(sorted(kv for kv in self.dataId.items()))) -class QuantumIterDataMock: - """Simple class to mock QuantumIterData. +class QuantumNodeMock: + """Simple class to mock QuantumNode. Parameters ---------- @@ -147,13 +146,13 @@ class QuantumIterDataMock: The data ID of the mocked quantum. """ - def __init__(self, index, task_node, **dataId): + def __init__(self, index: int, task_node: TaskNodeMock, **dataId: DataIdValue): self.index = index self.taskDef = task_node self.task_node = task_node self.quantum = QuantumMock(dataId) - self.dependencies = set() - self.nodeId = NodeId(index, "DummyBuildString") + self.dependencies: set[int] = set() + self.nodeId = NodeId(index, BuildId("DummyBuildString")) class QuantumGraphMock: @@ -161,33 +160,34 @@ class QuantumGraphMock: Parameters ---------- - qdata : `~collections.abc.Iterable` of `QuantumIterDataMock` + qdata : `list` [`QuantumNodeMock`] The nodes of the graph. """ - def __init__(self, qdata): + def __init__(self, qdata: list[QuantumNodeMock]): self._graph = nx.DiGraph() previous = qdata[0] for node in qdata[1:]: self._graph.add_edge(previous, node) previous = node - def __iter__(self): + def __iter__(self) -> Iterator: yield from nx.topological_sort(self._graph) - def __len__(self): + def __len__(self) -> int: return len(self._graph) - def findTaskDefByLabel(self, label): + def findTaskDefByLabel(self, label: str) -> TaskNodeMock | None: for q in self: if q.task_node.label == label: return q.taskDef + return None - def getQuantaForTask(self, taskDef): + def getQuantaForTask(self, taskDef) -> set[QuantumMock]: nodes = self.getNodesForTask(taskDef) return {q.quantum for q in nodes} - def getNodesForTask(self, taskDef): + def getNodesForTask(self, taskDef: Any) -> set[QuantumNodeMock]: quanta = set() for q in self: if q.task_node.label == taskDef.label: @@ -195,13 +195,13 @@ def getNodesForTask(self, taskDef): return quanta @property - def graph(self): + def graph(self) -> nx.DiGraph: return self._graph - def findCycle(self): + def findCycle(self) -> list: return [] - def determineInputsToQuantumNode(self, node): + def determineInputsToQuantumNode(self, node: QuantumNodeMock) -> set[QuantumNodeMock]: result = set() for n in node.dependencies: for otherNode in self: @@ -209,7 +209,7 @@ def determineInputsToQuantumNode(self, node): result.add(otherNode) return result - def getSummary(self): + def getSummary(self) -> QgraphSummary: summary = QgraphSummary( graphID="1712445133.605479-3902002", cmdLine="mock_pipetask -a 1 -b 2 -c 3 4 5 6", @@ -239,7 +239,7 @@ class TaskMockMP: canMultiprocess = True - def runQuantum(self): + def runQuantum(self) -> None: _LOG.debug("TaskMockMP.runQuantum") pass @@ -249,7 +249,7 @@ class TaskMockFail: canMultiprocess = True - def runQuantum(self): + def runQuantum(self) -> None: _LOG.debug("TaskMockFail.runQuantum") raise ValueError("expected failure") @@ -259,7 +259,7 @@ class TaskMockCrash: canMultiprocess = True - def runQuantum(self): + def runQuantum(self) -> None: _LOG.debug("TaskMockCrash.runQuantum") # Disable fault handler to suppress long scary traceback. faulthandler.disable() @@ -271,7 +271,7 @@ class TaskMockLongSleep: canMultiprocess = True - def runQuantum(self): + def runQuantum(self) -> None: _LOG.debug("TaskMockLongSleep.runQuantum") time.sleep(100.0) @@ -295,18 +295,18 @@ class TaskNodeMock: Configuration for the task. """ - def __init__(self, label="task1", task_class=TaskMockMP, config=None): + def __init__(self, label: str = "task1", task_class: type = TaskMockMP, config: Any = None): self.label = label # taskClass to look like TaskDef, task_class to look like TaskNode. self.taskClass = task_class self.task_class = task_class self.config = config - def __str__(self): + def __str__(self) -> str: return f"TaskNodeMock({self.label}, {self.taskClass.__name__})" -def _count_status(report, status): +def _count_status(report: Report, status: ExecutionStatus) -> int: """Count number of quanta witha a given status.""" return len([qrep for qrep in report.quantaReports if qrep.status is status]) @@ -314,19 +314,20 @@ def _count_status(report, status): class MPGraphExecutorTestCase(unittest.TestCase): """A test case for MPGraphExecutor class.""" - def test_mpexec_nomp(self): + def test_mpexec_nomp(self) -> None: """Make simple graph and execute.""" task_node = TaskNodeMock() qgraph = QuantumGraphMock( - [QuantumIterDataMock(index=i, task_node=task_node, detector=i) for i in range(3)] + [QuantumNodeMock(index=i, task_node=task_node, detector=i) for i in range(3)] ) # run in single-process mode qexec = QuantumExecutorMock() mpexec = MPGraphExecutor(numProc=1, timeout=100, quantumExecutor=qexec) - mpexec.execute(qgraph) + mpexec.execute(qgraph) # type: ignore[arg-type] self.assertEqual(qexec.getDataIds("detector"), [0, 1, 2]) report = mpexec.getReport() + assert report is not None self.assertEqual(report.status, ExecutionStatus.SUCCESS) self.assertIsNone(report.exitCode) self.assertIsNone(report.exceptionInfo) @@ -336,14 +337,14 @@ def test_mpexec_nomp(self): self.assertTrue(all(qrep.exceptionInfo is None for qrep in report.quantaReports)) self.assertTrue(all(qrep.taskLabel == "task1" for qrep in report.quantaReports)) - def test_mpexec_mp(self): + def test_mpexec_mp(self) -> None: """Make simple graph and execute.""" task_node = TaskNodeMock() qgraph = QuantumGraphMock( - [QuantumIterDataMock(index=i, task_node=task_node, detector=i) for i in range(3)] + [QuantumNodeMock(index=i, task_node=task_node, detector=i) for i in range(3)] ) - methods = ["spawn"] + methods: list[Literal["spawn", "forkserver"]] = ["spawn"] if sys.platform == "linux": methods.append("forkserver") @@ -353,9 +354,10 @@ def test_mpexec_mp(self): # defined. qexec = QuantumExecutorMock(mp=True) mpexec = MPGraphExecutor(numProc=3, timeout=100, quantumExecutor=qexec, startMethod=method) - mpexec.execute(qgraph) + mpexec.execute(qgraph) # type: ignore[arg-type] self.assertCountEqual(qexec.getDataIds("detector"), [0, 1, 2]) report = mpexec.getReport() + assert report is not None self.assertEqual(report.status, ExecutionStatus.SUCCESS) self.assertIsNone(report.exitCode) self.assertIsNone(report.exceptionInfo) @@ -365,20 +367,20 @@ def test_mpexec_mp(self): self.assertTrue(all(qrep.exceptionInfo is None for qrep in report.quantaReports)) self.assertTrue(all(qrep.taskLabel == "task1" for qrep in report.quantaReports)) - def test_mpexec_nompsupport(self): + def test_mpexec_nompsupport(self) -> None: """Try to run MP for task that has no MP support which should fail.""" task_node = TaskNodeMock(task_class=TaskMockNoMP) qgraph = QuantumGraphMock( - [QuantumIterDataMock(index=i, task_node=task_node, detector=i) for i in range(3)] + [QuantumNodeMock(index=i, task_node=task_node, detector=i) for i in range(3)] ) # run in multi-process mode qexec = QuantumExecutorMock() mpexec = MPGraphExecutor(numProc=3, timeout=100, quantumExecutor=qexec) with self.assertRaisesRegex(MPGraphExecutorError, "Task 'task1' does not support multiprocessing"): - mpexec.execute(qgraph) + mpexec.execute(qgraph) # type: ignore[arg-type] - def test_mpexec_fixup(self): + def test_mpexec_fixup(self) -> None: """Make simple graph and execute, add dependencies by executing fixup code. """ @@ -386,28 +388,28 @@ def test_mpexec_fixup(self): for reverse in (False, True): qgraph = QuantumGraphMock( - [QuantumIterDataMock(index=i, task_node=task_node, detector=i) for i in range(3)] + [QuantumNodeMock(index=i, task_node=task_node, detector=i) for i in range(3)] ) qexec = QuantumExecutorMock() fixup = ExecFixupDataId("task1", "detector", reverse=reverse) mpexec = MPGraphExecutor(numProc=1, timeout=100, quantumExecutor=qexec, executionGraphFixup=fixup) - mpexec.execute(qgraph) + mpexec.execute(qgraph) # type: ignore[arg-type] expected = [0, 1, 2] if reverse: expected = list(reversed(expected)) self.assertEqual(qexec.getDataIds("detector"), expected) - def test_mpexec_timeout(self): + def test_mpexec_timeout(self) -> None: """Fail due to timeout.""" task_node = TaskNodeMock() task_nodeSleep = TaskNodeMock(task_class=TaskMockLongSleep) qgraph = QuantumGraphMock( [ - QuantumIterDataMock(index=0, task_node=task_node, detector=0), - QuantumIterDataMock(index=1, task_node=task_nodeSleep, detector=1), - QuantumIterDataMock(index=2, task_node=task_node, detector=2), + QuantumNodeMock(index=0, task_node=task_node, detector=0), + QuantumNodeMock(index=1, task_node=task_nodeSleep, detector=1), + QuantumNodeMock(index=2, task_node=task_node, detector=2), ] ) @@ -415,20 +417,21 @@ def test_mpexec_timeout(self): qexec = QuantumExecutorMock(mp=True) mpexec = MPGraphExecutor(numProc=3, timeout=1, quantumExecutor=qexec, failFast=True) with self.assertRaises(MPTimeoutError): - mpexec.execute(qgraph) + mpexec.execute(qgraph) # type: ignore[arg-type] report = mpexec.getReport() + assert report is not None and report.exceptionInfo is not None self.assertEqual(report.status, ExecutionStatus.TIMEOUT) self.assertEqual(report.exceptionInfo.className, "lsst.ctrl.mpexec.mpGraphExecutor.MPTimeoutError") self.assertGreater(len(report.quantaReports), 0) self.assertEqual(_count_status(report, ExecutionStatus.TIMEOUT), 1) - self.assertTrue(any(qrep.exitCode < 0 for qrep in report.quantaReports)) + self.assertTrue(any(qrep.exitCode is not None and qrep.exitCode < 0 for qrep in report.quantaReports)) self.assertTrue(all(qrep.exceptionInfo is None for qrep in report.quantaReports)) # with failFast=False exception happens after last task finishes qexec = QuantumExecutorMock(mp=True) mpexec = MPGraphExecutor(numProc=3, timeout=3, quantumExecutor=qexec, failFast=False) with self.assertRaises(MPTimeoutError): - mpexec.execute(qgraph) + mpexec.execute(qgraph) # type: ignore[arg-type] # We expect two tasks (0 and 2) to finish successfully and one task to # timeout. Unfortunately on busy CPU there is no guarantee that tasks # finish on time, so expect more timeouts and issue a warning. @@ -437,31 +440,33 @@ def test_mpexec_timeout(self): if detectorIds != {0, 2}: warnings.warn(f"Possibly timed out tasks, expected [0, 2], received {detectorIds}") report = mpexec.getReport() + assert report is not None and report.exceptionInfo is not None self.assertEqual(report.status, ExecutionStatus.TIMEOUT) self.assertEqual(report.exceptionInfo.className, "lsst.ctrl.mpexec.mpGraphExecutor.MPTimeoutError") self.assertGreater(len(report.quantaReports), 0) self.assertGreater(_count_status(report, ExecutionStatus.TIMEOUT), 0) - self.assertTrue(any(qrep.exitCode < 0 for qrep in report.quantaReports)) + self.assertTrue(any(qrep.exitCode is not None and qrep.exitCode < 0 for qrep in report.quantaReports)) self.assertTrue(all(qrep.exceptionInfo is None for qrep in report.quantaReports)) - def test_mpexec_failure(self): + def test_mpexec_failure(self) -> None: """Failure in one task should not stop other tasks.""" task_node = TaskNodeMock() task_node_fail = TaskNodeMock(task_class=TaskMockFail) qgraph = QuantumGraphMock( [ - QuantumIterDataMock(index=0, task_node=task_node, detector=0), - QuantumIterDataMock(index=1, task_node=task_node_fail, detector=1), - QuantumIterDataMock(index=2, task_node=task_node, detector=2), + QuantumNodeMock(index=0, task_node=task_node, detector=0), + QuantumNodeMock(index=1, task_node=task_node_fail, detector=1), + QuantumNodeMock(index=2, task_node=task_node, detector=2), ] ) qexec = QuantumExecutorMock(mp=True) mpexec = MPGraphExecutor(numProc=3, timeout=100, quantumExecutor=qexec) with self.assertRaisesRegex(MPGraphExecutorError, "One or more tasks failed"): - mpexec.execute(qgraph) + mpexec.execute(qgraph) # type: ignore[arg-type] self.assertCountEqual(qexec.getDataIds("detector"), [0, 2]) report = mpexec.getReport() + assert report is not None and report.exceptionInfo is not None self.assertEqual(report.status, ExecutionStatus.FAILURE) self.assertEqual( report.exceptionInfo.className, "lsst.ctrl.mpexec.mpGraphExecutor.MPGraphExecutorError" @@ -469,19 +474,19 @@ def test_mpexec_failure(self): self.assertGreater(len(report.quantaReports), 0) self.assertEqual(_count_status(report, ExecutionStatus.FAILURE), 1) self.assertEqual(_count_status(report, ExecutionStatus.SUCCESS), 2) - self.assertTrue(any(qrep.exitCode > 0 for qrep in report.quantaReports)) + self.assertTrue(any(qrep.exitCode is not None and qrep.exitCode > 0 for qrep in report.quantaReports)) self.assertTrue(any(qrep.exceptionInfo is not None for qrep in report.quantaReports)) - def test_mpexec_failure_dep(self): + def test_mpexec_failure_dep(self) -> None: """Failure in one task should skip dependents.""" task_node = TaskNodeMock() task_node_fail = TaskNodeMock(task_class=TaskMockFail) qdata = [ - QuantumIterDataMock(index=0, task_node=task_node, detector=0), - QuantumIterDataMock(index=1, task_node=task_node_fail, detector=1), - QuantumIterDataMock(index=2, task_node=task_node, detector=2), - QuantumIterDataMock(index=3, task_node=task_node, detector=3), - QuantumIterDataMock(index=4, task_node=task_node, detector=4), + QuantumNodeMock(index=0, task_node=task_node, detector=0), + QuantumNodeMock(index=1, task_node=task_node_fail, detector=1), + QuantumNodeMock(index=2, task_node=task_node, detector=2), + QuantumNodeMock(index=3, task_node=task_node, detector=3), + QuantumNodeMock(index=4, task_node=task_node, detector=4), ] qdata[2].dependencies.add(1) qdata[4].dependencies.add(3) @@ -492,9 +497,10 @@ def test_mpexec_failure_dep(self): qexec = QuantumExecutorMock(mp=True) mpexec = MPGraphExecutor(numProc=3, timeout=100, quantumExecutor=qexec) with self.assertRaisesRegex(MPGraphExecutorError, "One or more tasks failed"): - mpexec.execute(qgraph) + mpexec.execute(qgraph) # type: ignore[arg-type] self.assertCountEqual(qexec.getDataIds("detector"), [0, 3]) report = mpexec.getReport() + assert report is not None and report.exceptionInfo is not None self.assertEqual(report.status, ExecutionStatus.FAILURE) self.assertEqual( report.exceptionInfo.className, "lsst.ctrl.mpexec.mpGraphExecutor.MPGraphExecutorError" @@ -504,19 +510,19 @@ def test_mpexec_failure_dep(self): self.assertEqual(_count_status(report, ExecutionStatus.FAILURE), 1) self.assertEqual(_count_status(report, ExecutionStatus.SUCCESS), 2) self.assertEqual(_count_status(report, ExecutionStatus.SKIPPED), 2) - self.assertTrue(any(qrep.exitCode > 0 for qrep in report.quantaReports)) + self.assertTrue(any(qrep.exitCode is not None and qrep.exitCode > 0 for qrep in report.quantaReports)) self.assertTrue(any(qrep.exceptionInfo is not None for qrep in report.quantaReports)) - def test_mpexec_failure_dep_nomp(self): + def test_mpexec_failure_dep_nomp(self) -> None: """Failure in one task should skip dependents, in-process version.""" task_node = TaskNodeMock() task_node_fail = TaskNodeMock(task_class=TaskMockFail) qdata = [ - QuantumIterDataMock(index=0, task_node=task_node, detector=0), - QuantumIterDataMock(index=1, task_node=task_node_fail, detector=1), - QuantumIterDataMock(index=2, task_node=task_node, detector=2), - QuantumIterDataMock(index=3, task_node=task_node, detector=3), - QuantumIterDataMock(index=4, task_node=task_node, detector=4), + QuantumNodeMock(index=0, task_node=task_node, detector=0), + QuantumNodeMock(index=1, task_node=task_node_fail, detector=1), + QuantumNodeMock(index=2, task_node=task_node, detector=2), + QuantumNodeMock(index=3, task_node=task_node, detector=3), + QuantumNodeMock(index=4, task_node=task_node, detector=4), ] qdata[2].dependencies.add(1) qdata[4].dependencies.add(3) @@ -527,9 +533,10 @@ def test_mpexec_failure_dep_nomp(self): qexec = QuantumExecutorMock() mpexec = MPGraphExecutor(numProc=1, timeout=100, quantumExecutor=qexec) with self.assertRaisesRegex(MPGraphExecutorError, "One or more tasks failed"): - mpexec.execute(qgraph) + mpexec.execute(qgraph) # type: ignore[arg-type] self.assertCountEqual(qexec.getDataIds("detector"), [0, 3]) report = mpexec.getReport() + assert report is not None and report.exceptionInfo is not None self.assertEqual(report.status, ExecutionStatus.FAILURE) self.assertEqual( report.exceptionInfo.className, "lsst.ctrl.mpexec.mpGraphExecutor.MPGraphExecutorError" @@ -542,7 +549,7 @@ def test_mpexec_failure_dep_nomp(self): self.assertTrue(all(qrep.exitCode is None for qrep in report.quantaReports)) self.assertTrue(any(qrep.exceptionInfo is not None for qrep in report.quantaReports)) - def test_mpexec_failure_failfast(self): + def test_mpexec_failure_failfast(self) -> None: """Fast fail stops quickly. Timing delay of task #3 should be sufficient to process @@ -552,11 +559,11 @@ def test_mpexec_failure_failfast(self): task_node_fail = TaskNodeMock(task_class=TaskMockFail) task_nodeLongSleep = TaskNodeMock(task_class=TaskMockLongSleep) qdata = [ - QuantumIterDataMock(index=0, task_node=task_node, detector=0), - QuantumIterDataMock(index=1, task_node=task_node_fail, detector=1), - QuantumIterDataMock(index=2, task_node=task_node, detector=2), - QuantumIterDataMock(index=3, task_node=task_nodeLongSleep, detector=3), - QuantumIterDataMock(index=4, task_node=task_node, detector=4), + QuantumNodeMock(index=0, task_node=task_node, detector=0), + QuantumNodeMock(index=1, task_node=task_node_fail, detector=1), + QuantumNodeMock(index=2, task_node=task_node, detector=2), + QuantumNodeMock(index=3, task_node=task_nodeLongSleep, detector=3), + QuantumNodeMock(index=4, task_node=task_node, detector=4), ] qdata[1].dependencies.add(0) qdata[2].dependencies.add(1) @@ -568,9 +575,10 @@ def test_mpexec_failure_failfast(self): qexec = QuantumExecutorMock(mp=True) mpexec = MPGraphExecutor(numProc=3, timeout=100, quantumExecutor=qexec, failFast=True) with self.assertRaisesRegex(MPGraphExecutorError, "failed, exit code=1"): - mpexec.execute(qgraph) + mpexec.execute(qgraph) # type: ignore[arg-type] self.assertCountEqual(qexec.getDataIds("detector"), [0]) report = mpexec.getReport() + assert report is not None and report.exceptionInfo is not None self.assertEqual(report.status, ExecutionStatus.FAILURE) self.assertEqual( report.exceptionInfo.className, "lsst.ctrl.mpexec.mpGraphExecutor.MPGraphExecutorError" @@ -578,26 +586,27 @@ def test_mpexec_failure_failfast(self): # Dependencies of failed tasks do not appear in quantaReports self.assertGreater(len(report.quantaReports), 0) self.assertEqual(_count_status(report, ExecutionStatus.FAILURE), 1) - self.assertTrue(any(qrep.exitCode > 0 for qrep in report.quantaReports)) + self.assertTrue(any(qrep.exitCode is not None and qrep.exitCode > 0 for qrep in report.quantaReports)) self.assertTrue(any(qrep.exceptionInfo is not None for qrep in report.quantaReports)) - def test_mpexec_crash(self): + def test_mpexec_crash(self) -> None: """Check task crash due to signal.""" task_node = TaskNodeMock() task_node_crash = TaskNodeMock(task_class=TaskMockCrash) qgraph = QuantumGraphMock( [ - QuantumIterDataMock(index=0, task_node=task_node, detector=0), - QuantumIterDataMock(index=1, task_node=task_node_crash, detector=1), - QuantumIterDataMock(index=2, task_node=task_node, detector=2), + QuantumNodeMock(index=0, task_node=task_node, detector=0), + QuantumNodeMock(index=1, task_node=task_node_crash, detector=1), + QuantumNodeMock(index=2, task_node=task_node, detector=2), ] ) qexec = QuantumExecutorMock(mp=True) mpexec = MPGraphExecutor(numProc=3, timeout=100, quantumExecutor=qexec) with self.assertRaisesRegex(MPGraphExecutorError, "One or more tasks failed"): - mpexec.execute(qgraph) + mpexec.execute(qgraph) # type: ignore[arg-type] report = mpexec.getReport() + assert report is not None and report.exceptionInfo is not None self.assertEqual(report.status, ExecutionStatus.FAILURE) self.assertEqual( report.exceptionInfo.className, "lsst.ctrl.mpexec.mpGraphExecutor.MPGraphExecutorError" @@ -609,23 +618,24 @@ def test_mpexec_crash(self): self.assertTrue(any(qrep.exitCode == -signal.SIGILL for qrep in report.quantaReports)) self.assertTrue(all(qrep.exceptionInfo is None for qrep in report.quantaReports)) - def test_mpexec_crash_failfast(self): + def test_mpexec_crash_failfast(self) -> None: """Check task crash due to signal with --fail-fast.""" task_node = TaskNodeMock() task_node_crash = TaskNodeMock(task_class=TaskMockCrash) qgraph = QuantumGraphMock( [ - QuantumIterDataMock(index=0, task_node=task_node, detector=0), - QuantumIterDataMock(index=1, task_node=task_node_crash, detector=1), - QuantumIterDataMock(index=2, task_node=task_node, detector=2), + QuantumNodeMock(index=0, task_node=task_node, detector=0), + QuantumNodeMock(index=1, task_node=task_node_crash, detector=1), + QuantumNodeMock(index=2, task_node=task_node, detector=2), ] ) qexec = QuantumExecutorMock(mp=True) mpexec = MPGraphExecutor(numProc=3, timeout=100, quantumExecutor=qexec, failFast=True) with self.assertRaisesRegex(MPGraphExecutorError, "failed, killed by signal 4 .Illegal instruction"): - mpexec.execute(qgraph) + mpexec.execute(qgraph) # type: ignore[arg-type] report = mpexec.getReport() + assert report is not None and report.exceptionInfo is not None self.assertEqual(report.status, ExecutionStatus.FAILURE) self.assertEqual( report.exceptionInfo.className, "lsst.ctrl.mpexec.mpGraphExecutor.MPGraphExecutorError" @@ -634,11 +644,11 @@ def test_mpexec_crash_failfast(self): self.assertTrue(any(qrep.exitCode == -signal.SIGILL for qrep in report.quantaReports)) self.assertTrue(all(qrep.exceptionInfo is None for qrep in report.quantaReports)) - def test_mpexec_num_fd(self): + def test_mpexec_num_fd(self) -> None: """Check that number of open files stays reasonable.""" task_node = TaskNodeMock() qgraph = QuantumGraphMock( - [QuantumIterDataMock(index=i, task_node=task_node, detector=i) for i in range(20)] + [QuantumNodeMock(index=i, task_node=task_node, detector=i) for i in range(20)] ) this_proc = psutil.Process() @@ -647,7 +657,7 @@ def test_mpexec_num_fd(self): # run in multi-process mode, the order of results is not defined qexec = QuantumExecutorMock(mp=True) mpexec = MPGraphExecutor(numProc=3, timeout=100, quantumExecutor=qexec) - mpexec.execute(qgraph) + mpexec.execute(qgraph) # type: ignore[arg-type] num_fds_1 = this_proc.num_fds() # They should be the same but allow small growth just in case. @@ -661,10 +671,10 @@ class SingleQuantumExecutorTestCase(unittest.TestCase): instrument = "lsst.pipe.base.tests.simpleQGraph.SimpleInstrument" - def setUp(self): + def setUp(self) -> None: self.root = makeTestTempDir(TESTDIR) - def tearDown(self): + def tearDown(self) -> None: removeTestTempDir(self.root) def test_simple_execute(self) -> None: