diff --git a/python/lsst/daf/butler/core/__init__.py b/python/lsst/daf/butler/core/__init__.py index 7d803c6e69..0fd255ec18 100644 --- a/python/lsst/daf/butler/core/__init__.py +++ b/python/lsst/daf/butler/core/__init__.py @@ -29,6 +29,7 @@ from .logging import ButlerLogRecords from .mappingFactory import * from .named import * +from .persistenceContext import * from .progress import Progress from .quantum import * from .storageClass import * diff --git a/python/lsst/daf/butler/core/persistenceContext.py b/python/lsst/daf/butler/core/persistenceContext.py new file mode 100644 index 0000000000..f6a4487bee --- /dev/null +++ b/python/lsst/daf/butler/core/persistenceContext.py @@ -0,0 +1,190 @@ +# This file is part of daf_butler. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
from __future__ import annotations

__all__ = ("PersistenceContextVars",)


import uuid
from collections.abc import Callable
from contextvars import Context, ContextVar, Token, copy_context
from typing import TYPE_CHECKING, Any, TypeVar

if TYPE_CHECKING:
    from .datasets.ref import DatasetRef
    from .datasets.type import DatasetType, SerializedDatasetType
    from .datastoreRecordData import DatastoreRecordData, SerializedDatastoreRecordData
    from .dimensions._coordinate import DataCoordinate, SerializedDataCoordinate
    from .dimensions._records import DimensionRecord, SerializedDimensionRecord

_T = TypeVar("_T")


