Skip to content

Commit

Permalink
Special-case AnnotatedPartialOutputsError in mocking system.
Browse files Browse the repository at this point in the history
We need this to raise with chaining in the mocks, since that's how
it should always be used in production.
  • Loading branch information
TallJimbo committed Aug 23, 2024
1 parent 3ac9f94 commit 6c6b4b4
Showing 1 changed file with 23 additions and 4 deletions.
27 changes: 23 additions & 4 deletions python/lsst/pipe/base/tests/mocks/_pipeline_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@
from typing import TYPE_CHECKING, Any, ClassVar, TypeVar

from astropy.units import Quantity
from lsst.daf.butler import DataCoordinate, DatasetRef, DeferredDatasetHandle, SerializedDatasetType
from lsst.daf.butler import DataCoordinate, DatasetRef, DeferredDatasetHandle, Quantum, SerializedDatasetType
from lsst.pex.config import Config, ConfigDictField, ConfigurableField, Field, ListField
from lsst.utils.doImport import doImportType
from lsst.utils.introspection import get_full_type_name
from lsst.utils.iteration import ensure_iterable

from ... import connectionTypes as cT
from ..._status import AnnotatedPartialOutputsError, RepeatableQuantumError
from ...config import PipelineTaskConfig
from ...connections import InputQuantizedConnection, OutputQuantizedConnection, PipelineTaskConnections
from ...pipeline_graph import PipelineGraph
Expand Down Expand Up @@ -291,18 +292,18 @@ def runQuantum(
# Possibly raise an exception.
if self.data_id_match is not None and self.data_id_match.match(quantum.dataId):
assert self.fail_exception is not None, "Exception type must be defined"
message = f"Simulated failure: task={self.getName()} dataId={quantum.dataId}"

if self.memory_required is not None:
if butlerQC.resources.max_mem < self.memory_required:
_LOG.info(
"Simulating out-of-memory failure for task '%s' on quantum %s",
self.getName(),
quantum.dataId,
)
raise self.fail_exception(message)
self._fail(quantum)
else:
_LOG.info("Simulating failure of task '%s' on quantum %s", self.getName(), quantum.dataId)
raise self.fail_exception(message)
self._fail(quantum)

# Populate the bit of provenance we store in all outputs.
_LOG.info("Reading input data for task '%s' on quantum %s", self.getName(), quantum.dataId)
Expand Down Expand Up @@ -351,6 +352,24 @@ def runQuantum(

_LOG.info("Finished mocking task '%s' on quantum %s", self.getName(), quantum.dataId)

def _fail(self, quantum: Quantum) -> None:
"""Raise the configured exception.
Parameters
----------
quantum : `lsst.daf.butler.Quantum`
Quantum producing the error.
"""
message = f"Simulated failure: task={self.getName()} dataId={quantum.dataId}"
if self.fail_exception is AnnotatedPartialOutputsError:
# This exception is expected to always chain another.
try:
raise RepeatableQuantumError(message)
except RepeatableQuantumError as err:
raise AnnotatedPartialOutputsError() from err
else:
raise self.fail_exception(message)


class MockPipelineDefaultTargetConnections(PipelineTaskConnections, dimensions=()):
pass
Expand Down

0 comments on commit 6c6b4b4

Please sign in to comment.