diff --git a/doc/changes/DM-43484.bugfix.md b/doc/changes/DM-43484.bugfix.md new file mode 100644 index 00000000..00bc64ef --- /dev/null +++ b/doc/changes/DM-43484.bugfix.md @@ -0,0 +1 @@ +Fix BPS auto-retry functionality broken on DM-43060, by restoring support for repeated execution of already-successful quanta in `pipetask run-qbb`. diff --git a/python/lsst/ctrl/mpexec/cli/opt/options.py b/python/lsst/ctrl/mpexec/cli/opt/options.py index 51a1549a..abc098cf 100644 --- a/python/lsst/ctrl/mpexec/cli/opt/options.py +++ b/python/lsst/ctrl/mpexec/cli/opt/options.py @@ -28,11 +28,18 @@ from __future__ import annotations from collections.abc import Iterable, Mapping +from typing import TYPE_CHECKING import click from lsst.daf.butler.cli.utils import MWOptionDecorator, MWPath, split_commas, unwrap from lsst.utils.doImport import doImportType +if TYPE_CHECKING: + # Avoid regular module-scope import of test-only code that tinkers with the + # storage class singleton. + from lsst.pipe.base.tests.mocks import ForcedFailure + + butler_config_option = MWOptionDecorator( "-b", "--butler-config", help="Location of the gen3 butler/registry config file." ) @@ -465,7 +472,7 @@ def parse_mock_failure( ctx: click.Context, param: click.Option, value: Iterable[str] | None -) -> Mapping[str, tuple[str, type[Exception] | None]]: +) -> Mapping[str, ForcedFailure]: """Parse the --mock-failure option values into the mapping accepted by `~lsst.pipe.base.tests.mocks.mock_task_defs`. @@ -478,19 +485,27 @@ def parse_mock_failure( value : `~collections.abc.Iterable` [`str`] or `None` Value from option. """ - result: dict[str, tuple[str, type[Exception] | None]] = {} + # Avoid regular module-scope import of test-only code that tinkers with the + # storage class singleton. 
+ from lsst.pipe.base.tests.mocks import ForcedFailure + + result: dict[str, ForcedFailure] = {} if value is None: return result for entry in value: try: - task_label, error_type_name, where = entry.split(":", 2) + task_label, error_type_name, where, *rest = entry.split(":") + if rest: + (memory_required,) = rest + else: + memory_required = None except ValueError: raise click.UsageError( f"Invalid value for --mock-failure option: {entry!r}; " - "expected a string of the form 'task:error:where'." + "expected a string of the form 'task:error:where[:mem]'." ) from None error_type = doImportType(error_type_name) if error_type_name else None - result[task_label] = (where, error_type) + result[task_label] = ForcedFailure(where, error_type, memory_required) return result @@ -502,11 +517,16 @@ def parse_mock_failure( multiple=True, help=unwrap( """Specifications for tasks that should be configured to fail - when mocking execution. This is a colon-separated 3-tuple, where the - first entry the task label, the second the fully-qualified exception - type (empty for ValueError, and the third a string (which typically - needs to be quoted to be passed as one argument value by the shell) of - the form passed to --where, indicating which data IDs should fail.""" + when mocking execution. This is a colon-separated 3-tuple or 4-tuple, + where the first entry is the task label, the second the fully-qualified + exception type (empty for ValueError), and the third a string (which + typically needs to be quoted to be passed as one argument value by the + shell) of the form passed to --where, indicating which data IDs should + fail. The final optional term is the memory "required" by the task + (with units recognized by astropy), which will cause the error to only + occur if the "available" memory (according to + ExecutionResources.max_mem) is less than this value. 
Note that actual + memory usage is irrelevant here; this is all mock behavior.""" ), ) diff --git a/python/lsst/ctrl/mpexec/cmdLineFwk.py b/python/lsst/ctrl/mpexec/cmdLineFwk.py index c3e8fc40..6f873a3e 100644 --- a/python/lsst/ctrl/mpexec/cmdLineFwk.py +++ b/python/lsst/ctrl/mpexec/cmdLineFwk.py @@ -1000,6 +1000,8 @@ def runGraphQBB(self, task_factory: TaskFactory, args: SimpleNamespace) -> None: exitOnKnownError=args.fail_fast, limited_butler_factory=_butler_factory, resources=resources, + clobberOutputs=True, + skipExisting=True, ) timeout = self.MP_TIMEOUT if args.timeout is None else args.timeout diff --git a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py index 7d5e0821..a943d7c2 100644 --- a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py +++ b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py @@ -115,6 +115,11 @@ class SingleQuantumExecutor(QuantumExecutor): `None`. If ``butler`` is not `None` then this parameter is ignored. resources : `~lsst.pipe.base.ExecutionResources`, optional The resources available to this quantum when executing. + skipExisting : `bool`, optional + If `True`, skip quanta whose metadata datasets are already stored. + Unlike ``skipExistingIn``, this works with limited butlers as well as + full butlers. Always set to `True` if ``skipExistingIn`` matches + ``butler.run``. """ def __init__( @@ -127,6 +132,7 @@ def __init__( exitOnKnownError: bool = False, limited_butler_factory: Callable[[Quantum], LimitedButler] | None = None, resources: ExecutionResources | None = None, + skipExisting: bool = False, ): self.butler = butler self.taskFactory = taskFactory @@ -143,7 +149,7 @@ def __init__( # Find whether output run is in skipExistingIn. # TODO: This duplicates logic in GraphBuilder, would be nice to have # better abstraction for this some day. 
- self.skipExisting = False + self.skipExisting = skipExisting if self.butler is not None and skipExistingIn: skip_collections_wildcard = CollectionWildcard.from_expression(skipExistingIn) # As optimization check in the explicit list of names first diff --git a/run_err.yaml b/run_err.yaml new file mode 100644 index 00000000..c79bb0df --- /dev/null +++ b/run_err.yaml @@ -0,0 +1,4 @@ +- task0: + data_id: + detector: 0 + instrument: INSTR