diff --git a/doc/changes/DM-39582.misc.md b/doc/changes/DM-39582.misc.md new file mode 100644 index 0000000000..dba103939b --- /dev/null +++ b/doc/changes/DM-39582.misc.md @@ -0,0 +1,2 @@ +Added the ability for some butler primitives to be cached and re-used on deserialization through a special +interface. diff --git a/doc/changes/DM-39582.removal.md b/doc/changes/DM-39582.removal.md new file mode 100644 index 0000000000..5b6bef359f --- /dev/null +++ b/doc/changes/DM-39582.removal.md @@ -0,0 +1 @@ +Deprecated the `reconstitutedDimensions` argument of `Quantum.from_simple`. diff --git a/python/lsst/daf/butler/_butler.py b/python/lsst/daf/butler/_butler.py index 9b9fe8d8a3..0005af9375 100644 --- a/python/lsst/daf/butler/_butler.py +++ b/python/lsst/daf/butler/_butler.py @@ -1651,9 +1651,9 @@ def exists( if full_check: if self.datastore.exists(ref): existence |= DatasetExistence._ARTIFACT - elif existence != DatasetExistence.UNRECOGNIZED: + elif existence.value != DatasetExistence.UNRECOGNIZED.value: # Do not add this flag if we have no other idea about a dataset. - existence |= DatasetExistence._ASSUMED + existence |= DatasetExistence(DatasetExistence._ASSUMED) return existence diff --git a/python/lsst/daf/butler/core/__init__.py b/python/lsst/daf/butler/core/__init__.py index 7d803c6e69..0fd255ec18 100644 --- a/python/lsst/daf/butler/core/__init__.py +++ b/python/lsst/daf/butler/core/__init__.py @@ -29,6 +29,7 @@ from .logging import ButlerLogRecords from .mappingFactory import * from .named import * +from .persistenceContext import * from .progress import Progress from .quantum import * from .storageClass import * diff --git a/python/lsst/daf/butler/core/config.py b/python/lsst/daf/butler/core/config.py index 5eb7d9c880..efcc9481f6 100644 --- a/python/lsst/daf/butler/core/config.py +++ b/python/lsst/daf/butler/core/config.py @@ -39,7 +39,7 @@ import yaml from lsst.resources import ResourcePath, ResourcePathExpression -from lsst.utils import doImport +from lsst.utils import doImportType from yaml.representer import Representer yaml.add_representer(defaultdict, Representer.represent_dict) @@ -1203,18 +1203,17 @@ def __init__( if pytype is not None: try: - cls = doImport(pytype) + cls = doImportType(pytype) except ImportError as e: raise RuntimeError(f"Failed to import cls '{pytype}' for config {type(self)}") from e - defaultsFile = cls.defaultConfigFile + # The class referenced from the config file is not required + # to specify a default config file. + defaultsFile = getattr(cls, "defaultConfigFile", None) if defaultsFile is not None: self._updateWithConfigsFromPath(fullSearchPath, defaultsFile) - # Get the container key in case we need it - try: - containerKey = cls.containerKey - except AttributeError: - pass + # Get the container key in case we need it and it is specified. 
+ containerKey = getattr(cls, "containerKey", None) # Now update this object with the external values so that the external # values always override the defaults diff --git a/python/lsst/daf/butler/core/datasets/ref.py b/python/lsst/daf/butler/core/datasets/ref.py index 005fd5b1ff..9edac6bccf 100644 --- a/python/lsst/daf/butler/core/datasets/ref.py +++ b/python/lsst/daf/butler/core/datasets/ref.py @@ -30,6 +30,7 @@ ] import enum +import sys import uuid from collections.abc import Iterable from typing import TYPE_CHECKING, Any, ClassVar @@ -41,6 +42,7 @@ from ..dimensions import DataCoordinate, DimensionGraph, DimensionUniverse, SerializedDataCoordinate from ..json import from_json_pydantic, to_json_pydantic from ..named import NamedKeyDict +from ..persistenceContext import PersistenceContextVars from .type import DatasetType, SerializedDatasetType if TYPE_CHECKING: @@ -142,6 +144,10 @@ def makeDatasetId( return uuid.uuid5(self.NS_UUID, data) +# This is constant, so don't recreate a set for each instance +_serializedDatasetRefFieldsSet = {"id", "datasetType", "dataId", "run", "component"} + + class SerializedDatasetRef(BaseModel): """Simplified model of a `DatasetRef` suitable for serialization.""" @@ -202,9 +208,9 @@ def direct( datasetType if datasetType is None else SerializedDatasetType.direct(**datasetType), ) setter(node, "dataId", dataId if dataId is None else SerializedDataCoordinate.direct(**dataId)) - setter(node, "run", run) + setter(node, "run", sys.intern(run)) setter(node, "component", component) - setter(node, "__fields_set__", {"id", "datasetType", "dataId", "run", "component"}) + setter(node, "__fields_set__", _serializedDatasetRefFieldsSet) return node @@ -254,7 +260,7 @@ class DatasetRef: _serializedType = SerializedDatasetRef __slots__ = ( - "id", + "_id", "datasetType", "dataId", "run", @@ -277,12 +283,22 @@ def __init__( self.dataId = dataId self.run = run if id is not None: - self.id = id + self._id = id.int else: - self.id = DatasetIdFactory().makeDatasetId( - self.run, self.datasetType, self.dataId, id_generation_mode + self._id = ( + DatasetIdFactory() + .makeDatasetId(self.run, self.datasetType, self.dataId, id_generation_mode) + .int ) + @property + def id(self) -> DatasetId: + """Primary key of the dataset (`DatasetId`). + + Cannot be changed after a `DatasetRef` is constructed. + """ + return uuid.UUID(int=self._id) + def __eq__(self, other: Any) -> bool: try: return (self.datasetType, self.dataId, self.id) == (other.datasetType, other.dataId, other.id) @@ -396,9 +412,18 @@ def from_simple( ref : `DatasetRef` Newly-constructed object. 
""" + cache = PersistenceContextVars.datasetRefs.get() + localName = sys.intern( + datasetType.name + if datasetType is not None + else (x.name if (x := simple.datasetType) is not None else "") + ) + key = (simple.id.int, localName) + if cache is not None and (cachedRef := cache.get(key, None)) is not None: + return cachedRef # Minimalist component will just specify component and id and # require registry to reconstruct - if set(simple.dict(exclude_unset=True, exclude_defaults=True)).issubset({"id", "component"}): + if not (simple.datasetType is not None or simple.dataId is not None or simple.run is not None): if registry is None: raise ValueError("Registry is required to construct component DatasetRef from integer id") if simple.id is None: @@ -408,6 +433,8 @@ def from_simple( raise RuntimeError(f"No matching dataset found in registry for id {simple.id}") if simple.component: ref = ref.makeComponentRef(simple.component) + if cache is not None: + cache[key] = ref return ref if universe is None and registry is None: @@ -443,7 +470,10 @@ def from_simple( f"Encountered with {simple!r}{dstr}." ) - return cls(datasetType, dataId, id=simple.id, run=simple.run) + newRef = cls(datasetType, dataId, id=simple.id, run=simple.run) + if cache is not None: + cache[key] = newRef + return newRef to_json = to_json_pydantic from_json: ClassVar = classmethod(from_json_pydantic) @@ -682,9 +712,3 @@ class associated with the dataset type of the other ref can be Cannot be changed after a `DatasetRef` is constructed. """ - - id: DatasetId - """Primary key of the dataset (`DatasetId`). - - Cannot be changed after a `DatasetRef` is constructed. - """ diff --git a/python/lsst/daf/butler/core/datasets/type.py b/python/lsst/daf/butler/core/datasets/type.py index 1ddbc018c0..72f714d624 100644 --- a/python/lsst/daf/butler/core/datasets/type.py +++ b/python/lsst/daf/butler/core/datasets/type.py @@ -34,6 +34,7 @@ from ..configSupport import LookupKey from ..dimensions import DimensionGraph, SerializedDimensionGraph from ..json import from_json_pydantic, to_json_pydantic +from ..persistenceContext import PersistenceContextVars from ..storageClass import StorageClass, StorageClassFactory if TYPE_CHECKING: @@ -74,6 +75,10 @@ def direct( This method should only be called when the inputs are trusted. """ + cache = PersistenceContextVars.serializedDatasetTypeMapping.get() + key = (name, storageClass or "") + if cache is not None and (type_ := cache.get(key, None)) is not None: + return type_ node = SerializedDatasetType.__new__(cls) setter = object.__setattr__ setter(node, "name", name) @@ -90,6 +95,8 @@ def direct( "__fields_set__", {"name", "storageClass", "dimensions", "parentStorageClass", "isCalibration"}, ) + if cache is not None: + cache[key] = node return node @@ -685,6 +692,13 @@ def from_simple( datasetType : `DatasetType` Newly-constructed object. 
""" + # check to see if there is a cache, and if there is, if there is a + # cached dataset type + cache = PersistenceContextVars.loadedTypes.get() + key = (simple.name, simple.storageClass or "") + if cache is not None and (type_ := cache.get(key, None)) is not None: + return type_ + if simple.storageClass is None: # Treat this as minimalist representation if registry is None: @@ -708,7 +722,7 @@ def from_simple( # mypy hint raise ValueError(f"Dimensions must be specified in {simple}") - return cls( + newType = cls( name=simple.name, dimensions=DimensionGraph.from_simple(simple.dimensions, universe=universe), storageClass=simple.storageClass, @@ -716,6 +730,9 @@ def from_simple( parentStorageClass=simple.parentStorageClass, universe=universe, ) + if cache is not None: + cache[key] = newType + return newType to_json = to_json_pydantic from_json: ClassVar = classmethod(from_json_pydantic) diff --git a/python/lsst/daf/butler/core/datastoreRecordData.py b/python/lsst/daf/butler/core/datastoreRecordData.py index c6d9f31e0b..5a93078274 100644 --- a/python/lsst/daf/butler/core/datastoreRecordData.py +++ b/python/lsst/daf/butler/core/datastoreRecordData.py @@ -36,6 +36,7 @@ from .datasets import DatasetId from .dimensions import DimensionUniverse +from .persistenceContext import PersistenceContextVars from .storedFileInfo import StoredDatastoreItemInfo if TYPE_CHECKING: @@ -204,6 +205,10 @@ def from_simple( item_info : `StoredDatastoreItemInfo` De-serialized instance of `StoredDatastoreItemInfo`. """ + cache = PersistenceContextVars.dataStoreRecords.get() + key = frozenset(simple.dataset_ids) + if cache is not None and (cachedRecord := cache.get(key)) is not None: + return cachedRecord records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {} # make sure that all dataset IDs appear in the dict even if they don't # have records. @@ -211,9 +216,17 @@ def from_simple( records[dataset_id] = {} for class_name, table_data in simple.records.items(): klass = doImportType(class_name) + if not issubclass(klass, StoredDatastoreItemInfo): + raise RuntimeError( + "The class specified in the SerializedDatastoreRecordData " + f"({get_full_type_name(klass)}) is not a StoredDatastoreItemInfo." + ) for table_name, table_records in table_data.items(): for record in table_records: info = klass.from_record(record) dataset_type_records = records.setdefault(info.dataset_id, {}) dataset_type_records.setdefault(table_name, []).append(info) - return cls(records=records) + newRecord = cls(records=records) + if cache is not None: + cache[key] = newRecord + return newRecord diff --git a/python/lsst/daf/butler/core/dimensions/_coordinate.py b/python/lsst/daf/butler/core/dimensions/_coordinate.py index 2c104b1d46..34576f9c3b 100644 --- a/python/lsst/daf/butler/core/dimensions/_coordinate.py +++ b/python/lsst/daf/butler/core/dimensions/_coordinate.py @@ -39,6 +39,7 @@ from ..json import from_json_pydantic, to_json_pydantic from ..named import NamedKeyDict, NamedKeyMapping, NamedValueAbstractSet, NameLookupMapping +from ..persistenceContext import PersistenceContextVars from ..timespan import Timespan from ._elements import Dimension, DimensionElement from ._graph import DimensionGraph @@ -76,6 +77,10 @@ def direct(cls, *, dataId: dict[str, DataIdValue], records: dict[str, dict]) -> This method should only be called when the inputs are trusted. 
""" + key = (frozenset(dataId.items()), records is not None) + cache = PersistenceContextVars.serializedDataCoordinateMapping.get() + if cache is not None and (result := cache.get(key)) is not None: + return result node = SerializedDataCoordinate.__new__(cls) setter = object.__setattr__ setter(node, "dataId", dataId) @@ -87,6 +92,8 @@ def direct(cls, *, dataId: dict[str, DataIdValue], records: dict[str, dict]) -> else {k: SerializedDimensionRecord.direct(**v) for k, v in records.items()}, ) setter(node, "__fields_set__", {"dataId", "records"}) + if cache is not None: + cache[key] = node return node @@ -730,6 +737,10 @@ def from_simple( dataId : `DataCoordinate` Newly-constructed object. """ + key = (frozenset(simple.dataId.items()), simple.records is not None) + cache = PersistenceContextVars.dataCoordinates.get() + if cache is not None and (result := cache.get(key)) is not None: + return result if universe is None and registry is None: raise ValueError("One of universe or registry is required to convert a dict to a DataCoordinate") if universe is None and registry is not None: @@ -743,6 +754,8 @@ def from_simple( dataId = dataId.expanded( {k: DimensionRecord.from_simple(v, universe=universe) for k, v in simple.records.items()} ) + if cache is not None: + cache[key] = dataId return dataId to_json = to_json_pydantic diff --git a/python/lsst/daf/butler/core/dimensions/_records.py b/python/lsst/daf/butler/core/dimensions/_records.py index 95f7f12bd8..d35f23bd36 100644 --- a/python/lsst/daf/butler/core/dimensions/_records.py +++ b/python/lsst/daf/butler/core/dimensions/_records.py @@ -30,6 +30,7 @@ from pydantic import BaseModel, Field, StrictBool, StrictFloat, StrictInt, StrictStr, create_model from ..json import from_json_pydantic, to_json_pydantic +from ..persistenceContext import PersistenceContextVars from ..timespan import Timespan, TimespanDatabaseRepresentation from ._elements import Dimension, DimensionElement @@ -166,7 +167,16 @@ def direct( This method should only be called when the inputs are trusted. """ - node = cls.construct(definition=definition, record=record) + _recItems = record.items() + # Type ignore because the ternary statement seems to confuse mypy + # based on conflicting inferred types of v. + key = ( + definition, + frozenset((k, v if not isinstance(v, list) else tuple(v)) for k, v in _recItems), # type: ignore + ) + cache = PersistenceContextVars.serializedDimensionRecordMapping.get() + if cache is not None and (result := cache.get(key)) is not None: + return result node = SerializedDimensionRecord.__new__(cls) setter = object.__setattr__ setter(node, "definition", definition) @@ -177,6 +187,8 @@ def direct( node, "record", {k: v if type(v) != list else tuple(v) for k, v in record.items()} # type: ignore ) setter(node, "__fields_set__", {"definition", "record"}) + if cache is not None: + cache[key] = node return node @@ -367,6 +379,16 @@ def from_simple( if universe is None: # this is for mypy raise ValueError("Unable to determine a usable universe") + _recItems = simple.record.items() + # Type ignore because the ternary statement seems to confuse mypy + # based on conflicting inferred types of v. 
+ key = ( + simple.definition, + frozenset((k, v if not isinstance(v, list) else tuple(v)) for k, v in _recItems), # type: ignore + ) + cache = PersistenceContextVars.dimensionRecords.get() + if cache is not None and (result := cache.get(key)) is not None: + return result definition = DimensionElement.from_simple(simple.definition, universe=universe) @@ -389,7 +411,10 @@ def from_simple( if (hsh := "hash") in rec: rec[hsh] = bytes.fromhex(rec[hsh].decode()) - return _reconstructDimensionRecord(definition, rec) + dimRec = _reconstructDimensionRecord(definition, rec) + if cache is not None: + cache[key] = dimRec + return dimRec to_json = to_json_pydantic from_json: ClassVar = classmethod(from_json_pydantic) diff --git a/python/lsst/daf/butler/core/persistenceContext.py b/python/lsst/daf/butler/core/persistenceContext.py new file mode 100644 index 0000000000..27751388f8 --- /dev/null +++ b/python/lsst/daf/butler/core/persistenceContext.py @@ -0,0 +1,193 @@ +# This file is part of daf_butler. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from __future__ import annotations + +__all__ = ("PersistenceContextVars",) + + +import uuid +from collections.abc import Callable +from contextvars import Context, ContextVar, Token, copy_context +from typing import TYPE_CHECKING, ParamSpec, TypeVar, cast + +if TYPE_CHECKING: + from .datasets.ref import DatasetRef + from .datasets.type import DatasetType, SerializedDatasetType + from .datastoreRecordData import DatastoreRecordData + from .dimensions._coordinate import DataCoordinate, SerializedDataCoordinate + from .dimensions._records import DimensionRecord, SerializedDimensionRecord + +_T = TypeVar("_T") +_V = TypeVar("_V") + +_P = ParamSpec("_P") +_Q = ParamSpec("_Q") + + +class PersistenceContextVars: + r"""Helper class for deserializing butler data structures. + + When serializing various butler data structures, nested dataset types get + serialized independently. This means that what were multiple references to the + same object in memory are all duplicated in the serialization process. + + Upon deserialization, multiple independent data structures are created to + represent the same logical bit of data. + + This class can be used to remove this duplication by caching objects as + they are created and returning a reference to the cached object. This is done in + concert with the ``direct`` and ``from_simple`` methods on the various butler + dataset structures. + + This class utilizes class-level variables as a form of global state. Each + of the various data structures can look to see if these global caches have + been initialized as dictionaries or are in the default None state. 
+ + Users of this class are intended to create an instance, and then call the + `run` method, supplying a callable function, and passing any required + arguments. The `run` method then creates a specific execution context, + initializing the caches, and then runs the supplied function. Upon + completion of the function call, the caches are cleared and returned to the + default state. + + This process is thread safe. + + Note + ---- + Caches of `SerializedDatasetRef`\ s are intentionally left out. It was + discovered that these caused excessive python memory allocations which + though cleaned up upon completion, left the process using more memory than + it otherwise needed as python does not return allocated memory to the OS + until process completion. It was determined the runtime cost of recreating + the `SerializedDatasetRef`\ s was worth the memory savings. + """ + + serializedDatasetTypeMapping: ContextVar[ + dict[tuple[str, str], SerializedDatasetType] | None + ] = ContextVar("serializedDatasetTypeMapping", default=None) + r"""A cache of `SerializedDatasetType`\ s. + """ + + serializedDataCoordinateMapping: ContextVar[ + dict[tuple[frozenset, bool], SerializedDataCoordinate] | None + ] = ContextVar("serializedDataCoordinateMapping", default=None) + r"""A cache of `SerializedDataCoordinate`\ s. + """ + + serializedDimensionRecordMapping: ContextVar[ + dict[tuple[str, frozenset], SerializedDimensionRecord] | None + ] = ContextVar("serializedDimensionRecordMapping", default=None) + r"""A cache of `SerializedDimensionRecord`\ s. + """ + + loadedTypes: ContextVar[dict[tuple[str, str], DatasetType] | None] = ContextVar( + "loadedTypes", default=None + ) + r"""A cache of `DatasetType`\ s. + """ + + dataCoordinates: ContextVar[dict[tuple[frozenset, bool], DataCoordinate] | None] = ContextVar( + "dataCoordinates", default=None + ) + r"""A cache of `DataCoordinate`\ s. + """ + + datasetRefs: ContextVar[dict[tuple[int, str], DatasetRef] | None] = ContextVar( + "datasetRefs", default=None + ) + r"""A cache of `DatasetRef`\ s. + """ + + dimensionRecords: ContextVar[dict[tuple[str, frozenset], DimensionRecord] | None] = ContextVar( + "dimensionRecords", default=None + ) + r"""A cache of `DimensionRecord`\ s. + """ + + dataStoreRecords: ContextVar[dict[frozenset[str | uuid.UUID], DatastoreRecordData] | None] = ContextVar( + "dataStoreRecords", default=None + ) + r"""A cache of `DatastoreRecordData` objects. + """ + + @classmethod + def _getContextVars(cls) -> dict[str, ContextVar]: + """Build a dictionary of names to caches declared at class scope.""" + classAttributes: dict[str, ContextVar] = {} + for k in vars(cls): + v = getattr(cls, k) + # filter out callables and private attributes + if not callable(v) and not k.startswith("__"): + classAttributes[k] = v + return classAttributes + + def __init__(self) -> None: + self._ctx: Context | None = None + self._tokens: dict[str, Token] | None = None + + def _functionRunner(self, function: Callable[_P, _V], *args: _P.args, **kwargs: _P.kwargs) -> _V: + # create a storage space for the tokens returned from setting the + # context variables + self._tokens = {} + + # Set each cache to an empty dictionary and record the token returned + # by this operation. + for name, attribute in self._getContextVars().items(): + self._tokens[name] = attribute.set({}) + + # Call the supplied function and record the result + result = function(*args, **kwargs) + + # Reset all the context variables back to the state they were in before + # this function was run. 
+ persistenceVars = self._getContextVars() + assert self._tokens is not None + for name, token in self._tokens.items(): + attribute = persistenceVars[name] + attribute.reset(token) + self._tokens = None + return result + + def run(self, function: Callable[_Q, _T], *args: _Q.args, **kwargs: _Q.kwargs) -> _T: + """Execute the supplied function inside context specific caches. + + Parameters + ---------- + function : `Callable` + A callable which is to be executed inside a specific context. + *args : tuple + Positional arguments which are to be passed to the `Callable` + **kwargs: dict, optional + Extra key word arguments which are to be passed to the `Callable` + + Returns + ------- + result : `Any` + The result returned by executing the supplied `Callable` + """ + self._ctx = copy_context() + # Type checkers seem to have trouble with a second layer nesting of + # parameter specs in callables, so ignore the call here and explicitly + # cast the result as we know this is exactly what the return type will + # be. + result = self._ctx.run(self._functionRunner, function, *args, **kwargs) # type: ignore + return cast(_T, result) diff --git a/python/lsst/daf/butler/core/quantum.py b/python/lsst/daf/butler/core/quantum.py index d3cdb77e89..084e59d6e9 100644 --- a/python/lsst/daf/butler/core/quantum.py +++ b/python/lsst/daf/butler/core/quantum.py @@ -23,10 +23,13 @@ __all__ = ("Quantum", "SerializedQuantum", "DimensionRecordsAccumulator") +import sys +import warnings from collections.abc import Iterable, Mapping, MutableMapping from typing import Any from lsst.utils import doImportType +from lsst.utils.introspection import find_outside_stacklevel from pydantic import BaseModel from .datasets import DatasetRef, DatasetType, SerializedDatasetRef, SerializedDatasetType @@ -46,7 +49,6 @@ def _reconstructDatasetRef( type_: DatasetType | None, ids: Iterable[int], dimensionRecords: dict[int, SerializedDimensionRecord] | None, - reconstitutedDimensions: dict[int, tuple[str, DimensionRecord]], universe: DimensionUniverse, ) -> DatasetRef: """Reconstruct a DatasetRef stored in a Serialized Quantum.""" @@ -55,19 +57,11 @@ def _reconstructDatasetRef( for dId in ids: # if the dimension record has been loaded previously use that, # otherwise load it from the dict of Serialized DimensionRecords - if (recId := reconstitutedDimensions.get(dId)) is None: - if dimensionRecords is None: - raise ValueError( - "Cannot construct from a SerializedQuantum with no dimension records. " - "Reconstituted Dimensions must be supplied and populated in method call." - ) - tmpSerialized = dimensionRecords[dId] - reconstructedDim = DimensionRecord.from_simple(tmpSerialized, universe=universe) - definition = tmpSerialized.definition - reconstitutedDimensions[dId] = (definition, reconstructedDim) - else: - definition, reconstructedDim = recId - records[definition] = reconstructedDim + if dimensionRecords is None: + raise ValueError("Cannot construct from a SerializedQuantum with no dimension records. 
") + tmpSerialized = dimensionRecords[dId] + reconstructedDim = DimensionRecord.from_simple(tmpSerialized, universe=universe) + records[sys.intern(reconstructedDim.definition.name)] = reconstructedDim # turn the serialized form into an object and attach the dimension records rebuiltDatasetRef = DatasetRef.from_simple(simple, universe, datasetType=type_) if records: @@ -110,13 +104,15 @@ def direct( """ node = SerializedQuantum.__new__(cls) setter = object.__setattr__ - setter(node, "taskName", taskName) + setter(node, "taskName", sys.intern(taskName or "")) setter(node, "dataId", dataId if dataId is None else SerializedDataCoordinate.direct(**dataId)) + setter( node, "datasetTypeMapping", {k: SerializedDatasetType.direct(**v) for k, v in datasetTypeMapping.items()}, ) + setter( node, "initInputs", @@ -207,7 +203,6 @@ class Quantum: "_initInputs", "_inputs", "_outputs", - "_hash", "_datastore_records", ) @@ -236,8 +231,12 @@ def __init__( if outputs is None: outputs = {} self._initInputs = NamedKeyDict[DatasetType, DatasetRef](initInputs).freeze() - self._inputs = NamedKeyDict[DatasetType, list[DatasetRef]](inputs).freeze() - self._outputs = NamedKeyDict[DatasetType, list[DatasetRef]](outputs).freeze() + self._inputs = NamedKeyDict[DatasetType, tuple[DatasetRef]]( + (k, tuple(v)) for k, v in inputs.items() + ).freeze() + self._outputs = NamedKeyDict[DatasetType, tuple[DatasetRef]]( + (k, tuple(v)) for k, v in outputs.items() + ).freeze() if datastore_records is None: datastore_records = {} self._datastore_records = datastore_records @@ -412,23 +411,22 @@ def from_simple( required dimension has already been loaded. Otherwise the record will be unpersisted from the SerializedQuatnum and added to the reconstitutedDimensions dict (if not None). Defaults to None. + Deprecated, any argument will be ignored. """ - loadedTypes: MutableMapping[str, DatasetType] = {} initInputs: MutableMapping[DatasetType, DatasetRef] = {} - if reconstitutedDimensions is None: - reconstitutedDimensions = {} + if reconstitutedDimensions is not None: + warnings.warn( + "The reconstitutedDimensions argument is now ignored and may be removed after v 27", + category=FutureWarning, + stacklevel=find_outside_stacklevel("lsst.daf.butler"), + ) # Unpersist all the init inputs for key, (value, dimensionIds) in simple.initInputs.items(): - # If a datasetType has already been created use that instead of - # unpersisting. - if (type_ := loadedTypes.get(key)) is None: - type_ = loadedTypes.setdefault( - key, DatasetType.from_simple(simple.datasetTypeMapping[key], universe=universe) - ) + type_ = DatasetType.from_simple(simple.datasetTypeMapping[key], universe=universe) # reconstruct the dimension records rebuiltDatasetRef = _reconstructDatasetRef( - value, type_, dimensionIds, simple.dimensionRecords, reconstitutedDimensions, universe + value, type_, dimensionIds, simple.dimensionRecords, universe ) initInputs[type_] = rebuiltDatasetRef @@ -438,17 +436,12 @@ def from_simple( for container, simpleRefs in ((inputs, simple.inputs), (outputs, simple.outputs)): for key, values in simpleRefs.items(): - # If a datasetType has already been created use that instead of - # unpersisting. 
- if (type_ := loadedTypes.get(key)) is None: - type_ = loadedTypes.setdefault( - key, DatasetType.from_simple(simple.datasetTypeMapping[key], universe=universe) - ) + type_ = DatasetType.from_simple(simple.datasetTypeMapping[key], universe=universe) # reconstruct the list of DatasetRefs for this DatasetType tmp: list[DatasetRef] = [] for v, recIds in values: rebuiltDatasetRef = _reconstructDatasetRef( - v, type_, recIds, simple.dimensionRecords, reconstitutedDimensions, universe + v, type_, recIds, simple.dimensionRecords, universe ) tmp.append(rebuiltDatasetRef) container[type_] = tmp @@ -466,7 +459,7 @@ def from_simple( for datastore_name, record_data in simple.datastoreRecords.items() } - return Quantum( + quant = Quantum( taskName=simple.taskName, dataId=dataId, initInputs=initInputs, @@ -474,6 +467,7 @@ def from_simple( outputs=outputs, datastore_records=datastore_records, ) + return quant @property def taskClass(self) -> type | None: @@ -508,7 +502,7 @@ def initInputs(self) -> NamedKeyMapping[DatasetType, DatasetRef]: return self._initInputs @property - def inputs(self) -> NamedKeyMapping[DatasetType, list[DatasetRef]]: + def inputs(self) -> NamedKeyMapping[DatasetType, tuple[DatasetRef]]: """Return mapping of input datasets that were expected to be used. Has `DatasetType` instances as keys (names can also be used for @@ -523,7 +517,7 @@ def inputs(self) -> NamedKeyMapping[DatasetType, list[DatasetRef]]: return self._inputs @property - def outputs(self) -> NamedKeyMapping[DatasetType, list[DatasetRef]]: + def outputs(self) -> NamedKeyMapping[DatasetType, tuple[DatasetRef]]: """Return mapping of output datasets (to be) generated by this quantum. Has the same form as `predictedInputs`. diff --git a/python/lsst/daf/butler/transfers/_yaml.py b/python/lsst/daf/butler/transfers/_yaml.py index 25ef39e04a..dd9521520b 100644 --- a/python/lsst/daf/butler/transfers/_yaml.py +++ b/python/lsst/daf/butler/transfers/_yaml.py @@ -25,7 +25,7 @@ import uuid import warnings -from collections import defaultdict +from collections import UserDict, defaultdict from collections.abc import Iterable, Mapping from datetime import datetime from typing import IO, TYPE_CHECKING, Any @@ -65,6 +65,22 @@ """ +class _RefMapper(UserDict[int, uuid.UUID]): + """Create a local dict subclass which creates new deterministic UUID for + missing keys. + """ + + _namespace = uuid.UUID("4d4851f4-2890-4d41-8779-5f38a3f5062b") + + def __missing__(self, key: int) -> uuid.UUID: + newUUID = uuid.uuid3(namespace=self._namespace, name=str(key)) + self[key] = newUUID + return newUUID + + +_refIntId2UUID = _RefMapper() + + def _uuid_representer(dumper: yaml.Dumper, data: uuid.UUID) -> yaml.Node: """Generate YAML representation for UUID. 
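The `_RefMapper` added in the hunk above maps legacy integer dataset IDs to deterministic UUIDs via `uuid.uuid3`, so re-reading the same export file always yields the same identifiers. A minimal sketch of that idea, using an illustrative namespace rather than the one hard-coded in `_RefMapper`:

```python
import uuid
from collections import UserDict

# Illustrative namespace only; _RefMapper carries its own fixed namespace UUID.
_EXAMPLE_NAMESPACE = uuid.UUID("00000000-0000-0000-0000-000000000000")


class IntToUUIDMap(UserDict):
    """Dict that lazily assigns a reproducible UUID to each missing integer key."""

    def __missing__(self, key):
        # uuid3 hashes (namespace, name), so the same integer always maps to
        # the same UUID across runs and processes.
        new_id = uuid.uuid3(_EXAMPLE_NAMESPACE, str(key))
        self[key] = new_id
        return new_id


ids = IntToUUIDMap()
assert ids[42] == uuid.uuid3(_EXAMPLE_NAMESPACE, "42")
assert ids[42] == ids[42]  # cached after the first lookup
```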
@@ -338,16 +354,22 @@ def __init__(self, stream: IO, registry: Registry): elif data["type"] == "associations": collectionType = CollectionType.from_name(data["collection_type"]) if collectionType is CollectionType.TAGGED: - self.tagAssociations[data["collection"]].extend(data["dataset_ids"]) + self.tagAssociations[data["collection"]].extend( + [x if not isinstance(x, int) else _refIntId2UUID[x] for x in data["dataset_ids"]] + ) elif collectionType is CollectionType.CALIBRATION: assocsByTimespan = self.calibAssociations[data["collection"]] for d in data["validity_ranges"]: if "timespan" in d: - assocsByTimespan[d["timespan"]] = d["dataset_ids"] + assocsByTimespan[d["timespan"]] = [ + x if not isinstance(x, int) else _refIntId2UUID[x] for x in d["dataset_ids"] + ] else: # TODO: this is for backward compatibility, should # be removed at some point. - assocsByTimespan[Timespan(begin=d["begin"], end=d["end"])] = d["dataset_ids"] + assocsByTimespan[Timespan(begin=d["begin"], end=d["end"])] = [ + x if not isinstance(x, int) else _refIntId2UUID[x] for x in d["dataset_ids"] + ] else: raise ValueError(f"Unexpected calibration type for association: {collectionType.name}.") else: @@ -362,7 +384,12 @@ def __init__(self, stream: IO, registry: Registry): FileDataset( d.get("path"), [ - DatasetRef(datasetType, dataId, run=data["run"], id=refid) + DatasetRef( + datasetType, + dataId, + run=data["run"], + id=refid if not isinstance(refid, int) else _refIntId2UUID[refid], + ) for dataId, refid in zip( ensure_iterable(d["data_id"]), ensure_iterable(d["dataset_id"]) ) diff --git a/tests/test_templates.py b/tests/test_templates.py index dab247d01f..37122381e4 100644 --- a/tests/test_templates.py +++ b/tests/test_templates.py @@ -23,9 +23,11 @@ import os.path import unittest +import uuid from lsst.daf.butler import ( DataCoordinate, + DatasetId, DatasetRef, DatasetType, DimensionGraph, @@ -41,6 +43,8 @@ PlaceHolder = StorageClass("PlaceHolder") +REFUUID = DatasetId(int=uuid.uuid4().int) + class TestFileTemplates(unittest.TestCase): """Test creation of paths from templates.""" @@ -66,7 +70,7 @@ def makeDatasetRef( StorageClass(storageClassName), parentStorageClass=parentStorageClass, ) - return DatasetRef(datasetType, dataId, id=1, run=run, conform=conform) + return DatasetRef(datasetType, dataId, id=REFUUID, run=run, conform=conform) def setUp(self): self.universe = DimensionUniverse() @@ -104,19 +108,19 @@ def testBasic(self): ) # Check that the id is sufficient without any other information. - self.assertTemplate("{id}", "1", self.makeDatasetRef("calexp", run="run2")) + self.assertTemplate("{id}", str(REFUUID), self.makeDatasetRef("calexp", run="run2")) - self.assertTemplate("{run}/{id}", "run2/1", self.makeDatasetRef("calexp", run="run2")) + self.assertTemplate("{run}/{id}", f"run2/{str(REFUUID)}", self.makeDatasetRef("calexp", run="run2")) self.assertTemplate( "fixed/{id}", - "fixed/1", + f"fixed/{str(REFUUID)}", self.makeDatasetRef("calexp", run="run2"), ) self.assertTemplate( "fixed/{id}_{physical_filter}", - "fixed/1_Most_Amazing_U_Filter_Ever", + f"fixed/{str(REFUUID)}_Most_Amazing_U_Filter_Ever", self.makeDatasetRef("calexp", run="run2"), )
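The caching added throughout this change is driven by the new `PersistenceContextVars` class: per its docstring, callers construct an instance and hand their deserialization work to `run`, so the per-context caches are populated for the duration of the call and discarded afterwards. A minimal usage sketch, where the serialized quanta are hypothetical stand-ins:

```python
# Minimal usage sketch of the caching context added by this change. The
# serialized quanta below are hypothetical stand-ins; only PersistenceContextVars
# and the from_simple call come from daf_butler.
from lsst.daf.butler import DimensionUniverse, Quantum
from lsst.daf.butler.core.persistenceContext import PersistenceContextVars


def deserialize_quanta(serialized_quanta, universe):
    # Quantum.from_simple (and the DatasetRef/DatasetType/DataCoordinate
    # from_simple calls underneath it) consult the per-context caches while
    # this callable runs, so repeated serialized fragments resolve to shared
    # in-memory objects.
    return [Quantum.from_simple(simple, universe=universe) for simple in serialized_quanta]


caches = PersistenceContextVars()
universe = DimensionUniverse()
serialized_quanta = []  # e.g. SerializedQuantum objects parsed from JSON
quanta = caches.run(deserialize_quanta, serialized_quanta, universe)
```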