class PersistenceContextVars:
    r"""Helper class for deserializing butler data structures.

    When serializing various butler data structures nested dataset types get
    serialized independently. This means what were multiple references to the
    same object in memory are all duplicated in the serialization process.

    Upon deserialization multiple independent data structures are created to
    represent the same logical bit of data.

    This class can be used to remove this duplication by caching objects as
    they are created and returning a reference to that object. This is done in
    concert with ``direct`` and ``from_simple`` methods on the various butler
    dataset structures.

    This class utilizes class level variables as a form of global state. Each
    of the various data structures can look to see if these global caches has
    been initialized as a cache (a dictionary) or is in the default None state.

    Users of this class are intended to create an instance, and then call the
    `run` method, supplying a callable function, and passing any required
    arguments. The `run` method then creates a specific execution context,
    initializing the caches, and then runs the supplied function. Upon
    completion of the function call, the caches are cleared and returned to
    the default state.

    This process is thread safe.

    Note
    ----
    Caches of `SerializedDatasetRef`\ s are intentionally left out. It was
    discovered that these caused excessive python memory allocations which
    though cleaned up upon completion, left the process using more memory than
    it otherwise needed as python does not return allocated memory to the OS
    until process completion. It was determined the runtime cost of recreating
    the `SerializedDatasetRef`\ s was worth the memory savings.
    """

    serializedDatasetTypeMapping: ContextVar[
        dict[tuple[str, str], SerializedDatasetType] | None
    ] = ContextVar("serializedDatasetTypeMapping", default=None)
    r"""A cache of `SerializedDatasetType`\ s.
    """

    serializedDataCoordinateMapping: ContextVar[
        dict[tuple[frozenset, bool], SerializedDataCoordinate] | None
    ] = ContextVar("serializedDataCoordinateMapping", default=None)
    r"""A cache of `SerializedDataCoordinate`\ s.
    """

    serializedDimensionRecordMapping: ContextVar[
        dict[tuple[str, frozenset], SerializedDimensionRecord] | None
    ] = ContextVar("serializedDimensionRecordMapping", default=None)
    r"""A cache of `SerializedDimensionRecord`\ s.
    """

    serializedDatastoreRecordMapping: ContextVar[
        dict[frozenset[str | uuid.UUID], SerializedDatastoreRecordData] | None
    ] = ContextVar("serializedDatastoreRecordMapping", default=None)
    r"""A cache of `SerializedDatastoreRecord`\ s.
    """

    loadedTypes: ContextVar[dict[tuple[str, str], DatasetType] | None] = ContextVar(
        "loadedTypes", default=None
    )
    r"""A cache of `DatasetType`\ s.
    """

    dataCoordinates: ContextVar[dict[tuple[frozenset, bool], DataCoordinate] | None] = ContextVar(
        "dataCoordinates", default=None
    )
    r"""A cache of `DataCoordinate`\ s.
    """

    datasetRefs: ContextVar[dict[tuple[int, str], DatasetRef] | None] = ContextVar(
        "datasetRefs", default=None
    )
    r"""A cache of `DatasetRef`\ s.
    """

    dimensionRecords: ContextVar[dict[tuple[str, frozenset], DimensionRecord] | None] = ContextVar(
        "dimensionRecords", default=None
    )
    r"""A cache of `DimensionRecord`\ s.
    """

    dataStoreRecords: ContextVar[dict[frozenset[str | uuid.UUID], DatastoreRecordData] | None] = ContextVar(
        "dataStoreRecords", default=None
    )
    r"""A cache of `DatastoreRecordData` objects.
    """

    @classmethod
    def _getContextVars(cls) -> dict[str, ContextVar[Any]]:
        """Build a dictionary of names to caches declared at class scope."""
        classAttributes: dict[str, ContextVar[Any]] = {}
        for k in vars(cls):
            v = getattr(cls, k)
            # Methods (callables) and dunder attributes are not caches; the
            # remaining class attributes are exactly the ContextVars declared
            # above.
            if not callable(v) and not k.startswith("__"):
                classAttributes[k] = v
        return classAttributes

    def __init__(self) -> None:
        # The execution context used by `run`; retained after `run` returns
        # so the final state of the ContextVars can be inspected if needed.
        self._ctx: Context | None = None
        # Tokens needed to restore each ContextVar to its pre-`run` state;
        # only non-None while `_functionRunner` is executing.
        self._tokens: dict[str, Token] | None = None

    def _functionRunner(self, function: Callable[..., _T], *args: Any, **kwargs: Any) -> _T:
        """Run ``function`` with all caches initialized, restoring them after.

        Parameters
        ----------
        function : `Callable`
            The callable to execute while the caches are active.
        *args : tuple
            Positional arguments passed through to ``function``.
        **kwargs : dict, optional
            Keyword arguments passed through to ``function``.

        Returns
        -------
        result : `Any`
            Whatever ``function`` returns.
        """
        # Create a storage space for the tokens returned from setting the
        # context variables.
        self._tokens = {}

        # Set each cache to an empty dictionary and record the token returned
        # by this operation.
        persistenceVars = self._getContextVars()
        for name, attribute in persistenceVars.items():
            self._tokens[name] = attribute.set({})

        try:
            # Call the supplied function and record the result.
            result = function(*args, **kwargs)
        finally:
            # Reset all the context variables back to the state they were in
            # before this function was run. Doing this in a ``finally`` block
            # guarantees the caches are cleared even if ``function`` raises,
            # so no stale cache state leaks into later use of this context.
            assert self._tokens is not None
            for name, token in self._tokens.items():
                persistenceVars[name].reset(token)
            self._tokens = None
        return result

    def run(self, function: Callable[..., _T], *args: Any, **kwargs: Any) -> _T:
        """Execute the supplied function inside context specific caches.

        Parameters
        ----------
        function : `Callable`
            A callable which is to be executed inside a specific context.
        *args : tuple
            Positional arguments which are to be passed to the `Callable`
        **kwargs : dict, optional
            Extra key word arguments which are to be passed to the `Callable`

        Returns
        -------
        result : `Any`
            The result returned by executing the supplied `Callable`
        """
        self._ctx = copy_context()
        return self._ctx.run(self._functionRunner, function, *args, **kwargs)