DM-39582: Add caching for some butler primitives during deserialization #858

Merged · 11 commits · Jul 4, 2023
1 change: 1 addition & 0 deletions doc/changes/DM-39582.api.md
@@ -0,0 +1 @@
Deprecate reconstituteDimensions argument from Quantum.from_simple
Review comment (Contributor):

I think the file name should be `removal`, not `api`, according to the README.

Add backticks around `reconstituteDimensions` and `Quantum.from_simple`, and a period at the end.

2 changes: 2 additions & 0 deletions doc/changes/DM-39582.misc.md
@@ -0,0 +1,2 @@
Added ability for some butler primitives to be cached and re-used on deserialization through a special
interface.
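
Editorial note: the "special interface" is the new `PersistenceContextVars` class (added in `python/lsst/daf/butler/core/persistenceContext.py`, which is not shown in this excerpt). Below is a minimal sketch of the pattern the hunks in this PR rely on: class-level `ContextVar` attributes holding per-context cache dictionaries that default to `None`, meaning "caching off". The `run` driver method and the exact attribute set here are assumptions for illustration, not the real implementation.

```python
from collections.abc import Callable
from contextvars import ContextVar
from typing import Any


class PersistenceContextVars:
    # Each attribute maps a hashable key to an already-deserialized object.
    # The default of None means no persistence context is active; every
    # from_simple/direct call in this PR checks for None before caching.
    datasetRefs: ContextVar[dict | None] = ContextVar("datasetRefs", default=None)
    dimensionRecords: ContextVar[dict | None] = ContextVar("dimensionRecords", default=None)

    _ALL = (datasetRefs, dimensionRecords)

    def run(self, function: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        # Hypothetical driver: enable empty caches, run the deserialization
        # workload, then reset so cached objects can be garbage collected.
        tokens = [(var, var.set({})) for var in self._ALL]
        try:
            return function(*args, **kwargs)
        finally:
            for var, token in tokens:
                var.reset(token)
```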
1 change: 1 addition & 0 deletions python/lsst/daf/butler/core/__init__.py
@@ -29,6 +29,7 @@
from .logging import ButlerLogRecords
from .mappingFactory import *
from .named import *
from .persistenceContext import *
from .progress import Progress
from .quantum import *
from .storageClass import *
42 changes: 34 additions & 8 deletions python/lsst/daf/butler/core/datasets/ref.py
@@ -30,6 +30,7 @@
]

import enum
import sys
import uuid
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, ClassVar
@@ -41,6 +42,7 @@
from ..dimensions import DataCoordinate, DimensionGraph, DimensionUniverse, SerializedDataCoordinate
from ..json import from_json_pydantic, to_json_pydantic
from ..named import NamedKeyDict
from ..persistenceContext import PersistenceContextVars
from .type import DatasetType, SerializedDatasetType

if TYPE_CHECKING:
@@ -142,6 +144,10 @@ def makeDatasetId(
return uuid.uuid5(self.NS_UUID, data)


# This is constant, so don't recreate a set for each instance
_serializedDatasetRefFieldsSet = {"id", "datasetType", "dataId", "run", "component"}


class SerializedDatasetRef(BaseModel):
"""Simplified model of a `DatasetRef` suitable for serialization."""

@@ -202,9 +208,9 @@ def direct(
datasetType if datasetType is None else SerializedDatasetType.direct(**datasetType),
)
setter(node, "dataId", dataId if dataId is None else SerializedDataCoordinate.direct(**dataId))
setter(node, "run", run)
setter(node, "run", sys.intern(run))
setter(node, "component", component)
setter(node, "__fields_set__", {"id", "datasetType", "dataId", "run", "component"})
setter(node, "__fields_set__", _serializedDatasetRefFieldsSet)
return node
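
Editorial note: two small allocations are avoided in `direct()` above. The `__fields_set__` set is now a shared module-level constant instead of a fresh set per call, and the run name is interned. Run names repeat across very many refs in a graph, so interning collapses the duplicates to a single string object. A CPython sketch (the run name here is made up):

```python
import sys

suffix = "w_2023_20"                  # built at runtime to defeat constant folding
a = "HSC/runs/RC2/" + suffix
b = "HSC/runs/RC2/" + suffix
print(a == b, a is b)                 # True False: equal values, two objects

a = sys.intern("HSC/runs/RC2/" + suffix)
b = sys.intern("HSC/runs/RC2/" + suffix)
print(a == b, a is b)                 # True True: one shared object, and an
                                      # identity check is a pointer compare
```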


@@ -254,7 +260,7 @@ class DatasetRef:

_serializedType = SerializedDatasetRef
__slots__ = (
"id",
"_id",
"datasetType",
"dataId",
"run",
@@ -277,12 +283,18 @@ def __init__(
self.dataId = dataId
self.run = run
if id is not None:
self.id = id
self._id = id.int
else:
self.id = DatasetIdFactory().makeDatasetId(
self.run, self.datasetType, self.dataId, id_generation_mode
self._id = (
DatasetIdFactory()
.makeDatasetId(self.run, self.datasetType, self.dataId, id_generation_mode)
.int
)

@property
def id(self) -> DatasetId:
return uuid.UUID(int=self._id)
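
Editorial note: storing the id as a bare `int` rather than a `uuid.UUID` trades a little CPU on each `ref.id` access for memory. A `UUID` is a wrapper object holding a 128-bit int, so keeping only the int drops one Python object per `DatasetRef`, and the round trip is lossless. A sketch:

```python
import sys
import uuid

u = uuid.uuid4()
print(sys.getsizeof(u))             # the UUID wrapper alone (its int payload
                                    # is a second, separately allocated object)
print(sys.getsizeof(u.int))         # the 128-bit payload by itself
assert uuid.UUID(int=u.int) == u    # the property rebuilds an equal UUID
```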

def __eq__(self, other: Any) -> bool:
try:
return (self.datasetType, self.dataId, self.id) == (other.datasetType, other.dataId, other.id)
@@ -396,9 +408,18 @@ def from_simple(
ref : `DatasetRef`
Newly-constructed object.
"""
cache = PersistenceContextVars.datasetRefs.get()
localName = sys.intern(
datasetType.name
if datasetType is not None
else (x.name if (x := simple.datasetType) is not None else "")
)
key = (simple.id.int, localName)
if cache is not None and (cachedRef := cache.get(key, None)) is not None:
return cachedRef
# Minimalist component will just specify component and id and
# require registry to reconstruct
if set(simple.dict(exclude_unset=True, exclude_defaults=True)).issubset({"id", "component"}):
if not (simple.datasetType is not None or simple.dataId is not None or simple.run is not None):
Review comment (Contributor):

Could you rewrite this as `simple.datasetType is None and simple.dataId is None and simple.run is None`? I think that makes it easier to read.

Reply (Contributor, Author):

That is not logically the same thing. We only want to run this when they are all `False`, but `and` is greedy, so `False` will always gobble up anything.

if registry is None:
raise ValueError("Registry is required to construct component DatasetRef from integer id")
if simple.id is None:
@@ -408,6 +429,8 @@
raise RuntimeError(f"No matching dataset found in registry for id {simple.id}")
if simple.component:
ref = ref.makeComponentRef(simple.component)
if cache is not None:
cache[key] = ref
return ref

if universe is None and registry is None:
@@ -443,7 +466,10 @@ def from_simple(
f"Encountered with {simple!r}{dstr}."
)

return cls(datasetType, dataId, id=simple.id, run=simple.run)
newRef = cls(datasetType, dataId, id=simple.id, run=simple.run)
if cache is not None:
cache[key] = newRef
return newRef
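
Editorial note: every `from_simple`/`direct` change in this PR follows the same get-check-build-store idiom. Reduced to its essentials (a sketch; `cached_build` and its parameters are hypothetical stand-ins for the inlined bodies above):

```python
from collections.abc import Callable
from typing import TypeVar

T = TypeVar("T")


def cached_build(cache: dict | None, key: tuple, build: Callable[[], T]) -> T:
    # cache is None when no PersistenceContextVars context is active:
    # fall through and pay the full construction cost every time.
    if cache is not None and (hit := cache.get(key)) is not None:
        return hit
    obj = build()
    if cache is not None:
        cache[key] = obj
    return obj
```

The `DatasetRef` key pairs the UUID's `int` with the interned dataset type name, presumably because a caller-supplied `datasetType` can override the serialized one, and refs deserialized under different type names must not share a cache slot.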

to_json = to_json_pydantic
from_json: ClassVar = classmethod(from_json_pydantic)
19 changes: 18 additions & 1 deletion python/lsst/daf/butler/core/datasets/type.py
@@ -34,6 +34,7 @@
from ..configSupport import LookupKey
from ..dimensions import DimensionGraph, SerializedDimensionGraph
from ..json import from_json_pydantic, to_json_pydantic
from ..persistenceContext import PersistenceContextVars
from ..storageClass import StorageClass, StorageClassFactory

if TYPE_CHECKING:
@@ -74,6 +75,10 @@ def direct(

This method should only be called when the inputs are trusted.
"""
cache = PersistenceContextVars.serializedDatasetTypeMapping.get()
key = (name, storageClass or "")
if cache is not None and (type_ := cache.get(key, None)) is not None:
return type_
node = SerializedDatasetType.__new__(cls)
setter = object.__setattr__
setter(node, "name", name)
@@ -90,6 +95,8 @@
"__fields_set__",
{"name", "storageClass", "dimensions", "parentStorageClass", "isCalibration"},
)
if cache is not None:
cache[key] = node
return node


@@ -685,6 +692,13 @@ def from_simple(
datasetType : `DatasetType`
Newly-constructed object.
"""
# check to see if there is a cache, and if there is, if there is a
# cached dataset type
cache = PersistenceContextVars.loadedTypes.get()
key = (simple.name, simple.storageClass or "")
if cache is not None and (type_ := cache.get(key, None)) is not None:
return type_

if simple.storageClass is None:
# Treat this as minimalist representation
if registry is None:
@@ -708,14 +722,17 @@
# mypy hint
raise ValueError(f"Dimensions must be specified in {simple}")

return cls(
newType = cls(
name=simple.name,
dimensions=DimensionGraph.from_simple(simple.dimensions, universe=universe),
storageClass=simple.storageClass,
isCalibration=simple.isCalibration,
parentStorageClass=simple.parentStorageClass,
universe=universe,
)
if cache is not None:
cache[key] = newType
return newType
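
Editorial note: both `SerializedDatasetType.direct` and this method key their (separate) caches on `(name, storageClass or "")`. The `or ""` folds the minimalist case, where the storage class is `None`, to a fixed string so the key is always a hashable `(str, str)` pair; this assumes `""` is never a real storage class name. A sketch:

```python
def type_key(name: str, storage_class: str | None) -> tuple[str, str]:
    # "" is assumed never to be a real storage class name, so the
    # minimalist form gets its own cache slot.
    return (name, storage_class or "")


assert type_key("calexp", None) == ("calexp", "")
assert type_key("calexp", "ExposureF") != type_key("calexp", None)
```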

to_json = to_json_pydantic
from_json: ClassVar = classmethod(from_json_pydantic)
17 changes: 16 additions & 1 deletion python/lsst/daf/butler/core/datastoreRecordData.py
@@ -36,6 +36,7 @@

from .datasets import DatasetId
from .dimensions import DimensionUniverse
from .persistenceContext import PersistenceContextVars
from .storedFileInfo import StoredDatastoreItemInfo

if TYPE_CHECKING:
@@ -70,6 +71,11 @@ def direct(

This method should only be called when the inputs are trusted.
"""
key = frozenset(dataset_ids)
cache = PersistenceContextVars.serializedDatastoreRecordMapping.get()
if cache is not None and (value := cache.get(key)) is not None:
return value
Review comment (Contributor):

I would not expect many (or maybe any) cache hits for this. `DatastoreRecordData` is a per-quantum structure; I do not think any two quanta can have the same set of input datasets.


data = SerializedDatastoreRecordData.__new__(cls)
setter = object.__setattr__
# JSON makes strings out of UUIDs, need to convert them back
@@ -83,6 +89,8 @@
if (id := record.get("dataset_id")) is not None:
record["dataset_id"] = uuid.UUID(id) if isinstance(id, str) else id
setter(data, "records", records)
if cache is not None:
cache[key] = data
return data


@@ -204,6 +212,10 @@ def from_simple(
item_info : `StoredDatastoreItemInfo`
De-serialized instance of `StoredDatastoreItemInfo`.
"""
cache = PersistenceContextVars.dataStoreRecords.get()
key = frozenset(simple.dataset_ids)
if cache is not None and (record := cache.get(key)) is not None:
return record
records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {}
# make sure that all dataset IDs appear in the dict even if they don't
# have records.
@@ -216,4 +228,7 @@
info = klass.from_record(record)
dataset_type_records = records.setdefault(info.dataset_id, {})
dataset_type_records.setdefault(table_name, []).append(info)
return cls(records=records)
record = cls(records=records)
if cache is not None:
cache[key] = record
return record
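
Editorial note: the cache key here is a `frozenset` of the dataset UUIDs, which is hashable and insensitive to serialization order, so the same id set always maps to one slot. Note the reviewer's caveat earlier in this file: since `DatastoreRecordData` is per-quantum, two quanta rarely share an identical id set, so hits may be rare. A sketch of the key behavior:

```python
import uuid

id_a, id_b = uuid.uuid4(), uuid.uuid4()
cache: dict[frozenset, str] = {}
cache[frozenset([id_a, id_b])] = "records-for-these-ids"
# Order does not matter: the reversed list hashes to the same key.
assert cache[frozenset([id_b, id_a])] == "records-for-these-ids"
```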
13 changes: 13 additions & 0 deletions python/lsst/daf/butler/core/dimensions/_coordinate.py
@@ -39,6 +39,7 @@

from ..json import from_json_pydantic, to_json_pydantic
from ..named import NamedKeyDict, NamedKeyMapping, NamedValueAbstractSet, NameLookupMapping
from ..persistenceContext import PersistenceContextVars
from ..timespan import Timespan
from ._elements import Dimension, DimensionElement
from ._graph import DimensionGraph
@@ -76,6 +77,10 @@ def direct(cls, *, dataId: dict[str, DataIdValue], records: dict[str, dict]) ->

This method should only be called when the inputs are trusted.
"""
key = (frozenset(dataId.items()), records is not None)
cache = PersistenceContextVars.serializedDataCoordinateMapping.get()
if cache is not None and (result := cache.get(key)) is not None:
return result
node = SerializedDataCoordinate.__new__(cls)
setter = object.__setattr__
setter(node, "dataId", dataId)
@@ -87,6 +92,8 @@
else {k: SerializedDimensionRecord.direct(**v) for k, v in records.items()},
)
setter(node, "__fields_set__", {"dataId", "records"})
if cache is not None:
cache[key] = node
return node


@@ -730,6 +737,10 @@
dataId : `DataCoordinate`
Newly-constructed object.
"""
key = (frozenset(simple.dataId.items()), simple.records is not None)
cache = PersistenceContextVars.dataCoordinates.get()
if cache is not None and (result := cache.get(key)) is not None:
return result
if universe is None and registry is None:
raise ValueError("One of universe or registry is required to convert a dict to a DataCoordinate")
if universe is None and registry is not None:
@@ -743,6 +754,8 @@
dataId = dataId.expanded(
{k: DimensionRecord.from_simple(v, universe=universe) for k, v in simple.records.items()}
)
if cache is not None:
cache[key] = dataId
return dataId
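
Editorial note: the second element of the key, `simple.records is not None`, keeps a minimal coordinate and an expanded one (same `dataId`, records attached) from sharing a cache slot. A sketch with a made-up data ID:

```python
data_id = {"instrument": "HSC", "detector": 42}
key_minimal = (frozenset(data_id.items()), False)   # no records serialized
key_expanded = (frozenset(data_id.items()), True)   # records present
assert key_minimal != key_expanded                  # distinct cache slots
```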

to_json = to_json_pydantic
23 changes: 21 additions & 2 deletions python/lsst/daf/butler/core/dimensions/_records.py
@@ -30,6 +30,7 @@
from pydantic import BaseModel, Field, StrictBool, StrictFloat, StrictInt, StrictStr, create_model

from ..json import from_json_pydantic, to_json_pydantic
from ..persistenceContext import PersistenceContextVars
from ..timespan import Timespan, TimespanDatabaseRepresentation
from ._elements import Dimension, DimensionElement

@@ -166,7 +167,13 @@

This method should only be called when the inputs are trusted.
"""
node = cls.construct(definition=definition, record=record)
key = (
definition,
frozenset((k, v if not isinstance(v, list) else tuple(v)) for k, v in record.items()),
)
cache = PersistenceContextVars.serializedDimensionRecordMapping.get()
if cache is not None and (result := cache.get(key)) is not None:
return result
node = SerializedDimensionRecord.__new__(cls)
setter = object.__setattr__
setter(node, "definition", definition)
@@ -177,6 +184,8 @@
node, "record", {k: v if type(v) != list else tuple(v) for k, v in record.items()} # type: ignore
)
setter(node, "__fields_set__", {"definition", "record"})
if cache is not None:
cache[key] = node
return node


@@ -367,6 +376,13 @@
if universe is None:
# this is for mypy
raise ValueError("Unable to determine a usable universe")
key = (
simple.definition,
frozenset((k, v if not isinstance(v, list) else tuple(v)) for k, v in simple.record.items()),
)
cache = PersistenceContextVars.dimensionRecords.get()
if cache is not None and (result := cache.get(key)) is not None:
return result

definition = DimensionElement.from_simple(simple.definition, universe=universe)

@@ -389,7 +405,10 @@
if (hsh := "hash") in rec:
rec[hsh] = bytes.fromhex(rec[hsh].decode())

return _reconstructDimensionRecord(definition, rec)
dimRec = _reconstructDimensionRecord(definition, rec)
if cache is not None:
cache[key] = dimRec
return dimRec
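
Editorial note: both keys built in this file convert list values to tuples before freezing. Record values may contain lists, which are unhashable and would make `frozenset(...items())` raise. A sketch with a made-up record:

```python
record = {"name": "detector", "visit_ids": [101, 102]}
try:
    frozenset(record.items())       # hashing ("visit_ids", [101, 102]) fails
except TypeError as err:
    print(err)                      # unhashable type: 'list'

key = frozenset(
    (k, tuple(v) if isinstance(v, list) else v) for k, v in record.items()
)
```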

to_json = to_json_pydantic
from_json: ClassVar = classmethod(from_json_pydantic)