From 9a81d7cdbca7df244b39226c3cadf0c621ae41e7 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Sat, 6 May 2023 09:42:20 -0400 Subject: [PATCH 01/10] Add new package that provides mocks for storage classes. --- python/lsst/pipe/base/tests/mocks/__init__.py | 22 + .../pipe/base/tests/mocks/_storage_class.py | 384 ++++++++++++++++++ 2 files changed, 406 insertions(+) create mode 100644 python/lsst/pipe/base/tests/mocks/__init__.py create mode 100644 python/lsst/pipe/base/tests/mocks/_storage_class.py diff --git a/python/lsst/pipe/base/tests/mocks/__init__.py b/python/lsst/pipe/base/tests/mocks/__init__.py new file mode 100644 index 00000000..c4a5ace0 --- /dev/null +++ b/python/lsst/pipe/base/tests/mocks/__init__.py @@ -0,0 +1,22 @@ +# This file is part of pipe_base. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from ._storage_class import * diff --git a/python/lsst/pipe/base/tests/mocks/_storage_class.py b/python/lsst/pipe/base/tests/mocks/_storage_class.py new file mode 100644 index 00000000..f89d2f6d --- /dev/null +++ b/python/lsst/pipe/base/tests/mocks/_storage_class.py @@ -0,0 +1,384 @@ +# This file is part of pipe_base. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
+ +from __future__ import annotations + +__all__ = ( + "MockDataset", + "MockStorageClass", + "MockDatasetQuantum", + "MockStorageClassDelegate", + "get_mock_name", + "get_original_name", + "is_mock_name", +) + +from collections.abc import Callable, Iterable, Mapping +from typing import Any, cast + +import pydantic +from lsst.daf.butler import ( + Config, + DatasetComponent, + SerializedDataCoordinate, + SerializedDatasetRef, + SerializedDatasetType, + StorageClass, + StorageClassDelegate, + StorageClassFactory, +) +from lsst.resources import ResourcePath, ResourcePathExpression +from lsst.utils.introspection import get_full_type_name + + +_NAME_PREFIX: str = "_mock_" + + +def get_mock_name(original: str) -> str: + """Return the name of the mock storage class, dataset type, or task label + for the given original name. + """ + return _NAME_PREFIX + original + + +def get_original_name(mock: str) -> str: + """Return the name of the original storage class, dataset type, or task + label that corresponds to the given mock name. + """ + assert mock.startswith(_NAME_PREFIX) + return mock[len(_NAME_PREFIX) :] + + +def is_mock_name(name: str) -> bool: + """Return whether the given name is that of a mock storage class, dataset + type, or task label. + """ + return name.startswith(_NAME_PREFIX) + + +# Tests for this module are in the ci_middleware package, where we have easy +# access to complex real storage classes (and their pytypes) to test against. + + +class MockDataset(pydantic.BaseModel): + """The in-memory dataset type used by `MockStorageClass`.""" + + ref: SerializedDatasetRef + """Reference used to read and write this dataset. + + This is a `SerializedDatasetRef` instead of a "real" one for two reasons: + + - the mock dataset may need to be read from disk in a context in which a + `~lsst.daf.butler.DimensionUniverse` is unavailable; + - we don't want the complexity of having a separate + ``SerializedMockDataset``. + + The downside of this is that we end up effectively reimplementing a few + fairly trivial DatasetType/DatasetRef methods that override storage classes + and extract components (in `MockStorageClass` and + `MockStorageClassDelegate`). + """ + + quantum: MockDatasetQuantum | None = None + """Description of the quantum that produced this dataset. + """ + + output_connection_name: str | None = None + """The name of the PipelineTask output connection that produced this + dataset. + """ + + converted_from: MockDataset | None = None + """Another `MockDataset` that underwent a storage class conversion to + produce this one. + """ + + parent: MockDataset | None = None + """Another `MockDataset` from which a component was extract to form this + one. + """ + + parameters: dict[str, str] | None = None + """`repr` of all parameters applied when reading this dataset.""" + + @property + def dataset_type(self) -> SerializedDatasetType: + return cast(SerializedDatasetType, self.ref.datasetType) + + @property + def storage_class(self) -> str: + return cast(str, self.dataset_type.storageClass) + + def make_derived(self, **kwargs: Any) -> MockDataset: + """Return a new MockDataset that represents applying some storage class + operation to this one. + + Keyword arguments are fields of `MockDataset` or + `SerializedDatasetType` to override in the result. 
+ """ + dataset_type_updates = { + k: kwargs.pop(k) for k in list(kwargs) if k in SerializedDatasetType.__fields__ + } + derived_dataset_type = self.dataset_type.copy(update=dataset_type_updates) + derived_ref = self.ref.copy(update=dict(datasetType=derived_dataset_type)) + # Fields below are those that should not be propagated to the derived + # dataset, because they're not about the intrinsic on-disk thing. + kwargs.setdefault("converted_from", None) + kwargs.setdefault("parent", None) + kwargs.setdefault("parameters", None) + # Also use setdefault on the ref in case caller wants to override that + # directly, but this is expected to be rare enough that it's not worth + # it to try to optimize out the work above to make derived_ref. + kwargs.setdefault("ref", derived_ref) + return self.copy(update=kwargs) + + +class MockDatasetQuantum(pydantic.BaseModel): + """Description of the quantum that produced a mock dataset.""" + + task_label: str + """Label of the producing PipelineTask in its pipeline.""" + + data_id: SerializedDataCoordinate + """Data ID for the quantum.""" + + inputs: dict[str, list[MockDataset]] + """Mock datasets provided as input to the quantum.""" + + +MockDataset.update_forward_refs() + + +class MockStorageClassDelegate(StorageClassDelegate): + """Implementation of the StorageClassDelegate interface for mock datasets. + + This class does not implement assembly and disassembly just because it's + not needed right now. That could be added in the future with some + additional tracking attributes in `MockDataset`. + """ + + def assemble(self, components: dict[str, Any], pytype: type | None = None) -> MockDataset: + # Docstring inherited. + raise NotImplementedError("Mock storage classes do not implement assembly.") + + def getComponent(self, composite: Any, componentName: str) -> Any: + # Docstring inherited. + assert isinstance( + composite, MockDataset + ), f"MockStorageClassDelegate given a non-mock dataset {composite!r}." + return composite.make_derived( + name=f"{composite.dataset_type.name}.{componentName}", + storageClass=self.storageClass.allComponents()[componentName].name, + parentStorageClass=self.storageClass.name, + parent=composite, + ) + + def disassemble( + self, composite: Any, subset: Iterable | None = None, override: Any | None = None + ) -> dict[str, DatasetComponent]: + # Docstring inherited. + raise NotImplementedError("Mock storage classes do not implement disassembly.") + + def handleParameters(self, inMemoryDataset: Any, parameters: Mapping[str, Any] | None = None) -> Any: + # Docstring inherited. + assert isinstance( + inMemoryDataset, MockDataset + ), f"MockStorageClassDelegate given a non-mock dataset {inMemoryDataset!r}." + if not parameters: + return inMemoryDataset + return inMemoryDataset.make_derived(parameters={k: repr(v) for k, v in parameters.items()}) + + +class MockStorageClass(StorageClass): + """A reimplementation of `lsst.daf.butler.StorageClass` for mock datasets. + + Each `MockStorageClass` instance corresponds to a real "original" storage + class, with components and conversions that are mocks of the original's + components and conversions. The `pytype` for all `MockStorageClass` + instances is `MockDataset`. 
+ """ + + def __init__(self, original: StorageClass, factory: StorageClassFactory | None = None): + name = get_mock_name(original.name) + if factory is None: + factory = StorageClassFactory() + super().__init__( + name=name, + pytype=MockDataset, + components={ + k: self.get_or_register_mock(v.name, factory) for k, v in original.components.items() + }, + derivedComponents={ + k: self.get_or_register_mock(v.name, factory) for k, v in original.derivedComponents.items() + }, + parameters=frozenset(original.parameters), + delegate=get_full_type_name(MockStorageClassDelegate), + # Conversions work differently for mock storage classes, since they + # all have the same pytype: we use the original storage class being + # mocked to see if we can convert, then just make a new MockDataset + # that points back to the original. + converters={}, + ) + self.original = original + # Make certain no one tries to use the converters. + self._converters = None # type: ignore + + def _get_converters_by_type(self) -> dict[type, Callable[[Any], Any]]: + # Docstring inherited. + raise NotImplementedError("MockStorageClass does not use converters.") + + @classmethod + def get_or_register_mock( + cls, original: str, factory: StorageClassFactory | None = None + ) -> MockStorageClass: + """Return a mock storage class for the given original storage class, + creating and registering it if necessary. + + Parameters + ---------- + original : `str` + Name of the original storage class to be mocked. + factory : `StorageClassFactory`, optional + Storage class factory singleton instance. + + Returns + ------- + mock : `MockStorageClass` + New storage class that mocks ``original``. + """ + name = get_mock_name(original) + if factory is None: + factory = StorageClassFactory() + if name in factory: + return cast(MockStorageClass, factory.getStorageClass(name)) + else: + result = cls(factory.getStorageClass(original), factory) + factory.registerStorageClass(result) + return result + + @staticmethod + def make_formatter_config_dir( + root: ResourcePathExpression, factory: StorageClassFactory | None = None + ) -> None: + """Make a directory suitable for inclusion in the butler config + search path. + + Parameters + ---------- + root : convertible to `lsst.resources.ResourcePath` + Root directory that will be passed as any element of the + ``searchPaths`` list at `~lsst.daf.butler.Butler` construction. + factory : `StorageClassFactory`, optional + Storage class factory singleton instance. + + Notes + ----- + This adds formatter entries to `~lsst.daf.butler.FileDatastore` + configuration for all mock storage classes that have been registered so + far. It does not add the storage class configuration entries + themselves, because `MockStorageClass` can't be written out as + configuration in the same way that regular storage classes can. So the + usual pattern for creating a butler client with storage class mocking + is: + + - Register all needed mock storage classes with the singleton + `lsst.daf.butler.StorageClassFactory`, which is constructed on first + use even when there are no active butler clients. + + - Call this method to create a directory with formatter configuration + for those storage classes (within the same process, so singleton + state is preserved). + + - Create a butler client while passing ``searchPaths=[root]``, again + within the same process. + + There is currently no automatic way to share mock storage class + definitions across processes, other than to re-run the code that + registers those mock storage classes. 
+ + Note that the data repository may be created any time before the butler + client is - the formatter configuration written by this method is + expected to be used per-client, not at data repository construction, + and nothing about the mocks is persistent within the data repository + other than dataset types that use those storage classes (and dataset + types with unrecognized storage classes are already carefully handled + by the butler). + """ + if factory is None: + factory = StorageClassFactory() + formatters_config = Config() + for storage_class_name in factory.keys(): + if is_mock_name(storage_class_name): + # Have to escape mock storage class names, because + # config["_mock_X"] gets reinterpreted as a nested config key, + # i.e. config["mock"]["X"]. + formatters_config["\\" + storage_class_name] = "lsst.daf.butler.formatters.json.JsonFormatter" + root = ResourcePath(root, forceDirectory=True) + dir_path = root.join("datastores", forceDirectory=True) + dir_path.mkdir() + formatters_config.dumpToUri(dir_path.join("formatters.yaml")) + with dir_path.join("fileDatastore.yaml").open("w") as stream: + stream.write( + "datastore:\n" + " cls: lsst.daf.butler.datastores.fileDatastore.FileDatastore\n" + " formatters: !include formatters.yaml\n" + ) + with root.join("datastore.yaml").open("w") as stream: + stream.write("datastore:\n cls: lsst.daf.butler.datastores.fileDatastore.FileDatastore\n") + + def allComponents(self) -> Mapping[str, MockStorageClass]: + # Docstring inherited. + return cast(Mapping[str, MockStorageClass], super().allComponents()) + + @property + def components(self) -> Mapping[str, MockStorageClass]: + # Docstring inherited. + return cast(Mapping[str, MockStorageClass], super().components) + + @property + def derivedComponents(self) -> Mapping[str, MockStorageClass]: + # Docstring inherited. + return cast(Mapping[str, MockStorageClass], super().derivedComponents) + + def can_convert(self, other: StorageClass) -> bool: + # Docstring inherited. + if not isinstance(other, MockStorageClass): + return False + return self.original.can_convert(other.original) + + def coerce_type(self, incorrect: Any) -> Any: + # Docstring inherited. + if not isinstance(incorrect, MockDataset): + raise TypeError( + f"Mock storage class {self.name!r} can only convert in-memory datasets " + f"corresponding to other mock storage classes, not {incorrect!r}." + ) + factory = StorageClassFactory() + other_storage_class = factory.getStorageClass(incorrect.storage_class) + assert isinstance(other_storage_class, MockStorageClass), "Should not get a MockDataset otherwise." + if other_storage_class.name == self.name: + return incorrect + if not self.can_convert(other_storage_class): + raise TypeError( + f"Mocked storage class {self.original.name!r} cannot convert from " + f"{other_storage_class.original.name!r}." + ) + return incorrect.make_derived(storageClass=self.name, converted_from=incorrect) From 6576d5678aa3402ce36e88331937b9c5e6616bcb Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Thu, 4 May 2023 10:02:14 -0400 Subject: [PATCH 02/10] Move PipelineTask mocking code from ctrl_mpexec. This has bitrotted and does not work anymore, but before fixing that I wanted to get a clean transfer of what we had before with only import/formatting changes. I haven't moved the mock for ButlerQuantumContext that these classes assume, because I'll be modifying them to use the MockStorageClass and MockDataset classes instead. 
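[Editor's note] Before the diff for PATCH 02, a brief illustrative sketch of the helpers PATCH 01 introduces may help readers trying the new package interactively. This is not part of the patch; it assumes the stock "StructuredDataDict" storage class from daf_butler's default configuration as the class being mocked.

from lsst.daf.butler import StorageClassFactory
from lsst.pipe.base.tests.mocks import (
    MockStorageClass,
    get_mock_name,
    get_original_name,
    is_mock_name,
)

# StorageClassFactory is a process-wide singleton; get_or_register_mock
# builds a MockStorageClass wrapping the original and registers it there.
factory = StorageClassFactory()
mock_sc = MockStorageClass.get_or_register_mock("StructuredDataDict", factory)

# Name helpers round-trip between original and mock names.
assert mock_sc.name == get_mock_name("StructuredDataDict")  # "_mock_StructuredDataDict"
assert is_mock_name(mock_sc.name)
assert get_original_name(mock_sc.name) == "StructuredDataDict"
# The in-memory Python type for every mock storage class is MockDataset.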
--- mypy.ini | 4 + pyproject.toml | 1 + .../pipe/base/tests/mocks/_data_id_match.py | 170 ++++++++++++++++++ .../pipe/base/tests/mocks/_pipeline_task.py | 143 +++++++++++++++ requirements.txt | 1 + tests/test_dataid_match.py | 151 ++++++++++++++++ 6 files changed, 470 insertions(+) create mode 100644 python/lsst/pipe/base/tests/mocks/_data_id_match.py create mode 100644 python/lsst/pipe/base/tests/mocks/_pipeline_task.py create mode 100644 tests/test_dataid_match.py diff --git a/mypy.ini b/mypy.ini index 26dc779c..79155bbd 100644 --- a/mypy.ini +++ b/mypy.ini @@ -7,6 +7,10 @@ plugins = pydantic.mypy [mypy-networkx.*] ignore_missing_imports = True +# astropy doesn't ship type annotations +[mypy-astropy.*] +ignore_missing_imports = True + # Don't check LSST packages generally or even try to import them, since most # don't have type annotations. diff --git a/pyproject.toml b/pyproject.toml index 13ae7801..d97f9997 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ dependencies = [ "lsst-utils", "lsst-daf-butler", "lsst-pex-config", + "astropy", "pydantic", "networkx", "pyyaml >= 5.1", diff --git a/python/lsst/pipe/base/tests/mocks/_data_id_match.py b/python/lsst/pipe/base/tests/mocks/_data_id_match.py new file mode 100644 index 00000000..7021ac5e --- /dev/null +++ b/python/lsst/pipe/base/tests/mocks/_data_id_match.py @@ -0,0 +1,170 @@ +# This file is part of pipe_base. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +__all__ = ["DataIdMatch"] + +import operator +from typing import Any, Callable, List, Optional, Tuple + +import astropy.time +from lsst.daf.butler import DataId +from lsst.daf.butler.registry.queries.expressions.parser import Node, ParserYacc, TreeVisitor # type: ignore + + +class _DataIdMatchTreeVisitor(TreeVisitor): + """Expression tree visitor which evaluates expression using values from + DataId. 
+ """ + + def __init__(self, dataId: DataId): + self.dataId = dataId + + def visitNumericLiteral(self, value: str, node: Node) -> Any: + # docstring is inherited from base class + try: + return int(value) + except ValueError: + return float(value) + + def visitStringLiteral(self, value: str, node: Node) -> Any: + # docstring is inherited from base class + return value + + def visitTimeLiteral(self, value: astropy.time.Time, node: Node) -> Any: + # docstring is inherited from base class + return value + + def visitRangeLiteral(self, start: int, stop: int, stride: Optional[int], node: Node) -> Any: + # docstring is inherited from base class + if stride is None: + return range(start, stop + 1) + else: + return range(start, stop + 1, stride) + + def visitIdentifier(self, name: str, node: Node) -> Any: + # docstring is inherited from base class + return self.dataId[name] + + def visitUnaryOp(self, operator_name: str, operand: Any, node: Node) -> Any: + # docstring is inherited from base class + operators: dict[str, Callable[[Any], Any]] = { + "NOT": operator.not_, + "+": operator.pos, + "-": operator.neg, + } + return operators[operator_name](operand) + + def visitBinaryOp(self, operator_name: str, lhs: Any, rhs: Any, node: Node) -> Any: + # docstring is inherited from base class + operators = { + "OR": operator.or_, + "AND": operator.and_, + "+": operator.add, + "-": operator.sub, + "*": operator.mul, + "/": operator.truediv, + "%": operator.mod, + "=": operator.eq, + "!=": operator.ne, + "<": operator.lt, + ">": operator.gt, + "<=": operator.le, + ">=": operator.ge, + } + return operators[operator_name](lhs, rhs) + + def visitIsIn(self, lhs: Any, values: List[Any], not_in: bool, node: Node) -> Any: + # docstring is inherited from base class + is_in = True + for value in values: + if not isinstance(value, range): + value = [value] + if lhs in value: + break + else: + is_in = False + if not_in: + is_in = not is_in + return is_in + + def visitParens(self, expression: Any, node: Node) -> Any: + # docstring is inherited from base class + return expression + + def visitTupleNode(self, items: Tuple[Any, ...], node: Node) -> Any: + # docstring is inherited from base class + raise NotImplementedError() + + def visitFunctionCall(self, name: str, args: List[Any], node: Node) -> Any: + # docstring is inherited from base class + raise NotImplementedError() + + def visitPointNode(self, ra: Any, dec: Any, node: Node) -> Any: + # docstring is inherited from base class + raise NotImplementedError() + + +class DataIdMatch: + """Class that can match DataId against the user-defined string expression. + + Parameters + ---------- + expression : `str` + User-defined expression, supports syntax defined by daf_butler + expression parser. Maps identifiers in the expression to the values of + DataId. + """ + + def __init__(self, expression: str): + parser = ParserYacc() + self.expression = expression + self.tree = parser.parse(expression) + + def match(self, dataId: DataId) -> bool: + """Matches DataId contents against the expression. + + Parameters + ---------- + dataId : `DataId` + DataId that is matched against an expression. + + Returns + ------- + match : `bool` + Result of expression evaluation. + + Raises + ------ + KeyError + Raised when identifier in expression is not defined for given + `DataId`. + TypeError + Raised when expression evaluates to a non-boolean type or when + operation in expression cannot be performed on operand types. 
+ NotImplementedError + Raised when expression includes valid but unsupported syntax, e.g. + function call. + """ + visitor = _DataIdMatchTreeVisitor(dataId) + result = self.tree.visit(visitor) + if not isinstance(result, bool): + raise TypeError(f"Expression '{self.expression}' returned non-boolean object {type(result)}") + return result diff --git a/python/lsst/pipe/base/tests/mocks/_pipeline_task.py b/python/lsst/pipe/base/tests/mocks/_pipeline_task.py new file mode 100644 index 00000000..aa81920e --- /dev/null +++ b/python/lsst/pipe/base/tests/mocks/_pipeline_task.py @@ -0,0 +1,143 @@ +# This file is part of pipe_base. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +from __future__ import annotations + +__all__ = ("MockPipelineTask", "MockPipelineTaskConfig") + +import logging +from typing import TYPE_CHECKING, Any + +from lsst.pex.config import Field +from lsst.utils.doImport import doImportType + +from ...pipelineTask import PipelineTask +from ...config import PipelineTaskConfig +from ...connections import PipelineTaskConnections, InputQuantizedConnection, OutputQuantizedConnection +from ._data_id_match import DataIdMatch + +_LOG = logging.getLogger(__name__) + +if TYPE_CHECKING: + from ...butlerQuantumContext import ButlerQuantumContext + + +class MockPipelineTaskConfig(PipelineTaskConfig, pipelineConnections=PipelineTaskConnections): + fail_condition: Field[str] = Field( + dtype=str, + default="", + doc=( + "Condition on DataId to raise an exception. String expression which includes attributes of " + "quantum DataId using a syntax of daf_butler user expressions (e.g. 'visit = 123')." + ), + ) + + fail_exception: Field[str] = Field( + dtype=str, + default="builtins.ValueError", + doc=( + "Class name of the exception to raise when fail condition is triggered. Can be " + "'lsst.pipe.base.NoWorkFound' to specify non-failure exception." + ), + ) + + def data_id_match(self) -> DataIdMatch | None: + if not self.fail_condition: + return None + return DataIdMatch(self.fail_condition) + + +class MockPipelineTask(PipelineTask): + """Implementation of PipelineTask used for running a mock pipeline. + + Notes + ----- + This class overrides `runQuantum` to read all input datasetRefs and to + store simple dictionary as output data. Output dictionary contains some + provenance data about inputs, the task that produced it, and corresponding + quantum. This class depends on `MockButlerQuantumContext` which knows how + to store the output dictionary data with special dataset types. 
+ """ + + ConfigClass = MockPipelineTaskConfig + + def __init__(self, *, config: MockPipelineTaskConfig | None = None, **kwargs: Any): + super().__init__(config=config, **kwargs) + self.fail_exception: type | None = None + self.data_id_match: DataIdMatch | None = None + if config is not None: + self.data_id_match = config.data_id_match() + if self.data_id_match: + self.fail_exception = doImportType(config.fail_exception) + + def runQuantum( + self, + butlerQC: ButlerQuantumContext, + inputRefs: InputQuantizedConnection, + outputRefs: OutputQuantizedConnection, + ) -> None: + # docstring is inherited from the base class + quantum = butlerQC.quantum + + _LOG.info("Mocking execution of task '%s' on quantum %s", self.getName(), quantum.dataId) + + assert quantum.dataId is not None, "Quantum DataId cannot be None" + + # Possibly raise an exception. + if self.data_id_match is not None and self.data_id_match.match(quantum.dataId): + _LOG.info("Simulating failure of task '%s' on quantum %s", self.getName(), quantum.dataId) + message = f"Simulated failure: task={self.getName()} dataId={quantum.dataId}" + assert self.fail_exception is not None, "Exception type must be defined" + raise self.fail_exception(message) + + # read all inputs + inputs = butlerQC.get(inputRefs) + + _LOG.info("Read input data for task '%s' on quantum %s", self.getName(), quantum.dataId) + + # To avoid very deep provenance we trim inputs to a single level + for name, data in inputs.items(): + if isinstance(data, dict): + data = [data] + if isinstance(data, list): + for item in data: + qdata = item.get("quantum", {}) + qdata.pop("inputs", None) + + # store mock outputs + for name, refs in outputRefs: + if not isinstance(refs, list): + refs = [refs] + for ref in refs: + data = { + "ref": { + "dataId": {key.name: ref.dataId[key] for key in ref.dataId.keys()}, + "datasetType": ref.datasetType.name, + }, + "quantum": { + "task": self.getName(), + "dataId": {key.name: quantum.dataId[key] for key in quantum.dataId.keys()}, + "inputs": inputs, + }, + "outputName": name, + } + butlerQC.put(data, ref) + + _LOG.info("Finished mocking task '%s' on quantum %s", self.getName(), quantum.dataId) diff --git a/requirements.txt b/requirements.txt index c4fb569b..882f2164 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,7 @@ pydantic numpy >= 1.17 networkx frozendict +astropy git+https://github.com/lsst/daf_butler@main#egg=lsst-daf-butler git+https://github.com/lsst/utils@main#egg=lsst-utils git+https://github.com/lsst/resources@main#egg=lsst-resources diff --git a/tests/test_dataid_match.py b/tests/test_dataid_match.py new file mode 100644 index 00000000..ff1a38c3 --- /dev/null +++ b/tests/test_dataid_match.py @@ -0,0 +1,151 @@ +# This file is part of ctrl_mpexec. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import unittest + +from lsst.pipe.base.tests.mocks import DataIdMatch + + +class DataIdMatchTestCase(unittest.TestCase): + """A test case for DataidMatch class""" + + dataIds = ( + {"instrument": "INSTR", "detector": 1, "number": 4}, + {"instrument": "INSTR", "detector": 2, "number": 3}, + {"instrument": "LSST", "detector": 3, "number": 2}, + {"instrument": "LSST", "detector": 4, "number": 1}, + ) + + def test_strings(self): + """Tests for string comparisons method""" + + tests = ( + ("instrument = 'INSTR'", [True, True, False, False]), + ("instrument = 'LSST'", [False, False, True, True]), + ("instrument < 'LSST'", [True, True, False, False]), + ("instrument IN ('LSST', 'INSTR')", [True, True, True, True]), + ) + + for expr, result in tests: + dataIdMatch = DataIdMatch(expr) + self.assertEqual([dataIdMatch.match(dataId) for dataId in self.dataIds], result) + + def test_comparisons(self): + """Test all supported comparison operators""" + + tests = ( + ("detector = 1", [True, False, False, False]), + ("detector != 1", [False, True, True, True]), + ("detector > 2", [False, False, True, True]), + ("2 <= detector", [False, True, True, True]), + ("2 > detector", [True, False, False, False]), + ("2 >= detector", [True, True, False, False]), + ) + + for expr, result in tests: + dataIdMatch = DataIdMatch(expr) + self.assertEqual([dataIdMatch.match(dataId) for dataId in self.dataIds], result) + + def test_arith(self): + """Test all supported arithmetical operators""" + + tests = ( + ("detector + number = 5", [True, True, True, True]), + ("detector - number = 1", [False, False, True, False]), + ("detector * number = 6", [False, True, True, False]), + ("detector / number = 1.5", [False, False, True, False]), + ("detector % number = 1", [True, False, True, False]), + ("+detector = 1", [True, False, False, False]), + ("-detector = -4", [False, False, False, True]), + ) + + for expr, result in tests: + dataIdMatch = DataIdMatch(expr) + self.assertEqual([dataIdMatch.match(dataId) for dataId in self.dataIds], result) + + def test_logical(self): + """Test all supported logical operators""" + + tests = ( + ("detector = 1 OR instrument = 'LSST'", [True, False, True, True]), + ("detector = 1 AND instrument = 'INSTR'", [True, False, False, False]), + ("NOT detector = 1", [False, True, True, True]), + ) + + for expr, result in tests: + dataIdMatch = DataIdMatch(expr) + self.assertEqual([dataIdMatch.match(dataId) for dataId in self.dataIds], result) + + def test_parens(self): + """Test parentheses""" + + tests = (("(detector = 1 OR number = 1) AND instrument = 'LSST'", [False, False, False, True]),) + + for expr, result in tests: + dataIdMatch = DataIdMatch(expr) + self.assertEqual([dataIdMatch.match(dataId) for dataId in self.dataIds], result) + + def test_in(self): + """Test IN expression""" + + tests = ( + ("detector in (1, 3, 2)", [True, True, True, False]), + ("detector not in (1, 3, 2)", [False, False, False, True]), + ("detector in (1..4:2)", [True, False, True, False]), + ("detector in (1..4:2, 4)", [True, False, True, True]), + ) + + for expr, result in tests: + dataIdMatch = DataIdMatch(expr) + self.assertEqual([dataIdMatch.match(dataId) for dataId in self.dataIds], result) + + def test_errors(self): + """Test for errors in expressions""" + + dataId = {"instrument": "INSTR", "detector": 1} + + # Unknown identifier + expr = "INSTRUMENT = 'INSTR'" + dataIdMatch = DataIdMatch(expr) + with 
self.assertRaisesRegex(KeyError, "INSTRUMENT"): + dataIdMatch.match(dataId) + + # non-boolean expression + expr = "instrument" + dataIdMatch = DataIdMatch(expr) + with self.assertRaisesRegex(TypeError, "Expression 'instrument' returned non-boolean object"): + dataIdMatch.match(dataId) + + # operations on unsupported combination of types + expr = "instrument - detector = 0" + dataIdMatch = DataIdMatch(expr) + with self.assertRaisesRegex(TypeError, "unsupported operand type"): + dataIdMatch.match(dataId) + + # function calls are not implemented + expr = "POINT(2, 1) != POINT(1, 2)" + dataIdMatch = DataIdMatch(expr) + with self.assertRaises(NotImplementedError): + dataIdMatch.match(dataId) + + +if __name__ == "__main__": + unittest.main() From 75e0b84d8446bebfc97e5134d8514fbe94ba73c9 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Sat, 6 May 2023 10:55:51 -0400 Subject: [PATCH 03/10] Work around MyPy narrowing limitations. This seems to have been introduced by switching derivedComponents to Mapping instead of dict, but that shouldn't have mattered. --- python/lsst/pipe/base/_dataset_handle.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/lsst/pipe/base/_dataset_handle.py b/python/lsst/pipe/base/_dataset_handle.py index 40f9c984..34063dce 100644 --- a/python/lsst/pipe/base/_dataset_handle.py +++ b/python/lsst/pipe/base/_dataset_handle.py @@ -23,7 +23,7 @@ __all__ = ["InMemoryDatasetHandle"] import dataclasses -from typing import Any, Optional +from typing import Any, Optional, cast from frozendict import frozendict from lsst.daf.butler import ( @@ -175,6 +175,9 @@ class can be found. # Parameters for derived components are applied against the # composite. if component in thisStorageClass.derivedComponents: + # For some reason MyPy doesn't see the line above as narrowing + # 'component' from 'str | None' to 'str'. + component = cast(str, component) thisStorageClass.validateParameters(parameters) # Process the parameters (hoping this never modified the From a81fe399596d65144563d348530b43aeeaa69484 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Sat, 6 May 2023 12:17:02 -0400 Subject: [PATCH 04/10] Modify MockPipelineTask to use MockDataset. --- python/lsst/pipe/base/tests/mocks/__init__.py | 2 + .../pipe/base/tests/mocks/_pipeline_task.py | 246 ++++++++++++++---- .../pipe/base/tests/mocks/_storage_class.py | 2 +- 3 files changed, 200 insertions(+), 50 deletions(-) diff --git a/python/lsst/pipe/base/tests/mocks/__init__.py b/python/lsst/pipe/base/tests/mocks/__init__.py index c4a5ace0..b279c235 100644 --- a/python/lsst/pipe/base/tests/mocks/__init__.py +++ b/python/lsst/pipe/base/tests/mocks/__init__.py @@ -19,4 +19,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +from ._data_id_match import * +from ._pipeline_task import * from ._storage_class import * diff --git a/python/lsst/pipe/base/tests/mocks/_pipeline_task.py b/python/lsst/pipe/base/tests/mocks/_pipeline_task.py index aa81920e..6b16c344 100644 --- a/python/lsst/pipe/base/tests/mocks/_pipeline_task.py +++ b/python/lsst/pipe/base/tests/mocks/_pipeline_task.py @@ -20,18 +20,25 @@ # along with this program. If not, see . 
from __future__ import annotations -__all__ = ("MockPipelineTask", "MockPipelineTaskConfig") +__all__ = ("MockPipelineTask", "MockPipelineTaskConfig", "mock_task_defs") +import dataclasses import logging -from typing import TYPE_CHECKING, Any +from collections.abc import Iterable, Mapping +from typing import TYPE_CHECKING, Any, ClassVar -from lsst.pex.config import Field +from lsst.daf.butler import DeferredDatasetHandle +from lsst.pex.config import ConfigurableField, Field, ListField from lsst.utils.doImport import doImportType +from lsst.utils.introspection import get_full_type_name +from lsst.utils.iteration import ensure_iterable -from ...pipelineTask import PipelineTask from ...config import PipelineTaskConfig -from ...connections import PipelineTaskConnections, InputQuantizedConnection, OutputQuantizedConnection +from ...connections import InputQuantizedConnection, OutputQuantizedConnection, PipelineTaskConnections +from ...pipeline import TaskDef +from ...pipelineTask import PipelineTask from ._data_id_match import DataIdMatch +from ._storage_class import MockDataset, MockDatasetQuantum, MockStorageClass, get_mock_name _LOG = logging.getLogger(__name__) @@ -39,17 +46,130 @@ from ...butlerQuantumContext import ButlerQuantumContext -class MockPipelineTaskConfig(PipelineTaskConfig, pipelineConnections=PipelineTaskConnections): - fail_condition: Field[str] = Field( +def mock_task_defs( + originals: Iterable[TaskDef], + unmocked_dataset_types: Iterable[str] = (), + force_failures: Mapping[str, tuple[str, type[Exception]]] | None = None, +) -> list[TaskDef]: + """Create mocks for an iterable of TaskDefs. + + Parameters + ---------- + originals : `~collections.abc.Iterable` [ `TaskDef` ] + Original tasks and configuration to mock. + unmocked_dataset_types : `~collections.abc.Iterable` [ `str` ], optional + Names of overall-input dataset types that should not be replaced with + mocks. + force_failures : `~collections.abc.Mapping` [ `str`, `tuple` [ `str`, \ + `type` [ `Exception` ] ] ] + Mapping from original task label to a 2-tuple indicating that some + quanta should raise an exception when executed. The first entry is a + data ID match using the butler expression language (i.e. a string of + the sort passed ass the ``where`` argument to butler query methods), + while the second is the type of exception to raise when the quantum + data ID matches the expression. + + Returns + ------- + mocked : `list` [ `TaskDef` ] + List of `TaskDef` objects using `MockPipelineTask` configurations that + target the original tasks, in the same order. 
+ """ + unmocked_dataset_types = tuple(unmocked_dataset_types) + if force_failures is None: + force_failures = {} + results: list[TaskDef] = [] + for original_task_def in originals: + config = MockPipelineTaskConfig() + config.original.retarget(original_task_def.taskClass) + config.original = original_task_def.config + config.unmocked_dataset_types.extend(unmocked_dataset_types) + if original_task_def.label in force_failures: + condition, exception_type = force_failures[original_task_def.label] + config.fail_condition = condition + config.fail_exception = get_full_type_name(exception_type) + mock_task_def = TaskDef( + config=config, taskClass=MockPipelineTask, label=get_mock_name(original_task_def.label) + ) + results.append(mock_task_def) + return results + + +class MockPipelineDefaultTargetConnections(PipelineTaskConnections, dimensions=()): + pass + + +class MockPipelineDefaultTargetConfig( + PipelineTaskConfig, pipelineConnections=MockPipelineDefaultTargetConnections +): + pass + + +class MockPipelineDefaultTargetTask(PipelineTask): + """A `PipelineTask` class used as the default target for + ``MockPipelineTaskConfig.original``. + + This is effectively a workaround for `lsst.pex.config.ConfigurableField` + not supporting ``optional=True``, but that is generally a reasonable + limitation for production code and it wouldn't make sense just to support + test utilities. + """ + + ConfigClass = MockPipelineDefaultTargetConfig + + +class MockPipelineTaskConnections(PipelineTaskConnections, dimensions=()): + def __init__(self, *, config: MockPipelineTaskConfig): + original: PipelineTaskConnections = config.original.connections.ConnectionsClass( + config=config.original.value + ) + self.dimensions.update(original.dimensions) + unmocked_dataset_types = frozenset(config.unmocked_dataset_types) + for name, connection in original.allConnections.items(): + if name in original.initInputs or name in original.initOutputs: + # We just ignore initInputs and initOutputs, because the task + # is never given DatasetRefs for those and hence can't create + # mocks. + continue + if connection.name not in unmocked_dataset_types: + # We register the mock storage class with the global singleton + # here, but can only put its name in the connection. That means + # the same global singleton (or one that also has these + # registrations) has to be available whenever this dataset type + # is used. + storage_class = MockStorageClass.get_or_register_mock(connection.storageClass) + kwargs = {} + if hasattr(connection, "dimensions"): + connection_dimensions = set(connection.dimensions) + # Replace the generic "skypix" placeholder with htm7, since + # that requires the dataset type to have already been + # registered. + if "skypix" in connection_dimensions: + connection_dimensions.remove("skypix") + connection_dimensions.add("htm7") + kwargs["dimensions"] = connection_dimensions + connection = dataclasses.replace( + connection, + name=get_mock_name(connection.name), + storageClass=storage_class.name, + **kwargs, + ) + elif name in original.outputs: + raise ValueError(f"Unmocked dataset type {connection.name!r} cannot be used as an output.") + setattr(self, name, connection) + + +class MockPipelineTaskConfig(PipelineTaskConfig, pipelineConnections=MockPipelineTaskConnections): + fail_condition = Field[str]( dtype=str, default="", doc=( - "Condition on DataId to raise an exception. String expression which includes attributes of " - "quantum DataId using a syntax of daf_butler user expressions (e.g. 'visit = 123')." 
+ "Condition on Data ID to raise an exception. String expression which includes attributes of " + "quantum data ID using a syntax of daf_butler user expressions (e.g. 'visit = 123')." ), ) - fail_exception: Field[str] = Field( + fail_exception = Field[str]( dtype=str, default="builtins.ValueError", doc=( @@ -58,6 +178,20 @@ class MockPipelineTaskConfig(PipelineTaskConfig, pipelineConnections=PipelineTas ), ) + original: ConfigurableField = ConfigurableField( + doc="The original task being mocked by this one.", target=MockPipelineDefaultTargetTask + ) + + unmocked_dataset_types = ListField[str]( + doc=( + "Names of input dataset types that should be used as-is instead " + "of being mocked. May include dataset types not relevant for " + "this task, which will be ignored." + ), + default=(), + optional=False, + ) + def data_id_match(self) -> DataIdMatch | None: if not self.fail_condition: return None @@ -65,27 +199,39 @@ def data_id_match(self) -> DataIdMatch | None: class MockPipelineTask(PipelineTask): - """Implementation of PipelineTask used for running a mock pipeline. + """Implementation of `PipelineTask` used for running a mock pipeline. Notes ----- - This class overrides `runQuantum` to read all input datasetRefs and to - store simple dictionary as output data. Output dictionary contains some - provenance data about inputs, the task that produced it, and corresponding - quantum. This class depends on `MockButlerQuantumContext` which knows how - to store the output dictionary data with special dataset types. + This class overrides `runQuantum` to read inputs and write a bit of + provenance into all of its outputs (always `MockDataset` instances). It + can also be configured to raise exceptions on certain data IDs. It reads + `MockDataset` inputs and simulates reading inputs of other types by + creating `MockDataset` inputs from their DatasetRefs. + + At present `MockPipelineTask` simply drops any ``initInput`` and + ``initOutput`` connections present on the original, since `MockDataset` + creation for those would have to happen in the code that executes the task, + not in the task itself. Because `MockPipelineTask` never instantiates the + mock task (just its connections class), this is a limitation on what the + mocks can be used to test, not anything deeper. 
""" - ConfigClass = MockPipelineTaskConfig + ConfigClass: ClassVar[type[PipelineTaskConfig]] = MockPipelineTaskConfig - def __init__(self, *, config: MockPipelineTaskConfig | None = None, **kwargs: Any): + def __init__( + self, + *, + config: MockPipelineTaskConfig, + **kwargs: Any, + ): super().__init__(config=config, **kwargs) self.fail_exception: type | None = None - self.data_id_match: DataIdMatch | None = None - if config is not None: - self.data_id_match = config.data_id_match() - if self.data_id_match: - self.fail_exception = doImportType(config.fail_exception) + self.data_id_match = self.config.data_id_match() + if self.data_id_match: + self.fail_exception = doImportType(self.config.fail_exception) + + config: MockPipelineTaskConfig def runQuantum( self, @@ -107,37 +253,39 @@ def runQuantum( assert self.fail_exception is not None, "Exception type must be defined" raise self.fail_exception(message) - # read all inputs - inputs = butlerQC.get(inputRefs) - - _LOG.info("Read input data for task '%s' on quantum %s", self.getName(), quantum.dataId) - - # To avoid very deep provenance we trim inputs to a single level - for name, data in inputs.items(): - if isinstance(data, dict): - data = [data] - if isinstance(data, list): - for item in data: - qdata = item.get("quantum", {}) - qdata.pop("inputs", None) + # Populate the bit of provenance we store in all outputs. + _LOG.info("Reading input data for task '%s' on quantum %s", self.getName(), quantum.dataId) + mock_dataset_quantum = MockDatasetQuantum( + task_label=self.getName(), data_id=quantum.dataId.to_simple(), inputs={} + ) + for name, refs in inputRefs: + inputs_list = [] + for ref in ensure_iterable(refs): + if isinstance(ref.datasetType.storageClass, MockStorageClass): + input_dataset = butlerQC.get(ref) + if isinstance(input_dataset, DeferredDatasetHandle): + input_dataset = input_dataset.get() + if not isinstance(input_dataset, MockDataset): + raise TypeError( + f"Expected MockDataset instance for {ref}; " + f"got {input_dataset!r} of type {type(input_dataset)!r}." + ) + # To avoid very deep provenance we trim inputs to a single + # level. + input_dataset.quantum = None + else: + input_dataset = MockDataset(ref=ref.to_simple()) + inputs_list.append(input_dataset) + mock_dataset_quantum.inputs[name] = inputs_list # store mock outputs for name, refs in outputRefs: if not isinstance(refs, list): refs = [refs] for ref in refs: - data = { - "ref": { - "dataId": {key.name: ref.dataId[key] for key in ref.dataId.keys()}, - "datasetType": ref.datasetType.name, - }, - "quantum": { - "task": self.getName(), - "dataId": {key.name: quantum.dataId[key] for key in quantum.dataId.keys()}, - "inputs": inputs, - }, - "outputName": name, - } - butlerQC.put(data, ref) + output = MockDataset( + ref=ref.to_simple(), quantum=mock_dataset_quantum, output_connection_name=name + ) + butlerQC.put(output, ref) _LOG.info("Finished mocking task '%s' on quantum %s", self.getName(), quantum.dataId) diff --git a/python/lsst/pipe/base/tests/mocks/_storage_class.py b/python/lsst/pipe/base/tests/mocks/_storage_class.py index f89d2f6d..58c3f968 100644 --- a/python/lsst/pipe/base/tests/mocks/_storage_class.py +++ b/python/lsst/pipe/base/tests/mocks/_storage_class.py @@ -64,7 +64,7 @@ def get_original_name(mock: str) -> str: label that corresponds to the given mock name. 
""" assert mock.startswith(_NAME_PREFIX) - return mock[len(_NAME_PREFIX) :] + return mock.removeprefix(_NAME_PREFIX) def is_mock_name(name: str) -> bool: From c7e67a5e6a53025b2b750df55b00c5f196ff7223 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Fri, 19 May 2023 15:55:34 -0400 Subject: [PATCH 05/10] Use monkey-patching to handle formatters for mock storage classes. --- .../pipe/base/tests/mocks/_storage_class.py | 144 +++++++++--------- 1 file changed, 71 insertions(+), 73 deletions(-) diff --git a/python/lsst/pipe/base/tests/mocks/_storage_class.py b/python/lsst/pipe/base/tests/mocks/_storage_class.py index 58c3f968..fdacfc62 100644 --- a/python/lsst/pipe/base/tests/mocks/_storage_class.py +++ b/python/lsst/pipe/base/tests/mocks/_storage_class.py @@ -36,8 +36,10 @@ import pydantic from lsst.daf.butler import ( - Config, DatasetComponent, + Formatter, + FormatterFactory, + LookupKey, SerializedDataCoordinate, SerializedDatasetRef, SerializedDatasetType, @@ -45,10 +47,9 @@ StorageClassDelegate, StorageClassFactory, ) -from lsst.resources import ResourcePath, ResourcePathExpression +from lsst.daf.butler.formatters.json import JsonFormatter from lsst.utils.introspection import get_full_type_name - _NAME_PREFIX: str = "_mock_" @@ -274,76 +275,6 @@ def get_or_register_mock( factory.registerStorageClass(result) return result - @staticmethod - def make_formatter_config_dir( - root: ResourcePathExpression, factory: StorageClassFactory | None = None - ) -> None: - """Make a directory suitable for inclusion in the butler config - search path. - - Parameters - ---------- - root : convertible to `lsst.resources.ResourcePath` - Root directory that will be passed as any element of the - ``searchPaths`` list at `~lsst.daf.butler.Butler` construction. - factory : `StorageClassFactory`, optional - Storage class factory singleton instance. - - Notes - ----- - This adds formatter entries to `~lsst.daf.butler.FileDatastore` - configuration for all mock storage classes that have been registered so - far. It does not add the storage class configuration entries - themselves, because `MockStorageClass` can't be written out as - configuration in the same way that regular storage classes can. So the - usual pattern for creating a butler client with storage class mocking - is: - - - Register all needed mock storage classes with the singleton - `lsst.daf.butler.StorageClassFactory`, which is constructed on first - use even when there are no active butler clients. - - - Call this method to create a directory with formatter configuration - for those storage classes (within the same process, so singleton - state is preserved). - - - Create a butler client while passing ``searchPaths=[root]``, again - within the same process. - - There is currently no automatic way to share mock storage class - definitions across processes, other than to re-run the code that - registers those mock storage classes. - - Note that the data repository may be created any time before the butler - client is - the formatter configuration written by this method is - expected to be used per-client, not at data repository construction, - and nothing about the mocks is persistent within the data repository - other than dataset types that use those storage classes (and dataset - types with unrecognized storage classes are already carefully handled - by the butler). 
- """ - if factory is None: - factory = StorageClassFactory() - formatters_config = Config() - for storage_class_name in factory.keys(): - if is_mock_name(storage_class_name): - # Have to escape mock storage class names, because - # config["_mock_X"] gets reinterpreted as a nested config key, - # i.e. config["mock"]["X"]. - formatters_config["\\" + storage_class_name] = "lsst.daf.butler.formatters.json.JsonFormatter" - root = ResourcePath(root, forceDirectory=True) - dir_path = root.join("datastores", forceDirectory=True) - dir_path.mkdir() - formatters_config.dumpToUri(dir_path.join("formatters.yaml")) - with dir_path.join("fileDatastore.yaml").open("w") as stream: - stream.write( - "datastore:\n" - " cls: lsst.daf.butler.datastores.fileDatastore.FileDatastore\n" - " formatters: !include formatters.yaml\n" - ) - with root.join("datastore.yaml").open("w") as stream: - stream.write("datastore:\n cls: lsst.daf.butler.datastores.fileDatastore.FileDatastore\n") - def allComponents(self) -> Mapping[str, MockStorageClass]: # Docstring inherited. return cast(Mapping[str, MockStorageClass], super().allComponents()) @@ -382,3 +313,70 @@ def coerce_type(self, incorrect: Any) -> Any: f"{other_storage_class.original.name!r}." ) return incorrect.make_derived(storageClass=self.name, converted_from=incorrect) + + +def _monkeypatch_daf_butler() -> None: + """Replace methods in daf_butler's StorageClassFactory and FormatterFactory + classes to automatically recognize mock storage classes. + + This monkey-patching is executed when the `lsst.pipe.base.tests.mocks` + package is imported, and it affects all butler instances created before or + after that imported. + """ + original_get_storage_class = StorageClassFactory.getStorageClass + + def new_get_storage_class(self: StorageClassFactory, storageClassName: str) -> StorageClass: + try: + return original_get_storage_class(self, storageClassName) + except KeyError: + if is_mock_name(storageClassName): + return MockStorageClass.get_or_register_mock(get_original_name(storageClassName)) + raise + + StorageClassFactory.getStorageClass = new_get_storage_class # type: ignore + + del new_get_storage_class + + original_get_formatter_class_with_match = FormatterFactory.getFormatterClassWithMatch + + def new_get_formatter_class_with_match( + self: FormatterFactory, entity: Any + ) -> tuple[LookupKey, type[Formatter], dict[str, Any]]: + try: + return original_get_formatter_class_with_match(self, entity) + except KeyError: + lookup_keys = (LookupKey(name=entity),) if isinstance(entity, str) else entity._lookupNames() + for key in lookup_keys: + # This matches mock dataset type names before mock storage + # classes, and it would even match some regular dataset types + # that are automatic connections (logs, configs, metadata) of + # mocked tasks. The latter would be a problem, except that + # those should have already matched in the try block above. 
+ if is_mock_name(key.name): + return (key, JsonFormatter, {}) + raise + + FormatterFactory.getFormatterClassWithMatch = new_get_formatter_class_with_match # type: ignore + + del new_get_formatter_class_with_match + + original_get_formatter_with_match = FormatterFactory.getFormatterWithMatch + + def new_get_formatter_with_match( + self: FormatterFactory, entity: Any, *args: Any, **kwargs: Any + ) -> tuple[LookupKey, Formatter]: + try: + return original_get_formatter_with_match(self, entity, *args, **kwargs) + except KeyError: + lookup_keys = (LookupKey(name=entity),) if isinstance(entity, str) else entity._lookupNames() + for key in lookup_keys: + if is_mock_name(key.name): + return (key, JsonFormatter(*args, **kwargs)) + raise + + FormatterFactory.getFormatterWithMatch = new_get_formatter_with_match # type: ignore + + del new_get_formatter_with_match + + +_monkeypatch_daf_butler() From c93654cd2103801bc4846544622fdae788ee567f Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Fri, 26 May 2023 15:50:28 -0400 Subject: [PATCH 06/10] Add docs for mocks package. --- doc/changes/DM-38952.feature.md | 1 + doc/lsst.pipe.base/index.rst | 3 +++ .../testing-pipelines-with-mocks.rst | 25 +++++++++++++++++++ python/lsst/pipe/base/tests/mocks/__init__.py | 6 +++++ 4 files changed, 35 insertions(+) create mode 100644 doc/changes/DM-38952.feature.md create mode 100644 doc/lsst.pipe.base/testing-pipelines-with-mocks.rst diff --git a/doc/changes/DM-38952.feature.md b/doc/changes/DM-38952.feature.md new file mode 100644 index 00000000..0da70bad --- /dev/null +++ b/doc/changes/DM-38952.feature.md @@ -0,0 +1 @@ +Revived bitrotted support for "mocked" `PipelineTask` execution and moved it here (from `ctrl_mpexec`). diff --git a/doc/lsst.pipe.base/index.rst b/doc/lsst.pipe.base/index.rst index 43d8186c..401a035e 100644 --- a/doc/lsst.pipe.base/index.rst +++ b/doc/lsst.pipe.base/index.rst @@ -58,6 +58,7 @@ Developing Pipelines :maxdepth: 1 creating-a-pipeline.rst + testing-pipelines-with-mocks.rst .. _lsst.pipe.base-contributing: @@ -85,3 +86,5 @@ Python API reference .. automodapi:: lsst.pipe.base.pipelineIR :no-main-docstr: + +.. automodapi:: lsst.pipe.base.tests.mocks diff --git a/doc/lsst.pipe.base/testing-pipelines-with-mocks.rst b/doc/lsst.pipe.base/testing-pipelines-with-mocks.rst new file mode 100644 index 00000000..c50134f8 --- /dev/null +++ b/doc/lsst.pipe.base/testing-pipelines-with-mocks.rst @@ -0,0 +1,25 @@ +.. py:currentmodule:: lsst.pipe.base.tests.mocks + +.. _testing-pipelines-with-mocks: + +############################ +Testing pipelines with mocks +############################ + +The `lsst.pipe.base.tests.mocks` package provides a way to build and execute `.QuantumGraph` objects without actually running any real task code or relying on real data. +This is primarily for testing the middleware responsible for `.QuantumGraph` generation and execution, but it can also be used to check that the connections in a configured pipeline are consistent with each other and with any documented recommendations for how to run those steps (e.g., which dimensions can safely be constrained by user expressions). + +The high-level entry point to this system is `mock_task_defs` function, which takes an iterable of `.TaskDef` objects (typically obtained from `.Pipeline.toExpandedPipeline`) and returns a new sequence of `.TaskDef` objects, in which each original task has been replaced by a configuration of `MockPipelineTask` whose connections are analogous to the original. 
+Passing the ``--mock`` option to ``pipetask qgraph`` or ``pipetask run`` applies this transformation to the given pipeline when building the graph.
+When a pipeline is mocked, all task labels and dataset types are transformed by the `get_mock_name` function (so these can live alongside their real counterparts in a single data repository), and the storage classes of all regular connections are replaced with instances of `MockStorageClass`.
+The in-memory Python type for `MockStorageClass` is always `MockDataset`, which is always written to disk in JSON format, but conversions between mock storage classes are always defined analogously to the original storage classes they mock, and the `MockDataset` class records conversions (and component access and parameters) when they occur, allowing test code that runs later to load them and inspect exactly how the object was loaded and provided to the task when it was executed.
+
+The `MockPipelineTask.runQuantum` method reads all input mocked datasets that correspond to a `MockStorageClass` and simulates reading any input datasets that were not mocked (via the `MockPipelineTaskConfig.unmocked_dataset_types` config option, or the `mock_task_defs` argument of the same name) by constructing a new `MockDataset` instance for them.
+It then constructs and writes new `MockDataset` instances for each of its predicted outputs, storing copies of the input `MockDataset`\s within them.
+`MockPipelineTaskConfig` and `mock_task_defs` also have options for causing quanta that match a data ID expression to raise an exception instead.
+Dataset types produced by the execution framework (configs, logs, metadata, and package version information) are not mocked, but they are given names with the prefix added by `get_mock_name` by virtue of being constructed from a task label that has that prefix.
+
+Importing the `lsst.pipe.base.tests.mocks` package causes the `~lsst.daf.butler.StorageClassFactory` and `~lsst.daf.butler.FormatterFactory` classes to be monkey-patched with special code that recognizes mock storage class names without being included in any butler configuration files.
+This should not affect how any non-mock storage classes are handled, but it is still best to only import `lsst.pipe.base.tests.mocks` in code that is *definitely* using the mock system, even if that means putting the import at function scope instead of module scope.
+
+The ``ci_middleware`` package is the primary place where this mocking library is used, and the home of its unit tests, but it has been designed to be usable in regular "real" data repositories as well.
diff --git a/python/lsst/pipe/base/tests/mocks/__init__.py b/python/lsst/pipe/base/tests/mocks/__init__.py
index b279c235..48ffbcda 100644
--- a/python/lsst/pipe/base/tests/mocks/__init__.py
+++ b/python/lsst/pipe/base/tests/mocks/__init__.py
@@ -19,6 +19,12 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see .
 
+"""A system for replacing the tasks in a pipeline with mocks that just read and
+write trivial datasets.
+
+See :ref:`testing-pipelines-with-mocks` for details.
+"""
+
 from ._data_id_match import *
 from ._pipeline_task import *
 from ._storage_class import *

From 0a9246992db7b3b1fd266918ff0c13bbb9f1f1dd Mon Sep 17 00:00:00 2001
From: Jim Bosch
Date: Tue, 6 Jun 2023 22:30:30 -0400
Subject: [PATCH 07/10] Modernize type annotations in DataIdMatch.
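
The deprecated typing aliases are replaced with their modern
equivalents, for example:

    Optional[int]   ->  int | None
    List[Any]       ->  list[Any]
    Tuple[Any, ...] ->  tuple[Any, ...]

Callable now comes from collections.abc, and the new
`from __future__ import annotations` import keeps the PEP 604 union
syntax usable on Python versions older than 3.10.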
---
 python/lsst/pipe/base/tests/mocks/_data_id_match.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/python/lsst/pipe/base/tests/mocks/_data_id_match.py b/python/lsst/pipe/base/tests/mocks/_data_id_match.py
index 7021ac5e..c0ae3b28 100644
--- a/python/lsst/pipe/base/tests/mocks/_data_id_match.py
+++ b/python/lsst/pipe/base/tests/mocks/_data_id_match.py
@@ -19,10 +19,13 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see .
 
+from __future__ import annotations
+
 __all__ = ["DataIdMatch"]
 
 import operator
-from typing import Any, Callable, List, Optional, Tuple
+from collections.abc import Callable
+from typing import Any
 
 import astropy.time
 from lsst.daf.butler import DataId
@@ -52,7 +55,7 @@ def visitTimeLiteral(self, value: astropy.time.Time, node: Node) -> Any:
         # docstring is inherited from base class
         return value
 
-    def visitRangeLiteral(self, start: int, stop: int, stride: Optional[int], node: Node) -> Any:
+    def visitRangeLiteral(self, start: int, stop: int, stride: int | None, node: Node) -> Any:
         # docstring is inherited from base class
         if stride is None:
             return range(start, stop + 1)
@@ -91,7 +94,7 @@ def visitBinaryOp(self, operator_name: str, lhs: Any, rhs: Any, node: Node) -> A
         }
         return operators[operator_name](lhs, rhs)
 
-    def visitIsIn(self, lhs: Any, values: List[Any], not_in: bool, node: Node) -> Any:
+    def visitIsIn(self, lhs: Any, values: list[Any], not_in: bool, node: Node) -> Any:
         # docstring is inherited from base class
         is_in = True
         for value in values:
@@ -109,11 +112,11 @@ def visitParens(self, expression: Any, node: Node) -> Any:
         # docstring is inherited from base class
         return expression
 
-    def visitTupleNode(self, items: Tuple[Any, ...], node: Node) -> Any:
+    def visitTupleNode(self, items: tuple[Any, ...], node: Node) -> Any:
         # docstring is inherited from base class
         raise NotImplementedError()
 
-    def visitFunctionCall(self, name: str, args: List[Any], node: Node) -> Any:
+    def visitFunctionCall(self, name: str, args: list[Any], node: Node) -> Any:
         # docstring is inherited from base class
         raise NotImplementedError()
 

From a93fc4d2410fc17d54b0e87b334a407e038c0e1a Mon Sep 17 00:00:00 2001
From: Jim Bosch
Date: Tue, 6 Jun 2023 22:31:51 -0400
Subject: [PATCH 08/10] Tell coverage to ignore some patterns.

---
 pyproject.toml | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/pyproject.toml b/pyproject.toml
index d97f9997..808bdea9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -115,3 +115,13 @@ addopts = "--flake8"
 flake8-ignore = ["E203", "W503", "N802", "N803", "N806", "N812", "N815", "N816"]
 # Some unit tests open registry database and don't close it.
 open_files_ignore = ["gen3.sqlite3"]
+
+[tool.coverage.report]
+show_missing = true
+exclude_lines = [
+    "pragma: no cover",
+    "raise AssertionError",
+    "raise NotImplementedError",
+    "if __name__ == .__main__.:",
+    "if TYPE_CHECKING:",
+]

From 3766d9f886001290eaeba9003e786c77d3cbc3c2 Mon Sep 17 00:00:00 2001
From: Jim Bosch
Date: Thu, 8 Jun 2023 11:31:27 -0400
Subject: [PATCH 09/10] Add dataId kwarg to GraphBuilder.makeGraph.

This allows the instrument embedded in a pipeline to be passed into the
graph generation algorithm even when the pipeline is converted to a
sequence of TaskDef objects in advance.
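
A sketch of the intended call pattern (the instrument name, collection
names, and output run below are placeholders, and `butler` and
`task_defs` are assumed to already exist in the caller's context):

    data_id = DataCoordinate.standardize(
        instrument="HSC", universe=butler.registry.dimensions
    )
    qgraph = GraphBuilder(butler.registry).makeGraph(
        task_defs,
        collections=["HSC/defaults"],
        run="u/someone/mock-run",
        userQuery="",
        dataId=data_id,
    )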
---
 python/lsst/pipe/base/graphBuilder.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/python/lsst/pipe/base/graphBuilder.py b/python/lsst/pipe/base/graphBuilder.py
index 1e65f301..c2658345 100644
--- a/python/lsst/pipe/base/graphBuilder.py
+++ b/python/lsst/pipe/base/graphBuilder.py
@@ -1557,6 +1557,7 @@ def makeGraph(
         datasetQueryConstraint: DatasetQueryConstraintVariant = DatasetQueryConstraintVariant.ALL,
         metadata: Optional[Mapping[str, Any]] = None,
         bind: Optional[Mapping[str, Any]] = None,
+        dataId: DataCoordinate | None = None,
     ) -> QuantumGraph:
         """Create execution graph for a pipeline.
 
@@ -1585,6 +1586,8 @@ def makeGraph(
         bind : `Mapping`, optional
             Mapping containing literal values that should be injected into the
             ``userQuery`` expression, keyed by the identifiers they replace.
+        dataId : `lsst.daf.butler.DataCoordinate`, optional
+            Data ID that should also be included in the query constraint.
 
         Returns
         -------
@@ -1611,9 +1614,9 @@ def makeGraph(
             pipeline = list(pipeline.toExpandedPipeline())
         if instrument_class is not None:
             dataId = DataCoordinate.standardize(
-                instrument=instrument_class.getName(), universe=self.registry.dimensions
+                dataId, instrument=instrument_class.getName(), universe=self.registry.dimensions
             )
-        else:
+        elif dataId is None:
             dataId = DataCoordinate.makeEmpty(self.registry.dimensions)
         with scaffolding.connectDataIds(
             self.registry, collections, userQuery, dataId, datasetQueryConstraint, bind

From 7dcc6993a35aa0c1ad4630e5c5f5945de93b56da Mon Sep 17 00:00:00 2001
From: Jim Bosch
Date: Thu, 8 Jun 2023 18:30:40 -0400
Subject: [PATCH 10/10] Add Pipeline.get_data_id.

---
 python/lsst/pipe/base/graphBuilder.py | 15 +++------------
 python/lsst/pipe/base/pipeline.py     | 31 ++++++++++++++++++++++++++-
 2 files changed, 33 insertions(+), 13 deletions(-)

diff --git a/python/lsst/pipe/base/graphBuilder.py b/python/lsst/pipe/base/graphBuilder.py
index c2658345..7d2d322e 100644
--- a/python/lsst/pipe/base/graphBuilder.py
+++ b/python/lsst/pipe/base/graphBuilder.py
@@ -53,7 +53,6 @@
 from lsst.daf.butler.registry import MissingCollectionError, MissingDatasetTypeError
 from lsst.daf.butler.registry.queries import DataCoordinateQueryResults
 from lsst.daf.butler.registry.wildcards import CollectionWildcard
-from lsst.utils import doImportType
 
 # -----------------------------
 #  Imports for other modules --
@@ -1606,18 +1605,10 @@ def makeGraph(
         scaffolding = _PipelineScaffolding(pipeline, registry=self.registry)
         if not collections and (scaffolding.initInputs or scaffolding.inputs or scaffolding.prerequisites):
             raise ValueError("Pipeline requires input datasets but no input collections provided.")
-        instrument_class: Optional[Any] = None
-        if isinstance(pipeline, Pipeline):
-            instrument_class_name = pipeline.getInstrument()
-            if instrument_class_name is not None:
-                instrument_class = doImportType(instrument_class_name)
-            pipeline = list(pipeline.toExpandedPipeline())
-        if instrument_class is not None:
-            dataId = DataCoordinate.standardize(
-                dataId, instrument=instrument_class.getName(), universe=self.registry.dimensions
-            )
-        elif dataId is None:
+        if dataId is None:
             dataId = DataCoordinate.makeEmpty(self.registry.dimensions)
+        if isinstance(pipeline, Pipeline):
+            dataId = pipeline.get_data_id(self.registry.dimensions).union(dataId)
         with scaffolding.connectDataIds(
             self.registry, collections, userQuery, dataId, datasetQueryConstraint, bind
         ) as commonDataIds:
diff --git a/python/lsst/pipe/base/pipeline.py b/python/lsst/pipe/base/pipeline.py
index f1d198d0..e5754cd7 100644
--- a/python/lsst/pipe/base/pipeline.py
+++ b/python/lsst/pipe/base/pipeline.py
@@ -55,7 +55,14 @@
 # -----------------------------
 #  Imports for other modules --
 
-from lsst.daf.butler import DatasetType, NamedValueSet, Registry, SkyPixDimension
+from lsst.daf.butler import (
+    DataCoordinate,
+    DatasetType,
+    DimensionUniverse,
+    NamedValueSet,
+    Registry,
+    SkyPixDimension,
+)
 from lsst.resources import ResourcePath, ResourcePathExpression
 from lsst.utils import doImportType
 from lsst.utils.introspection import get_full_type_name
@@ -613,6 +620,28 @@ def getInstrument(self) -> Optional[str]:
         """
         return self._pipelineIR.instrument
 
+    def get_data_id(self, universe: DimensionUniverse) -> DataCoordinate:
+        """Return a data ID with all dimension constraints embedded in the
+        pipeline.
+
+        Parameters
+        ----------
+        universe : `lsst.daf.butler.DimensionUniverse`
+            Object that defines all dimensions.
+
+        Returns
+        -------
+        data_id : `lsst.daf.butler.DataCoordinate`
+            Data ID with all dimension constraints embedded in the
+            pipeline.
+        """
+        instrument_class_name = self._pipelineIR.instrument
+        if instrument_class_name is not None:
+            instrument_class = doImportType(instrument_class_name)
+            if instrument_class is not None:
+                return DataCoordinate.standardize(instrument=instrument_class.getName(), universe=universe)
+        return DataCoordinate.makeEmpty(universe)
+
     def addTask(self, task: Union[Type[PipelineTask], str], label: str) -> None:
         """Add a new task to the pipeline, or replace a task that is already
         associated with the supplied label.