Skip to content

Commit

Permalink
Merge pull request #286 from lsst/tickets/DM-43484
Browse files Browse the repository at this point in the history
DM-43484: Fix BPS auto-retries and add support for testing that in ci_middleware
  • Loading branch information
TallJimbo authored Mar 28, 2024
2 parents d2bfc69 + 8230571 commit 716b37d
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 11 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-43484.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix BPS auto-retry functionality broken on DM-43060, by restoring support for repeated execution of already-successful quanta in `pipetask run-qbb`.
40 changes: 30 additions & 10 deletions python/lsst/ctrl/mpexec/cli/opt/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,18 @@
from __future__ import annotations

from collections.abc import Iterable, Mapping
from typing import TYPE_CHECKING

import click
from lsst.daf.butler.cli.utils import MWOptionDecorator, MWPath, split_commas, unwrap
from lsst.utils.doImport import doImportType

if TYPE_CHECKING:
# Avoid regular module-scope import of test-only code that tinkers with the
# storage class singleton.
from lsst.pipe.base.tests.mocks import ForcedFailure


butler_config_option = MWOptionDecorator(
"-b", "--butler-config", help="Location of the gen3 butler/registry config file."
)
Expand Down Expand Up @@ -465,7 +472,7 @@

def parse_mock_failure(
ctx: click.Context, param: click.Option, value: Iterable[str] | None
) -> Mapping[str, tuple[str, type[Exception] | None]]:
) -> Mapping[str, ForcedFailure]:
"""Parse the --mock-failure option values into the mapping accepted by
`~lsst.pipe.base.tests.mocks.mock_task_defs`.
Expand All @@ -478,19 +485,27 @@ def parse_mock_failure(
value : `~collections.abc.Iterable` [`str`] or `None`
Value from option.
"""
result: dict[str, tuple[str, type[Exception] | None]] = {}
# Avoid regular module-scope import of test-only code that tinkers with the
# storage class singleton.
from lsst.pipe.base.tests.mocks import ForcedFailure

result: dict[str, ForcedFailure] = {}
if value is None:
return result
for entry in value:
try:
task_label, error_type_name, where = entry.split(":", 2)
task_label, error_type_name, where, *rest = entry.split(":")
if rest:
(memory_required,) = rest
else:
memory_required = None
except ValueError:
raise click.UsageError(
f"Invalid value for --mock-failure option: {entry!r}; "
"expected a string of the form 'task:error:where'."
"expected a string of the form 'task:error:where[:mem]'."
) from None
error_type = doImportType(error_type_name) if error_type_name else None
result[task_label] = (where, error_type)
result[task_label] = ForcedFailure(where, error_type, memory_required)
return result


Expand All @@ -502,11 +517,16 @@ def parse_mock_failure(
multiple=True,
help=unwrap(
"""Specifications for tasks that should be configured to fail
when mocking execution. This is a colon-separated 3-tuple, where the
first entry the task label, the second the fully-qualified exception
type (empty for ValueError, and the third a string (which typically
needs to be quoted to be passed as one argument value by the shell) of
the form passed to --where, indicating which data IDs should fail."""
when mocking execution. This is a colon-separated 3-tuple or 4-tuple,
where the first entry the task label, the second the fully-qualified
exception type (empty for ValueError, and the third a string (which
typically needs to be quoted to be passed as one argument value by the
shell) of the form passed to --where, indicating which data IDs should
fail. The final optional term is the memory "required" by the task
(with units recognized by astropy), which will cause the error to only
occur if the "available" memory (according to
ExecutionResources.max_mem) is less than this value. Note that actual
memory usage is irrelevant here; this is all mock behavior."""
),
)

Expand Down
2 changes: 2 additions & 0 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,8 @@ def runGraphQBB(self, task_factory: TaskFactory, args: SimpleNamespace) -> None:
exitOnKnownError=args.fail_fast,
limited_butler_factory=_butler_factory,
resources=resources,
clobberOutputs=True,
skipExisting=True,
)

timeout = self.MP_TIMEOUT if args.timeout is None else args.timeout
Expand Down
8 changes: 7 additions & 1 deletion python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ class SingleQuantumExecutor(QuantumExecutor):
`None`. If ``butler`` is not `None` then this parameter is ignored.
resources : `~lsst.pipe.base.ExecutionResources`, optional
The resources available to this quantum when executing.
skipExisting : `bool`, optional
If `True`, skip quanta whose metadata datasets are already stored.
Unlike ``skipExistingIn``, this works with limited butlers as well as
full butlers. Always set to `True` if ``skipExistingIn`` matches
``butler.run``.
"""

def __init__(
Expand All @@ -127,6 +132,7 @@ def __init__(
exitOnKnownError: bool = False,
limited_butler_factory: Callable[[Quantum], LimitedButler] | None = None,
resources: ExecutionResources | None = None,
skipExisting: bool = False,
):
self.butler = butler
self.taskFactory = taskFactory
Expand All @@ -143,7 +149,7 @@ def __init__(
# Find whether output run is in skipExistingIn.
# TODO: This duplicates logic in GraphBuilder, would be nice to have
# better abstraction for this some day.
self.skipExisting = False
self.skipExisting = skipExisting
if self.butler is not None and skipExistingIn:
skip_collections_wildcard = CollectionWildcard.from_expression(skipExistingIn)
# As optimization check in the explicit list of names first
Expand Down
4 changes: 4 additions & 0 deletions run_err.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- task0:
data_id:
detector: 0
instrument: INSTR

0 comments on commit 716b37d

Please sign in to comment.