From 1310356d73e029988d5009984175f7fb1442e05b Mon Sep 17 00:00:00 2001 From: Nate Lust Date: Tue, 27 Jun 2023 14:51:20 -0400 Subject: [PATCH 01/11] Add an object for handling deserialization caches --- python/lsst/daf/butler/core/__init__.py | 1 + .../daf/butler/core/persistenceContext.py | 190 ++++++++++++++++++ 2 files changed, 191 insertions(+) create mode 100644 python/lsst/daf/butler/core/persistenceContext.py diff --git a/python/lsst/daf/butler/core/__init__.py b/python/lsst/daf/butler/core/__init__.py index 7d803c6e69..0fd255ec18 100644 --- a/python/lsst/daf/butler/core/__init__.py +++ b/python/lsst/daf/butler/core/__init__.py @@ -29,6 +29,7 @@ from .logging import ButlerLogRecords from .mappingFactory import * from .named import * +from .persistenceContext import * from .progress import Progress from .quantum import * from .storageClass import * diff --git a/python/lsst/daf/butler/core/persistenceContext.py b/python/lsst/daf/butler/core/persistenceContext.py new file mode 100644 index 0000000000..f6a4487bee --- /dev/null +++ b/python/lsst/daf/butler/core/persistenceContext.py @@ -0,0 +1,190 @@ +# This file is part of daf_butler. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ("PersistenceContextVars",) + + +import uuid +from collections.abc import Callable +from contextvars import Context, ContextVar, Token, copy_context +from typing import TYPE_CHECKING, TypeVar + +if TYPE_CHECKING: + from .datasets.ref import DatasetRef + from .datasets.type import DatasetType, SerializedDatasetType + from .datastoreRecordData import DatastoreRecordData, SerializedDatastoreRecordData + from .dimensions._coordinate import DataCoordinate, SerializedDataCoordinate + from .dimensions._records import DimensionRecord, SerializedDimensionRecord + +_T = TypeVar("_T") + + +class PersistenceContextVars: + r"""Helper class for deserializing butler data structures. + + When serializing various butler data structures nested dataset types get + serialized independently. This means what were multiple references to the + same object in memory are all duplicated in the serialization process. + + Upon deserialization multiple independent data structures are created to + represent the same logical bit of data. + + This class can be used to remove this duplication by caching objects as + they are created and returning a reference to that object. This is done in + concert with ``direct`` and ``from_simple`` methods on the various butler + dataset structures. + + This class utilizes class level variables as a form of global state. 
Each + of the various data structures can look to see if these global caches has + been initialized as a cache (a dictionary) or is in the default None state. + + Users of this class are intended to create an instance, and then call the + `run` method, supplying a callable function, and passing any required + arguments. The `run` method then creates a specific execution context, + initializing the caches, and then runs the supplied function. Upon + completion of the function call, the caches are cleared and returned to the + default state. + + This process is thread safe. + + Note + ---- + Caches of `SerializedDatasetRef`\ s are intentionally left out. It was + discovered that these caused excessive python memory allocations which + though cleaned up upon completion, left the process using more memory than + it otherwise needed as python does not return allocated memory to the OS + until process completion. It was determined the runtime cost of recreating + the `SerializedDatasetRef`\ s was worth the memory savings. + """ + serializedDatasetTypeMapping: ContextVar[ + dict[tuple[str, str], SerializedDatasetType] | None + ] = ContextVar("serializedDatasetTypeMapping", default=None) + r"""A cache of `SerializedDatasetType`\ s. + """ + + serializedDataCoordinateMapping: ContextVar[ + dict[tuple[frozenset, bool], SerializedDataCoordinate] | None + ] = ContextVar("serializedDataCoordinateMapping", default=None) + r"""A cache of `SerializedDataCoordinate`\ s. + """ + + serializedDimensionRecordMapping: ContextVar[ + dict[tuple[str, frozenset], SerializedDimensionRecord] | None + ] = ContextVar("serializedDimensionRecordMapping", default=None) + r"""A cache of `SerializedDimensionRecord`\ s. + """ + + serializedDatastoreRecordMapping: ContextVar[ + dict[frozenset[str | uuid.UUID], SerializedDatastoreRecordData] | None + ] = ContextVar("serializedDatastoreRecordMapping", default=None) + r"""A cache of `SerializedDatastoreRecord`\ s. + """ + + loadedTypes: ContextVar[dict[tuple[str, str], DatasetType] | None] = ContextVar( + "loadedTypes", default=None + ) + r"""A cache of `DatasetType`\ s. + """ + + dataCoordinates: ContextVar[dict[tuple[frozenset, bool], DataCoordinate] | None] = ContextVar( + "dataCoordinates", default=None + ) + r"""A cache of `DataCoordinate`\ s. + """ + + datasetRefs: ContextVar[dict[tuple[int, str], DatasetRef] | None] = ContextVar( + "datasetRefs", default=None + ) + r"""A cache of `DatasetRef`\ s. + """ + + dimensionRecords: ContextVar[dict[tuple[str, frozenset], DimensionRecord] | None] = ContextVar( + "dimensionRecords", default=None + ) + r"""A cache of `DimensionRecord`\ s. + """ + + dataStoreRecords: ContextVar[dict[frozenset[str | uuid.UUID], DatastoreRecordData] | None] = ContextVar( + "dataStoreRecords", default=None + ) + r"""A cache of `DatastoreRecordData` objects. + """ + + @classmethod + def _getContextVars(cls) -> dict[str, ContextVar]: + """Build a dictionary of names to caches declared at class scope. 
+ """ + classAttributes: dict[str, ContextVar] = {} + for k in vars(cls): + v = getattr(cls, k) + # filter out callables and private attributes + if not callable(v) and not k.startswith("__"): + classAttributes[k] = v + return classAttributes + + def __init__(self): + self._ctx: Context | None = None + self._tokens: dict[str, Token] | None = None + + def _functionRunner(self, function: Callable[..., _T], *args, **kwargs) -> _T: + # create a storage space for the tokens returned from setting the + # context variables + self._tokens = {} + + # Set each cache to an empty dictionary and record the token returned + # by this operation. + for name, attribute in self._getContextVars().items(): + self._tokens[name] = attribute.set({}) + + # Call the supplied function and record the result + result = function(*args, **kwargs) + + # Reset all the context variables back to the state they were in before + # this function was run. + persistenceVars = self._getContextVars() + assert self._tokens is not None + for name, token in self._tokens.items(): + attribute = persistenceVars[name] + attribute.reset(token) + self._tokens = None + return result + + def run(self, function: Callable[..., _T], *args, **kwargs) -> _T: + """Execute the supplied function inside context specific caches. + + Parameters + ---------- + function : `Callable` + A callable which is to be executed inside a specific context. + *args : tuple + Positional arguments which are to be passed to the `Callable` + **kwargs: dict, optional + Extra key word arguments which are to be passed to the `Callable` + + Returns + ------- + result : `Any` + The result returned by executing the supplied `Callable` + """ + self._ctx = copy_context() + return self._ctx.run(self._functionRunner, function, *args, **kwargs) From 97137a213f11d4a77f145d6826f15e9cd82db7ce Mon Sep 17 00:00:00 2001 From: Nate Lust Date: Tue, 27 Jun 2023 14:52:38 -0400 Subject: [PATCH 02/11] Optimize memory and load times on deserialization Often may butler primitives are deserialized at the same time, and it is useful for these objects to share references to each other. This reduces load time and memory usage. 
--- python/lsst/daf/butler/core/datasets/ref.py | 40 ++++++++--- python/lsst/daf/butler/core/datasets/type.py | 19 ++++- .../daf/butler/core/datastoreRecordData.py | 17 ++++- .../daf/butler/core/dimensions/_coordinate.py | 13 ++++ .../daf/butler/core/dimensions/_records.py | 23 +++++- python/lsst/daf/butler/core/quantum.py | 70 +++++++++---------- 6 files changed, 132 insertions(+), 50 deletions(-) diff --git a/python/lsst/daf/butler/core/datasets/ref.py b/python/lsst/daf/butler/core/datasets/ref.py index 005fd5b1ff..4bc0232f25 100644 --- a/python/lsst/daf/butler/core/datasets/ref.py +++ b/python/lsst/daf/butler/core/datasets/ref.py @@ -30,6 +30,7 @@ ] import enum +import sys import uuid from collections.abc import Iterable from typing import TYPE_CHECKING, Any, ClassVar @@ -41,6 +42,7 @@ from ..dimensions import DataCoordinate, DimensionGraph, DimensionUniverse, SerializedDataCoordinate from ..json import from_json_pydantic, to_json_pydantic from ..named import NamedKeyDict +from ..persistenceContext import PersistenceContextVars from .type import DatasetType, SerializedDatasetType if TYPE_CHECKING: @@ -142,6 +144,10 @@ def makeDatasetId( return uuid.uuid5(self.NS_UUID, data) +# This is constant, so don't recreate a set for each instance +_serializedDatasetRefFieldsSet = {"id", "datasetType", "dataId", "run", "component"} + + class SerializedDatasetRef(BaseModel): """Simplified model of a `DatasetRef` suitable for serialization.""" @@ -202,9 +208,9 @@ def direct( datasetType if datasetType is None else SerializedDatasetType.direct(**datasetType), ) setter(node, "dataId", dataId if dataId is None else SerializedDataCoordinate.direct(**dataId)) - setter(node, "run", run) + setter(node, "run", sys.intern(run)) setter(node, "component", component) - setter(node, "__fields_set__", {"id", "datasetType", "dataId", "run", "component"}) + setter(node, "__fields_set__", _serializedDatasetRefFieldsSet) return node @@ -254,7 +260,7 @@ class DatasetRef: _serializedType = SerializedDatasetRef __slots__ = ( - "id", + "_id", "datasetType", "dataId", "run", @@ -277,11 +283,15 @@ def __init__( self.dataId = dataId self.run = run if id is not None: - self.id = id + self._id = id.int else: - self.id = DatasetIdFactory().makeDatasetId( + self._id = DatasetIdFactory().makeDatasetId( self.run, self.datasetType, self.dataId, id_generation_mode - ) + ).int + + @property + def id(self) -> DatasetId: + return uuid.UUID(int=self._id) def __eq__(self, other: Any) -> bool: try: @@ -396,9 +406,18 @@ def from_simple( ref : `DatasetRef` Newly-constructed object. 
""" + cache = PersistenceContextVars.datasetRefs.get() + localName = sys.intern( + datasetType.name + if datasetType is not None + else (x.name if (x := simple.datasetType) is not None else "") + ) + key = (simple.id.int, localName) + if cache is not None and (cachedRef := cache.get(key, None)) is not None: + return cachedRef # Minimalist component will just specify component and id and # require registry to reconstruct - if set(simple.dict(exclude_unset=True, exclude_defaults=True)).issubset({"id", "component"}): + if not (simple.datasetType is not None or simple.dataId is not None or simple.run is not None): if registry is None: raise ValueError("Registry is required to construct component DatasetRef from integer id") if simple.id is None: @@ -408,6 +427,8 @@ def from_simple( raise RuntimeError(f"No matching dataset found in registry for id {simple.id}") if simple.component: ref = ref.makeComponentRef(simple.component) + if cache is not None: + cache[key] = ref return ref if universe is None and registry is None: @@ -443,7 +464,10 @@ def from_simple( f"Encountered with {simple!r}{dstr}." ) - return cls(datasetType, dataId, id=simple.id, run=simple.run) + newRef = cls(datasetType, dataId, id=simple.id, run=simple.run) + if cache is not None: + cache[key] = newRef + return newRef to_json = to_json_pydantic from_json: ClassVar = classmethod(from_json_pydantic) diff --git a/python/lsst/daf/butler/core/datasets/type.py b/python/lsst/daf/butler/core/datasets/type.py index 1ddbc018c0..72f714d624 100644 --- a/python/lsst/daf/butler/core/datasets/type.py +++ b/python/lsst/daf/butler/core/datasets/type.py @@ -34,6 +34,7 @@ from ..configSupport import LookupKey from ..dimensions import DimensionGraph, SerializedDimensionGraph from ..json import from_json_pydantic, to_json_pydantic +from ..persistenceContext import PersistenceContextVars from ..storageClass import StorageClass, StorageClassFactory if TYPE_CHECKING: @@ -74,6 +75,10 @@ def direct( This method should only be called when the inputs are trusted. """ + cache = PersistenceContextVars.serializedDatasetTypeMapping.get() + key = (name, storageClass or "") + if cache is not None and (type_ := cache.get(key, None)) is not None: + return type_ node = SerializedDatasetType.__new__(cls) setter = object.__setattr__ setter(node, "name", name) @@ -90,6 +95,8 @@ def direct( "__fields_set__", {"name", "storageClass", "dimensions", "parentStorageClass", "isCalibration"}, ) + if cache is not None: + cache[key] = node return node @@ -685,6 +692,13 @@ def from_simple( datasetType : `DatasetType` Newly-constructed object. 
""" + # check to see if there is a cache, and if there is, if there is a + # cached dataset type + cache = PersistenceContextVars.loadedTypes.get() + key = (simple.name, simple.storageClass or "") + if cache is not None and (type_ := cache.get(key, None)) is not None: + return type_ + if simple.storageClass is None: # Treat this as minimalist representation if registry is None: @@ -708,7 +722,7 @@ def from_simple( # mypy hint raise ValueError(f"Dimensions must be specified in {simple}") - return cls( + newType = cls( name=simple.name, dimensions=DimensionGraph.from_simple(simple.dimensions, universe=universe), storageClass=simple.storageClass, @@ -716,6 +730,9 @@ def from_simple( parentStorageClass=simple.parentStorageClass, universe=universe, ) + if cache is not None: + cache[key] = newType + return newType to_json = to_json_pydantic from_json: ClassVar = classmethod(from_json_pydantic) diff --git a/python/lsst/daf/butler/core/datastoreRecordData.py b/python/lsst/daf/butler/core/datastoreRecordData.py index c6d9f31e0b..58b8da8b42 100644 --- a/python/lsst/daf/butler/core/datastoreRecordData.py +++ b/python/lsst/daf/butler/core/datastoreRecordData.py @@ -36,6 +36,7 @@ from .datasets import DatasetId from .dimensions import DimensionUniverse +from .persistenceContext import PersistenceContextVars from .storedFileInfo import StoredDatastoreItemInfo if TYPE_CHECKING: @@ -70,6 +71,11 @@ def direct( This method should only be called when the inputs are trusted. """ + key = frozenset(dataset_ids) + cache = PersistenceContextVars.serializedDatastoreRecordMapping.get() + if cache is not None and (value := cache.get(key)) is not None: + return value + data = SerializedDatastoreRecordData.__new__(cls) setter = object.__setattr__ # JSON makes strings out of UUIDs, need to convert them back @@ -83,6 +89,8 @@ def direct( if (id := record.get("dataset_id")) is not None: record["dataset_id"] = uuid.UUID(id) if isinstance(id, str) else id setter(data, "records", records) + if cache is not None: + cache[key] = data return data @@ -204,6 +212,10 @@ def from_simple( item_info : `StoredDatastoreItemInfo` De-serialized instance of `StoredDatastoreItemInfo`. """ + cache = PersistenceContextVars.dataStoreRecords.get() + key = frozenset(simple.dataset_ids) + if cache is not None and (record := cache.get(key)) is not None: + return record records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {} # make sure that all dataset IDs appear in the dict even if they don't # have records. 
@@ -216,4 +228,7 @@ def from_simple( info = klass.from_record(record) dataset_type_records = records.setdefault(info.dataset_id, {}) dataset_type_records.setdefault(table_name, []).append(info) - return cls(records=records) + record = cls(records=records) + if cache is not None: + cache[key] = record + return record diff --git a/python/lsst/daf/butler/core/dimensions/_coordinate.py b/python/lsst/daf/butler/core/dimensions/_coordinate.py index 2c104b1d46..34576f9c3b 100644 --- a/python/lsst/daf/butler/core/dimensions/_coordinate.py +++ b/python/lsst/daf/butler/core/dimensions/_coordinate.py @@ -39,6 +39,7 @@ from ..json import from_json_pydantic, to_json_pydantic from ..named import NamedKeyDict, NamedKeyMapping, NamedValueAbstractSet, NameLookupMapping +from ..persistenceContext import PersistenceContextVars from ..timespan import Timespan from ._elements import Dimension, DimensionElement from ._graph import DimensionGraph @@ -76,6 +77,10 @@ def direct(cls, *, dataId: dict[str, DataIdValue], records: dict[str, dict]) -> This method should only be called when the inputs are trusted. """ + key = (frozenset(dataId.items()), records is not None) + cache = PersistenceContextVars.serializedDataCoordinateMapping.get() + if cache is not None and (result := cache.get(key)) is not None: + return result node = SerializedDataCoordinate.__new__(cls) setter = object.__setattr__ setter(node, "dataId", dataId) @@ -87,6 +92,8 @@ def direct(cls, *, dataId: dict[str, DataIdValue], records: dict[str, dict]) -> else {k: SerializedDimensionRecord.direct(**v) for k, v in records.items()}, ) setter(node, "__fields_set__", {"dataId", "records"}) + if cache is not None: + cache[key] = node return node @@ -730,6 +737,10 @@ def from_simple( dataId : `DataCoordinate` Newly-constructed object. """ + key = (frozenset(simple.dataId.items()), simple.records is not None) + cache = PersistenceContextVars.dataCoordinates.get() + if cache is not None and (result := cache.get(key)) is not None: + return result if universe is None and registry is None: raise ValueError("One of universe or registry is required to convert a dict to a DataCoordinate") if universe is None and registry is not None: @@ -743,6 +754,8 @@ def from_simple( dataId = dataId.expanded( {k: DimensionRecord.from_simple(v, universe=universe) for k, v in simple.records.items()} ) + if cache is not None: + cache[key] = dataId return dataId to_json = to_json_pydantic diff --git a/python/lsst/daf/butler/core/dimensions/_records.py b/python/lsst/daf/butler/core/dimensions/_records.py index 95f7f12bd8..b5a0e6a04f 100644 --- a/python/lsst/daf/butler/core/dimensions/_records.py +++ b/python/lsst/daf/butler/core/dimensions/_records.py @@ -30,6 +30,7 @@ from pydantic import BaseModel, Field, StrictBool, StrictFloat, StrictInt, StrictStr, create_model from ..json import from_json_pydantic, to_json_pydantic +from ..persistenceContext import PersistenceContextVars from ..timespan import Timespan, TimespanDatabaseRepresentation from ._elements import Dimension, DimensionElement @@ -166,7 +167,13 @@ def direct( This method should only be called when the inputs are trusted. 
""" - node = cls.construct(definition=definition, record=record) + key = ( + definition, + frozenset((k, v if not isinstance(v, list) else tuple(v)) for k, v in record.items()), + ) + cache = PersistenceContextVars.serializedDimensionRecordMapping.get() + if cache is not None and (result := cache.get(key)) is not None: + return result node = SerializedDimensionRecord.__new__(cls) setter = object.__setattr__ setter(node, "definition", definition) @@ -177,6 +184,8 @@ def direct( node, "record", {k: v if type(v) != list else tuple(v) for k, v in record.items()} # type: ignore ) setter(node, "__fields_set__", {"definition", "record"}) + if cache is not None: + cache[key] = node return node @@ -367,6 +376,13 @@ def from_simple( if universe is None: # this is for mypy raise ValueError("Unable to determine a usable universe") + key = ( + simple.definition, + frozenset((k, v if not isinstance(v, list) else tuple(v)) for k, v in simple.record.items()), + ) + cache = PersistenceContextVars.dimensionRecords.get() + if cache is not None and (result := cache.get(key)) is not None: + return result definition = DimensionElement.from_simple(simple.definition, universe=universe) @@ -389,7 +405,10 @@ def from_simple( if (hsh := "hash") in rec: rec[hsh] = bytes.fromhex(rec[hsh].decode()) - return _reconstructDimensionRecord(definition, rec) + dimRec = _reconstructDimensionRecord(definition, rec) + if cache is not None: + cache[key] = dimRec + return dimRec to_json = to_json_pydantic from_json: ClassVar = classmethod(from_json_pydantic) diff --git a/python/lsst/daf/butler/core/quantum.py b/python/lsst/daf/butler/core/quantum.py index d3cdb77e89..58954c63d2 100644 --- a/python/lsst/daf/butler/core/quantum.py +++ b/python/lsst/daf/butler/core/quantum.py @@ -25,6 +25,8 @@ from collections.abc import Iterable, Mapping, MutableMapping from typing import Any +import sys +import warnings from lsst.utils import doImportType from pydantic import BaseModel @@ -46,7 +48,6 @@ def _reconstructDatasetRef( type_: DatasetType | None, ids: Iterable[int], dimensionRecords: dict[int, SerializedDimensionRecord] | None, - reconstitutedDimensions: dict[int, tuple[str, DimensionRecord]], universe: DimensionUniverse, ) -> DatasetRef: """Reconstruct a DatasetRef stored in a Serialized Quantum.""" @@ -55,19 +56,13 @@ def _reconstructDatasetRef( for dId in ids: # if the dimension record has been loaded previously use that, # otherwise load it from the dict of Serialized DimensionRecords - if (recId := reconstitutedDimensions.get(dId)) is None: - if dimensionRecords is None: - raise ValueError( - "Cannot construct from a SerializedQuantum with no dimension records. " - "Reconstituted Dimensions must be supplied and populated in method call." - ) - tmpSerialized = dimensionRecords[dId] - reconstructedDim = DimensionRecord.from_simple(tmpSerialized, universe=universe) - definition = tmpSerialized.definition - reconstitutedDimensions[dId] = (definition, reconstructedDim) - else: - definition, reconstructedDim = recId - records[definition] = reconstructedDim + if dimensionRecords is None: + raise ValueError( + "Cannot construct from a SerializedQuantum with no dimension records. 
" + ) + tmpSerialized = dimensionRecords[dId] + reconstructedDim = DimensionRecord.from_simple(tmpSerialized, universe=universe) + records[sys.intern(reconstructedDim.definition.name)] = reconstructedDim # turn the serialized form into an object and attach the dimension records rebuiltDatasetRef = DatasetRef.from_simple(simple, universe, datasetType=type_) if records: @@ -110,13 +105,15 @@ def direct( """ node = SerializedQuantum.__new__(cls) setter = object.__setattr__ - setter(node, "taskName", taskName) + setter(node, "taskName", sys.intern(taskName)) setter(node, "dataId", dataId if dataId is None else SerializedDataCoordinate.direct(**dataId)) + setter( node, "datasetTypeMapping", {k: SerializedDatasetType.direct(**v) for k, v in datasetTypeMapping.items()}, ) + setter( node, "initInputs", @@ -207,7 +204,6 @@ class Quantum: "_initInputs", "_inputs", "_outputs", - "_hash", "_datastore_records", ) @@ -236,8 +232,12 @@ def __init__( if outputs is None: outputs = {} self._initInputs = NamedKeyDict[DatasetType, DatasetRef](initInputs).freeze() - self._inputs = NamedKeyDict[DatasetType, list[DatasetRef]](inputs).freeze() - self._outputs = NamedKeyDict[DatasetType, list[DatasetRef]](outputs).freeze() + self._inputs = NamedKeyDict[DatasetType, tuple[DatasetRef]]( + (k, tuple(v)) for k, v in inputs.items() + ).freeze() + self._outputs = NamedKeyDict[DatasetType, tuple[DatasetRef]]( + (k, tuple(v)) for k, v in outputs.items() + ).freeze() if datastore_records is None: datastore_records = {} self._datastore_records = datastore_records @@ -412,23 +412,21 @@ def from_simple( required dimension has already been loaded. Otherwise the record will be unpersisted from the SerializedQuatnum and added to the reconstitutedDimensions dict (if not None). Defaults to None. + Deprecated, any argument will be ignored. """ - loadedTypes: MutableMapping[str, DatasetType] = {} initInputs: MutableMapping[DatasetType, DatasetRef] = {} - if reconstitutedDimensions is None: - reconstitutedDimensions = {} + if reconstitutedDimensions is not None: + warnings.warn( + "The reconstitutedDimensions argument is now ignored and may be removed after v 27", + category=DeprecationWarning, + ) # Unpersist all the init inputs for key, (value, dimensionIds) in simple.initInputs.items(): - # If a datasetType has already been created use that instead of - # unpersisting. - if (type_ := loadedTypes.get(key)) is None: - type_ = loadedTypes.setdefault( - key, DatasetType.from_simple(simple.datasetTypeMapping[key], universe=universe) - ) + type_ = DatasetType.from_simple(simple.datasetTypeMapping[key], universe=universe) # reconstruct the dimension records rebuiltDatasetRef = _reconstructDatasetRef( - value, type_, dimensionIds, simple.dimensionRecords, reconstitutedDimensions, universe + value, type_, dimensionIds, simple.dimensionRecords, universe ) initInputs[type_] = rebuiltDatasetRef @@ -438,17 +436,12 @@ def from_simple( for container, simpleRefs in ((inputs, simple.inputs), (outputs, simple.outputs)): for key, values in simpleRefs.items(): - # If a datasetType has already been created use that instead of - # unpersisting. 
- if (type_ := loadedTypes.get(key)) is None: - type_ = loadedTypes.setdefault( - key, DatasetType.from_simple(simple.datasetTypeMapping[key], universe=universe) - ) + type_ = DatasetType.from_simple(simple.datasetTypeMapping[key], universe=universe) # reconstruct the list of DatasetRefs for this DatasetType tmp: list[DatasetRef] = [] for v, recIds in values: rebuiltDatasetRef = _reconstructDatasetRef( - v, type_, recIds, simple.dimensionRecords, reconstitutedDimensions, universe + v, type_, recIds, simple.dimensionRecords, universe ) tmp.append(rebuiltDatasetRef) container[type_] = tmp @@ -466,7 +459,7 @@ def from_simple( for datastore_name, record_data in simple.datastoreRecords.items() } - return Quantum( + quant = Quantum( taskName=simple.taskName, dataId=dataId, initInputs=initInputs, @@ -474,6 +467,7 @@ def from_simple( outputs=outputs, datastore_records=datastore_records, ) + return quant @property def taskClass(self) -> type | None: @@ -508,7 +502,7 @@ def initInputs(self) -> NamedKeyMapping[DatasetType, DatasetRef]: return self._initInputs @property - def inputs(self) -> NamedKeyMapping[DatasetType, list[DatasetRef]]: + def inputs(self) -> NamedKeyMapping[DatasetType, tuple[DatasetRef]]: """Return mapping of input datasets that were expected to be used. Has `DatasetType` instances as keys (names can also be used for @@ -523,7 +517,7 @@ def inputs(self) -> NamedKeyMapping[DatasetType, list[DatasetRef]]: return self._inputs @property - def outputs(self) -> NamedKeyMapping[DatasetType, list[DatasetRef]]: + def outputs(self) -> NamedKeyMapping[DatasetType, tuple[DatasetRef]]: """Return mapping of output datasets (to be) generated by this quantum. Has the same form as `predictedInputs`. From 93e3286a36c6cdc942daa3d6707c0abbdeaa783c Mon Sep 17 00:00:00 2001 From: Nate Lust Date: Tue, 27 Jun 2023 14:55:04 -0400 Subject: [PATCH 03/11] Convert integer ids to UUID early Downstream code now depends on refs holding UUIDs. Have the yaml loader convert old style integer ids to UUIDs early rather than waiting for downstream cleanups. --- python/lsst/daf/butler/transfers/_yaml.py | 30 +++++++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/python/lsst/daf/butler/transfers/_yaml.py b/python/lsst/daf/butler/transfers/_yaml.py index 25ef39e04a..e1309ef72f 100644 --- a/python/lsst/daf/butler/transfers/_yaml.py +++ b/python/lsst/daf/butler/transfers/_yaml.py @@ -28,7 +28,7 @@ from collections import defaultdict from collections.abc import Iterable, Mapping from datetime import datetime -from typing import IO, TYPE_CHECKING, Any +from typing import IO, TYPE_CHECKING, Any, cast import astropy.time import yaml @@ -64,6 +64,8 @@ this version of the code. """ +_refIntId2UUID = defaultdict[int, uuid.UUID](uuid.uuid4) + def _uuid_representer(dumper: yaml.Dumper, data: uuid.UUID) -> yaml.Node: """Generate YAML representation for UUID. 
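
As an aside, the int-to-UUID conversion idiom used in the hunks below can be exercised on its own; the sample ids here are invented and the snippet is not part of the patch:

    # Each previously seen legacy integer id maps to the same generated UUID,
    # while values that are already UUIDs pass through untouched.
    import uuid
    from collections import defaultdict

    _refIntId2UUID = defaultdict[int, uuid.UUID](uuid.uuid4)

    dataset_ids = [3, uuid.uuid4(), 3]  # invented mix of legacy ints and UUIDs
    converted = [x if not isinstance(x, int) else _refIntId2UUID[x] for x in dataset_ids]

    assert converted[0] == converted[2]
    assert all(isinstance(x, uuid.UUID) for x in converted)

With `uuid.uuid4` as the factory the mapping is only stable within one process; the final commit in this series swaps in a deterministic `uuid3`-based mapper.
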
@@ -338,16 +340,27 @@ def __init__(self, stream: IO, registry: Registry): elif data["type"] == "associations": collectionType = CollectionType.from_name(data["collection_type"]) if collectionType is CollectionType.TAGGED: - self.tagAssociations[data["collection"]].extend(data["dataset_ids"]) + self.tagAssociations[data["collection"]].extend( + [ + x if not isinstance(x, int) else cast(DatasetId, _refIntId2UUID[x]) + for x in data["dataset_ids"] + ] + ) elif collectionType is CollectionType.CALIBRATION: assocsByTimespan = self.calibAssociations[data["collection"]] for d in data["validity_ranges"]: if "timespan" in d: - assocsByTimespan[d["timespan"]] = d["dataset_ids"] + assocsByTimespan[d["timespan"]] = [ + x if not isinstance(x, int) else cast(DatasetId, _refIntId2UUID[x]) + for x in d["dataset_ids"] + ] else: # TODO: this is for backward compatibility, should # be removed at some point. - assocsByTimespan[Timespan(begin=d["begin"], end=d["end"])] = d["dataset_ids"] + assocsByTimespan[Timespan(begin=d["begin"], end=d["end"])] = [ + x if not isinstance(x, int) else cast(DatasetId, _refIntId2UUID[x]) + for x in d["dataset_ids"] + ] else: raise ValueError(f"Unexpected calibration type for association: {collectionType.name}.") else: @@ -362,7 +375,14 @@ def __init__(self, stream: IO, registry: Registry): FileDataset( d.get("path"), [ - DatasetRef(datasetType, dataId, run=data["run"], id=refid) + DatasetRef( + datasetType, + dataId, + run=data["run"], + id=refid + if not isinstance(refid, int) + else cast(DatasetId, _refIntId2UUID[refid]), + ) for dataId, refid in zip( ensure_iterable(d["data_id"]), ensure_iterable(d["dataset_id"]) ) From 498258d98d4e8af3626a951d155367cb79e24b7e Mon Sep 17 00:00:00 2001 From: Nate Lust Date: Tue, 27 Jun 2023 15:04:31 -0400 Subject: [PATCH 04/11] Black and isort changes --- python/lsst/daf/butler/core/datasets/ref.py | 8 +++++--- python/lsst/daf/butler/core/persistenceContext.py | 3 +-- python/lsst/daf/butler/core/quantum.py | 8 +++----- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/python/lsst/daf/butler/core/datasets/ref.py b/python/lsst/daf/butler/core/datasets/ref.py index 4bc0232f25..64796f0c21 100644 --- a/python/lsst/daf/butler/core/datasets/ref.py +++ b/python/lsst/daf/butler/core/datasets/ref.py @@ -285,9 +285,11 @@ def __init__( if id is not None: self._id = id.int else: - self._id = DatasetIdFactory().makeDatasetId( - self.run, self.datasetType, self.dataId, id_generation_mode - ).int + self._id = ( + DatasetIdFactory() + .makeDatasetId(self.run, self.datasetType, self.dataId, id_generation_mode) + .int + ) @property def id(self) -> DatasetId: diff --git a/python/lsst/daf/butler/core/persistenceContext.py b/python/lsst/daf/butler/core/persistenceContext.py index f6a4487bee..3001aae5bc 100644 --- a/python/lsst/daf/butler/core/persistenceContext.py +++ b/python/lsst/daf/butler/core/persistenceContext.py @@ -132,8 +132,7 @@ class PersistenceContextVars: @classmethod def _getContextVars(cls) -> dict[str, ContextVar]: - """Build a dictionary of names to caches declared at class scope. 
- """ + """Build a dictionary of names to caches declared at class scope.""" classAttributes: dict[str, ContextVar] = {} for k in vars(cls): v = getattr(cls, k) diff --git a/python/lsst/daf/butler/core/quantum.py b/python/lsst/daf/butler/core/quantum.py index 58954c63d2..25357c45f3 100644 --- a/python/lsst/daf/butler/core/quantum.py +++ b/python/lsst/daf/butler/core/quantum.py @@ -23,10 +23,10 @@ __all__ = ("Quantum", "SerializedQuantum", "DimensionRecordsAccumulator") -from collections.abc import Iterable, Mapping, MutableMapping -from typing import Any import sys import warnings +from collections.abc import Iterable, Mapping, MutableMapping +from typing import Any from lsst.utils import doImportType from pydantic import BaseModel @@ -57,9 +57,7 @@ def _reconstructDatasetRef( # if the dimension record has been loaded previously use that, # otherwise load it from the dict of Serialized DimensionRecords if dimensionRecords is None: - raise ValueError( - "Cannot construct from a SerializedQuantum with no dimension records. " - ) + raise ValueError("Cannot construct from a SerializedQuantum with no dimension records. ") tmpSerialized = dimensionRecords[dId] reconstructedDim = DimensionRecord.from_simple(tmpSerialized, universe=universe) records[sys.intern(reconstructedDim.definition.name)] = reconstructedDim From a2805b3f6c84f391630a02299b45c77c08e1d6a4 Mon Sep 17 00:00:00 2001 From: Nate Lust Date: Tue, 27 Jun 2023 15:40:43 -0400 Subject: [PATCH 05/11] Convert test to use UUID instead of int --- tests/test_templates.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/tests/test_templates.py b/tests/test_templates.py index dab247d01f..37122381e4 100644 --- a/tests/test_templates.py +++ b/tests/test_templates.py @@ -23,9 +23,11 @@ import os.path import unittest +import uuid from lsst.daf.butler import ( DataCoordinate, + DatasetId, DatasetRef, DatasetType, DimensionGraph, @@ -41,6 +43,8 @@ PlaceHolder = StorageClass("PlaceHolder") +REFUUID = DatasetId(int=uuid.uuid4().int) + class TestFileTemplates(unittest.TestCase): """Test creation of paths from templates.""" @@ -66,7 +70,7 @@ def makeDatasetRef( StorageClass(storageClassName), parentStorageClass=parentStorageClass, ) - return DatasetRef(datasetType, dataId, id=1, run=run, conform=conform) + return DatasetRef(datasetType, dataId, id=REFUUID, run=run, conform=conform) def setUp(self): self.universe = DimensionUniverse() @@ -104,19 +108,19 @@ def testBasic(self): ) # Check that the id is sufficient without any other information. 
- self.assertTemplate("{id}", "1", self.makeDatasetRef("calexp", run="run2")) + self.assertTemplate("{id}", str(REFUUID), self.makeDatasetRef("calexp", run="run2")) - self.assertTemplate("{run}/{id}", "run2/1", self.makeDatasetRef("calexp", run="run2")) + self.assertTemplate("{run}/{id}", f"run2/{str(REFUUID)}", self.makeDatasetRef("calexp", run="run2")) self.assertTemplate( "fixed/{id}", - "fixed/1", + f"fixed/{str(REFUUID)}", self.makeDatasetRef("calexp", run="run2"), ) self.assertTemplate( "fixed/{id}_{physical_filter}", - "fixed/1_Most_Amazing_U_Filter_Ever", + f"fixed/{str(REFUUID)}_Most_Amazing_U_Filter_Ever", self.makeDatasetRef("calexp", run="run2"), ) From b849bea9d9a41d56c69f927809ea05e9a5bc090a Mon Sep 17 00:00:00 2001 From: Nate Lust Date: Tue, 27 Jun 2023 16:34:35 -0400 Subject: [PATCH 06/11] Add release documentation --- doc/changes/DM-39582.api.md | 1 + doc/changes/DM-39582.misc.md | 2 ++ 2 files changed, 3 insertions(+) create mode 100644 doc/changes/DM-39582.api.md create mode 100644 doc/changes/DM-39582.misc.md diff --git a/doc/changes/DM-39582.api.md b/doc/changes/DM-39582.api.md new file mode 100644 index 0000000000..ea9b5a751a --- /dev/null +++ b/doc/changes/DM-39582.api.md @@ -0,0 +1 @@ +Deprecate reconstituteDimensions argument from Quantum.from_simple diff --git a/doc/changes/DM-39582.misc.md b/doc/changes/DM-39582.misc.md new file mode 100644 index 0000000000..dba103939b --- /dev/null +++ b/doc/changes/DM-39582.misc.md @@ -0,0 +1,2 @@ +Added ability for some butler primitives to be cached and re-used on deserialization through a special +interface. From 253a01d88a1ed5c0a5d8107cb616afd35c02485a Mon Sep 17 00:00:00 2001 From: Nate Lust Date: Wed, 28 Jun 2023 16:28:23 -0400 Subject: [PATCH 07/11] Address formatting/MYPY issues --- python/lsst/daf/butler/core/datasets/ref.py | 10 ++++------ .../daf/butler/core/datastoreRecordData.py | 10 +++++----- .../daf/butler/core/dimensions/_records.py | 10 ++++++++-- .../daf/butler/core/persistenceContext.py | 20 ++++++++++++++----- python/lsst/daf/butler/core/quantum.py | 2 +- python/lsst/daf/butler/transfers/_yaml.py | 17 +++++----------- 6 files changed, 38 insertions(+), 31 deletions(-) diff --git a/python/lsst/daf/butler/core/datasets/ref.py b/python/lsst/daf/butler/core/datasets/ref.py index 64796f0c21..9edac6bccf 100644 --- a/python/lsst/daf/butler/core/datasets/ref.py +++ b/python/lsst/daf/butler/core/datasets/ref.py @@ -293,6 +293,10 @@ def __init__( @property def id(self) -> DatasetId: + """Primary key of the dataset (`DatasetId`). + + Cannot be changed after a `DatasetRef` is constructed. + """ return uuid.UUID(int=self._id) def __eq__(self, other: Any) -> bool: @@ -708,9 +712,3 @@ class associated with the dataset type of the other ref can be Cannot be changed after a `DatasetRef` is constructed. """ - - id: DatasetId - """Primary key of the dataset (`DatasetId`). - - Cannot be changed after a `DatasetRef` is constructed. 
- """ diff --git a/python/lsst/daf/butler/core/datastoreRecordData.py b/python/lsst/daf/butler/core/datastoreRecordData.py index 58b8da8b42..aad556427a 100644 --- a/python/lsst/daf/butler/core/datastoreRecordData.py +++ b/python/lsst/daf/butler/core/datastoreRecordData.py @@ -214,8 +214,8 @@ def from_simple( """ cache = PersistenceContextVars.dataStoreRecords.get() key = frozenset(simple.dataset_ids) - if cache is not None and (record := cache.get(key)) is not None: - return record + if cache is not None and (cachedRecord := cache.get(key)) is not None: + return cachedRecord records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {} # make sure that all dataset IDs appear in the dict even if they don't # have records. @@ -228,7 +228,7 @@ def from_simple( info = klass.from_record(record) dataset_type_records = records.setdefault(info.dataset_id, {}) dataset_type_records.setdefault(table_name, []).append(info) - record = cls(records=records) + newRecord = cls(records=records) if cache is not None: - cache[key] = record - return record + cache[key] = newRecord + return newRecord diff --git a/python/lsst/daf/butler/core/dimensions/_records.py b/python/lsst/daf/butler/core/dimensions/_records.py index b5a0e6a04f..d35f23bd36 100644 --- a/python/lsst/daf/butler/core/dimensions/_records.py +++ b/python/lsst/daf/butler/core/dimensions/_records.py @@ -167,9 +167,12 @@ def direct( This method should only be called when the inputs are trusted. """ + _recItems = record.items() + # Type ignore because the ternary statement seems to confuse mypy + # based on conflicting inferred types of v. key = ( definition, - frozenset((k, v if not isinstance(v, list) else tuple(v)) for k, v in record.items()), + frozenset((k, v if not isinstance(v, list) else tuple(v)) for k, v in _recItems), # type: ignore ) cache = PersistenceContextVars.serializedDimensionRecordMapping.get() if cache is not None and (result := cache.get(key)) is not None: @@ -376,9 +379,12 @@ def from_simple( if universe is None: # this is for mypy raise ValueError("Unable to determine a usable universe") + _recItems = simple.record.items() + # Type ignore because the ternary statement seems to confuse mypy + # based on conflicting inferred types of v. key = ( simple.definition, - frozenset((k, v if not isinstance(v, list) else tuple(v)) for k, v in simple.record.items()), + frozenset((k, v if not isinstance(v, list) else tuple(v)) for k, v in _recItems), # type: ignore ) cache = PersistenceContextVars.dimensionRecords.get() if cache is not None and (result := cache.get(key)) is not None: diff --git a/python/lsst/daf/butler/core/persistenceContext.py b/python/lsst/daf/butler/core/persistenceContext.py index 3001aae5bc..7d9f616e17 100644 --- a/python/lsst/daf/butler/core/persistenceContext.py +++ b/python/lsst/daf/butler/core/persistenceContext.py @@ -27,7 +27,7 @@ import uuid from collections.abc import Callable from contextvars import Context, ContextVar, Token, copy_context -from typing import TYPE_CHECKING, TypeVar +from typing import TYPE_CHECKING, ParamSpec, TypeVar, cast if TYPE_CHECKING: from .datasets.ref import DatasetRef @@ -37,6 +37,10 @@ from .dimensions._records import DimensionRecord, SerializedDimensionRecord _T = TypeVar("_T") +_V = TypeVar("_V") + +_P = ParamSpec("_P") +_Q = ParamSpec("_Q") class PersistenceContextVars: @@ -76,6 +80,7 @@ class PersistenceContextVars: until process completion. It was determined the runtime cost of recreating the `SerializedDatasetRef`\ s was worth the memory savings. 
""" + serializedDatasetTypeMapping: ContextVar[ dict[tuple[str, str], SerializedDatasetType] | None ] = ContextVar("serializedDatasetTypeMapping", default=None) @@ -141,11 +146,11 @@ def _getContextVars(cls) -> dict[str, ContextVar]: classAttributes[k] = v return classAttributes - def __init__(self): + def __init__(self) -> None: self._ctx: Context | None = None self._tokens: dict[str, Token] | None = None - def _functionRunner(self, function: Callable[..., _T], *args, **kwargs) -> _T: + def _functionRunner(self, function: Callable[_P, _V], *args: _P.args, **kwargs: _P.kwargs) -> _V: # create a storage space for the tokens returned from setting the # context variables self._tokens = {} @@ -168,7 +173,7 @@ def _functionRunner(self, function: Callable[..., _T], *args, **kwargs) -> _T: self._tokens = None return result - def run(self, function: Callable[..., _T], *args, **kwargs) -> _T: + def run(self, function: Callable[_Q, _T], *args: _Q.args, **kwargs: _Q.kwargs) -> _T: """Execute the supplied function inside context specific caches. Parameters @@ -186,4 +191,9 @@ def run(self, function: Callable[..., _T], *args, **kwargs) -> _T: The result returned by executing the supplied `Callable` """ self._ctx = copy_context() - return self._ctx.run(self._functionRunner, function, *args, **kwargs) + # Type checkers seem to have trouble with a second layer nesting of + # parameter specs in callables, so ignore the call here and explicitly + # cast the result as we know this is exactly what the return type will + # be. + result = self._ctx.run(self._functionRunner, function, *args, **kwargs) # type: ignore + return cast(_T, result) diff --git a/python/lsst/daf/butler/core/quantum.py b/python/lsst/daf/butler/core/quantum.py index 25357c45f3..f3cf45e7f3 100644 --- a/python/lsst/daf/butler/core/quantum.py +++ b/python/lsst/daf/butler/core/quantum.py @@ -103,7 +103,7 @@ def direct( """ node = SerializedQuantum.__new__(cls) setter = object.__setattr__ - setter(node, "taskName", sys.intern(taskName)) + setter(node, "taskName", sys.intern(taskName or "")) setter(node, "dataId", dataId if dataId is None else SerializedDataCoordinate.direct(**dataId)) setter( diff --git a/python/lsst/daf/butler/transfers/_yaml.py b/python/lsst/daf/butler/transfers/_yaml.py index e1309ef72f..ae0cba4f8e 100644 --- a/python/lsst/daf/butler/transfers/_yaml.py +++ b/python/lsst/daf/butler/transfers/_yaml.py @@ -28,7 +28,7 @@ from collections import defaultdict from collections.abc import Iterable, Mapping from datetime import datetime -from typing import IO, TYPE_CHECKING, Any, cast +from typing import IO, TYPE_CHECKING, Any import astropy.time import yaml @@ -341,25 +341,20 @@ def __init__(self, stream: IO, registry: Registry): collectionType = CollectionType.from_name(data["collection_type"]) if collectionType is CollectionType.TAGGED: self.tagAssociations[data["collection"]].extend( - [ - x if not isinstance(x, int) else cast(DatasetId, _refIntId2UUID[x]) - for x in data["dataset_ids"] - ] + [x if not isinstance(x, int) else _refIntId2UUID[x] for x in data["dataset_ids"]] ) elif collectionType is CollectionType.CALIBRATION: assocsByTimespan = self.calibAssociations[data["collection"]] for d in data["validity_ranges"]: if "timespan" in d: assocsByTimespan[d["timespan"]] = [ - x if not isinstance(x, int) else cast(DatasetId, _refIntId2UUID[x]) - for x in d["dataset_ids"] + x if not isinstance(x, int) else _refIntId2UUID[x] for x in d["dataset_ids"] ] else: # TODO: this is for backward compatibility, should # be removed at some 
point. assocsByTimespan[Timespan(begin=d["begin"], end=d["end"])] = [ - x if not isinstance(x, int) else cast(DatasetId, _refIntId2UUID[x]) - for x in d["dataset_ids"] + x if not isinstance(x, int) else _refIntId2UUID[x] for x in d["dataset_ids"] ] else: raise ValueError(f"Unexpected calibration type for association: {collectionType.name}.") @@ -379,9 +374,7 @@ def __init__(self, stream: IO, registry: Registry): datasetType, dataId, run=data["run"], - id=refid - if not isinstance(refid, int) - else cast(DatasetId, _refIntId2UUID[refid]), + id=refid if not isinstance(refid, int) else _refIntId2UUID[refid], ) for dataId, refid in zip( ensure_iterable(d["data_id"]), ensure_iterable(d["dataset_id"]) From ce5b439c5e1c10ab8bbd68c7dc8452c672a45603 Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Thu, 29 Jun 2023 10:33:38 -0700 Subject: [PATCH 08/11] Add some defensive programming to appease mypy --- python/lsst/daf/butler/core/config.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/python/lsst/daf/butler/core/config.py b/python/lsst/daf/butler/core/config.py index 5eb7d9c880..efcc9481f6 100644 --- a/python/lsst/daf/butler/core/config.py +++ b/python/lsst/daf/butler/core/config.py @@ -39,7 +39,7 @@ import yaml from lsst.resources import ResourcePath, ResourcePathExpression -from lsst.utils import doImport +from lsst.utils import doImportType from yaml.representer import Representer yaml.add_representer(defaultdict, Representer.represent_dict) @@ -1203,18 +1203,17 @@ def __init__( if pytype is not None: try: - cls = doImport(pytype) + cls = doImportType(pytype) except ImportError as e: raise RuntimeError(f"Failed to import cls '{pytype}' for config {type(self)}") from e - defaultsFile = cls.defaultConfigFile + # The class referenced from the config file is not required + # to specify a default config file. + defaultsFile = getattr(cls, "defaultConfigFile", None) if defaultsFile is not None: self._updateWithConfigsFromPath(fullSearchPath, defaultsFile) - # Get the container key in case we need it - try: - containerKey = cls.containerKey - except AttributeError: - pass + # Get the container key in case we need it and it is specified. + containerKey = getattr(cls, "containerKey", None) # Now update this object with the external values so that the external # values always override the defaults From fbf00cbf8a74f1be43c67023197ce77b14229201 Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Thu, 29 Jun 2023 10:40:19 -0700 Subject: [PATCH 09/11] Check that the datastore record has the right class on read --- python/lsst/daf/butler/core/datastoreRecordData.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/lsst/daf/butler/core/datastoreRecordData.py b/python/lsst/daf/butler/core/datastoreRecordData.py index aad556427a..e3e33ba8c0 100644 --- a/python/lsst/daf/butler/core/datastoreRecordData.py +++ b/python/lsst/daf/butler/core/datastoreRecordData.py @@ -223,6 +223,11 @@ def from_simple( records[dataset_id] = {} for class_name, table_data in simple.records.items(): klass = doImportType(class_name) + if not issubclass(klass, StoredDatastoreItemInfo): + raise RuntimeError( + "The class specified in the SerializedDatastoreRecordData " + f"({get_full_type_name(klass)}) is not a StoredDatastoreItemInfo." 
+ ) for table_name, table_records in table_data.items(): for record in table_records: info = klass.from_record(record) From 5432818de46e08eece5a47baac6ce05f31bf3d38 Mon Sep 17 00:00:00 2001 From: Nate Lust Date: Fri, 30 Jun 2023 11:30:01 -0400 Subject: [PATCH 10/11] Fix mypy with flag comparison change MyPy seems to narrow types somehow when comparing Enum Flags directly with equality operators. Compare by value instead. --- python/lsst/daf/butler/_butler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/lsst/daf/butler/_butler.py b/python/lsst/daf/butler/_butler.py index 9b9fe8d8a3..0005af9375 100644 --- a/python/lsst/daf/butler/_butler.py +++ b/python/lsst/daf/butler/_butler.py @@ -1651,9 +1651,9 @@ def exists( if full_check: if self.datastore.exists(ref): existence |= DatasetExistence._ARTIFACT - elif existence != DatasetExistence.UNRECOGNIZED: + elif existence.value != DatasetExistence.UNRECOGNIZED.value: # Do not add this flag if we have no other idea about a dataset. - existence |= DatasetExistence._ASSUMED + existence |= DatasetExistence(DatasetExistence._ASSUMED) return existence From ef41fb5a9bab2c7eb305e92d12bbe98670691d4e Mon Sep 17 00:00:00 2001 From: Nate Lust Date: Fri, 30 Jun 2023 17:54:16 -0400 Subject: [PATCH 11/11] Address review feedback --- doc/changes/DM-39582.api.md | 1 - doc/changes/DM-39582.removal.md | 1 + .../daf/butler/core/datastoreRecordData.py | 7 ------- .../lsst/daf/butler/core/persistenceContext.py | 8 +------- python/lsst/daf/butler/core/quantum.py | 4 +++- python/lsst/daf/butler/transfers/_yaml.py | 18 ++++++++++++++++-- 6 files changed, 21 insertions(+), 18 deletions(-) delete mode 100644 doc/changes/DM-39582.api.md create mode 100644 doc/changes/DM-39582.removal.md diff --git a/doc/changes/DM-39582.api.md b/doc/changes/DM-39582.api.md deleted file mode 100644 index ea9b5a751a..0000000000 --- a/doc/changes/DM-39582.api.md +++ /dev/null @@ -1 +0,0 @@ -Deprecate reconstituteDimensions argument from Quantum.from_simple diff --git a/doc/changes/DM-39582.removal.md b/doc/changes/DM-39582.removal.md new file mode 100644 index 0000000000..5b6bef359f --- /dev/null +++ b/doc/changes/DM-39582.removal.md @@ -0,0 +1 @@ +Deprecate reconstituteDimensions argument from `Quantum.from_simple`. diff --git a/python/lsst/daf/butler/core/datastoreRecordData.py b/python/lsst/daf/butler/core/datastoreRecordData.py index e3e33ba8c0..5a93078274 100644 --- a/python/lsst/daf/butler/core/datastoreRecordData.py +++ b/python/lsst/daf/butler/core/datastoreRecordData.py @@ -71,11 +71,6 @@ def direct( This method should only be called when the inputs are trusted. 
""" - key = frozenset(dataset_ids) - cache = PersistenceContextVars.serializedDatastoreRecordMapping.get() - if cache is not None and (value := cache.get(key)) is not None: - return value - data = SerializedDatastoreRecordData.__new__(cls) setter = object.__setattr__ # JSON makes strings out of UUIDs, need to convert them back @@ -89,8 +84,6 @@ def direct( if (id := record.get("dataset_id")) is not None: record["dataset_id"] = uuid.UUID(id) if isinstance(id, str) else id setter(data, "records", records) - if cache is not None: - cache[key] = data return data diff --git a/python/lsst/daf/butler/core/persistenceContext.py b/python/lsst/daf/butler/core/persistenceContext.py index 7d9f616e17..27751388f8 100644 --- a/python/lsst/daf/butler/core/persistenceContext.py +++ b/python/lsst/daf/butler/core/persistenceContext.py @@ -32,7 +32,7 @@ if TYPE_CHECKING: from .datasets.ref import DatasetRef from .datasets.type import DatasetType, SerializedDatasetType - from .datastoreRecordData import DatastoreRecordData, SerializedDatastoreRecordData + from .datastoreRecordData import DatastoreRecordData from .dimensions._coordinate import DataCoordinate, SerializedDataCoordinate from .dimensions._records import DimensionRecord, SerializedDimensionRecord @@ -99,12 +99,6 @@ class PersistenceContextVars: r"""A cache of `SerializedDimensionRecord`\ s. """ - serializedDatastoreRecordMapping: ContextVar[ - dict[frozenset[str | uuid.UUID], SerializedDatastoreRecordData] | None - ] = ContextVar("serializedDatastoreRecordMapping", default=None) - r"""A cache of `SerializedDatastoreRecord`\ s. - """ - loadedTypes: ContextVar[dict[tuple[str, str], DatasetType] | None] = ContextVar( "loadedTypes", default=None ) diff --git a/python/lsst/daf/butler/core/quantum.py b/python/lsst/daf/butler/core/quantum.py index f3cf45e7f3..084e59d6e9 100644 --- a/python/lsst/daf/butler/core/quantum.py +++ b/python/lsst/daf/butler/core/quantum.py @@ -29,6 +29,7 @@ from typing import Any from lsst.utils import doImportType +from lsst.utils.introspection import find_outside_stacklevel from pydantic import BaseModel from .datasets import DatasetRef, DatasetType, SerializedDatasetRef, SerializedDatasetType @@ -416,7 +417,8 @@ def from_simple( if reconstitutedDimensions is not None: warnings.warn( "The reconstitutedDimensions argument is now ignored and may be removed after v 27", - category=DeprecationWarning, + category=FutureWarning, + stacklevel=find_outside_stacklevel("lsst.daf.butler"), ) # Unpersist all the init inputs diff --git a/python/lsst/daf/butler/transfers/_yaml.py b/python/lsst/daf/butler/transfers/_yaml.py index ae0cba4f8e..dd9521520b 100644 --- a/python/lsst/daf/butler/transfers/_yaml.py +++ b/python/lsst/daf/butler/transfers/_yaml.py @@ -25,7 +25,7 @@ import uuid import warnings -from collections import defaultdict +from collections import UserDict, defaultdict from collections.abc import Iterable, Mapping from datetime import datetime from typing import IO, TYPE_CHECKING, Any @@ -64,7 +64,21 @@ this version of the code. """ -_refIntId2UUID = defaultdict[int, uuid.UUID](uuid.uuid4) + +class _RefMapper(UserDict[int, uuid.UUID]): + """Create a local dict subclass which creates new deterministic UUID for + missing keys. 
+ """ + + _namespace = uuid.UUID("4d4851f4-2890-4d41-8779-5f38a3f5062b") + + def __missing__(self, key: int) -> uuid.UUID: + newUUID = uuid.uuid3(namespace=self._namespace, name=str(key)) + self[key] = newUUID + return newUUID + + +_refIntId2UUID = _RefMapper() def _uuid_representer(dumper: yaml.Dumper, data: uuid.UUID) -> yaml.Node: