Optimize memory and load times on deserialization
Often many butler primitives are deserialized at the same time, and
it is useful for these objects to share references to each other.
This reduces load time and memory usage.
natelust committed Jun 27, 2023
1 parent 1310356 commit 97137a2
Showing 6 changed files with 132 additions and 50 deletions.
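
All six files share one mechanism: context-local caches that the deserializers consult before constructing a new object, so repeated occurrences of the same primitive come back as shared references. A minimal sketch of the idea, assuming PersistenceContextVars wraps ContextVar-backed dictionaries (the class and attribute names follow the diff below; the body is illustrative, not the library's implementation):

from contextvars import ContextVar
from typing import Any, Callable

class PersistenceContextVars:
    # None means no caching is active; a fresh dict is installed for the
    # duration of one bulk-deserialization operation.
    datasetRefs: ContextVar[dict[Any, Any] | None] = ContextVar("datasetRefs", default=None)

    @classmethod
    def run_with_caches(cls, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        # Install a fresh cache, run the operation, then reset so the
        # cache cannot pin objects for the life of the process.
        token = cls.datasetRefs.set({})
        try:
            return func(*args, **kwargs)
        finally:
            cls.datasetRefs.reset(token)

While such a cache is installed, every from_simple call that computes the same key returns the same shared object, which is where both the load-time and the memory savings come from.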
40 changes: 32 additions & 8 deletions python/lsst/daf/butler/core/datasets/ref.py
@@ -30,6 +30,7 @@
]

import enum
import sys
import uuid
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, ClassVar
@@ -41,6 +42,7 @@
from ..dimensions import DataCoordinate, DimensionGraph, DimensionUniverse, SerializedDataCoordinate
from ..json import from_json_pydantic, to_json_pydantic
from ..named import NamedKeyDict
from ..persistenceContext import PersistenceContextVars
from .type import DatasetType, SerializedDatasetType

if TYPE_CHECKING:
@@ -142,6 +144,10 @@ def makeDatasetId(
return uuid.uuid5(self.NS_UUID, data)


# This is constant, so don't recreate a set for each instance
_serializedDatasetRefFieldsSet = {"id", "datasetType", "dataId", "run", "component"}


class SerializedDatasetRef(BaseModel):
"""Simplified model of a `DatasetRef` suitable for serialization."""

@@ -202,9 +208,9 @@ def direct(
datasetType if datasetType is None else SerializedDatasetType.direct(**datasetType),
)
setter(node, "dataId", dataId if dataId is None else SerializedDataCoordinate.direct(**dataId))
setter(node, "run", run)
setter(node, "run", sys.intern(run))
setter(node, "component", component)
setter(node, "__fields_set__", {"id", "datasetType", "dataId", "run", "component"})
setter(node, "__fields_set__", _serializedDatasetRefFieldsSet)
return node


@@ -254,7 +260,7 @@ class DatasetRef:

_serializedType = SerializedDatasetRef
__slots__ = (
"id",
"_id",
"datasetType",
"dataId",
"run",
@@ -277,11 +283,15 @@ def __init__(
self.dataId = dataId
self.run = run
if id is not None:
self.id = id
self._id = id.int
else:
self.id = DatasetIdFactory().makeDatasetId(
self._id = DatasetIdFactory().makeDatasetId(
self.run, self.datasetType, self.dataId, id_generation_mode
)
).int

@property
def id(self) -> DatasetId:
return uuid.UUID(int=self._id)

def __eq__(self, other: Any) -> bool:
try:
@@ -396,9 +406,18 @@ def from_simple(
ref : `DatasetRef`
Newly-constructed object.
"""
cache = PersistenceContextVars.datasetRefs.get()
localName = sys.intern(
datasetType.name
if datasetType is not None
else (x.name if (x := simple.datasetType) is not None else "")
)
key = (simple.id.int, localName)
if cache is not None and (cachedRef := cache.get(key, None)) is not None:
return cachedRef
# A minimalist component ref specifies only the component and the id,
# requiring the registry to reconstruct the rest
if set(simple.dict(exclude_unset=True, exclude_defaults=True)).issubset({"id", "component"}):
if not (simple.datasetType is not None or simple.dataId is not None or simple.run is not None):
if registry is None:
raise ValueError("Registry is required to construct component DatasetRef from integer id")
if simple.id is None:
@@ -408,6 +427,8 @@
raise RuntimeError(f"No matching dataset found in registry for id {simple.id}")
if simple.component:
ref = ref.makeComponentRef(simple.component)
if cache is not None:
cache[key] = ref
return ref

if universe is None and registry is None:
@@ -443,7 +464,10 @@ def from_simple(
f"Encountered with {simple!r}{dstr}."
)

return cls(datasetType, dataId, id=simple.id, run=simple.run)
newRef = cls(datasetType, dataId, id=simple.id, run=simple.run)
if cache is not None:
cache[key] = newRef
return newRef

to_json = to_json_pydantic
from_json: ClassVar = classmethod(from_json_pydantic)
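
Beyond the caches, ref.py adds two per-instance memory optimizations: run names are interned with sys.intern so equal strings collapse to one shared object, and the 128-bit dataset ID is kept in the _id slot as a bare int, with the new id property rebuilding the uuid.UUID on demand. A standard-library-only sketch of both effects:

import sys
import uuid

# Interning collapses equal run names into a single shared string object.
a = sys.intern("".join(["shared_", "run"]))  # built at runtime, so not auto-interned
b = sys.intern("shared_run")
assert a is b  # one object, not merely two equal values

# The bare int round-trips losslessly through uuid.UUID, and sys.getsizeof
# is shallow: a UUID instance is a wrapper around its int, so keeping only
# the int drops the wrapper allocation from every DatasetRef.
u = uuid.uuid4()
assert uuid.UUID(int=u.int) == u
print(sys.getsizeof(u) + sys.getsizeof(u.int), "->", sys.getsizeof(u.int))

The trade-off is a small CPU cost in the id property each time the UUID form is actually needed.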
19 changes: 18 additions & 1 deletion python/lsst/daf/butler/core/datasets/type.py
@@ -34,6 +34,7 @@
from ..configSupport import LookupKey
from ..dimensions import DimensionGraph, SerializedDimensionGraph
from ..json import from_json_pydantic, to_json_pydantic
from ..persistenceContext import PersistenceContextVars
from ..storageClass import StorageClass, StorageClassFactory

if TYPE_CHECKING:
@@ -74,6 +75,10 @@ def direct(
This method should only be called when the inputs are trusted.
"""
cache = PersistenceContextVars.serializedDatasetTypeMapping.get()
key = (name, storageClass or "")
if cache is not None and (type_ := cache.get(key, None)) is not None:
return type_
node = SerializedDatasetType.__new__(cls)
setter = object.__setattr__
setter(node, "name", name)
@@ -90,6 +95,8 @@
"__fields_set__",
{"name", "storageClass", "dimensions", "parentStorageClass", "isCalibration"},
)
if cache is not None:
cache[key] = node
return node


@@ -685,6 +692,13 @@ def from_simple(
datasetType : `DatasetType`
Newly-constructed object.
"""
# Check whether a cache is active and, if so, whether it already holds
# this dataset type.
cache = PersistenceContextVars.loadedTypes.get()
key = (simple.name, simple.storageClass or "")
if cache is not None and (type_ := cache.get(key, None)) is not None:
return type_

if simple.storageClass is None:
# Treat this as minimalist representation
if registry is None:
@@ -708,14 +722,17 @@
# mypy hint
raise ValueError(f"Dimensions must be specified in {simple}")

return cls(
newType = cls(
name=simple.name,
dimensions=DimensionGraph.from_simple(simple.dimensions, universe=universe),
storageClass=simple.storageClass,
isCalibration=simple.isCalibration,
parentStorageClass=simple.parentStorageClass,
universe=universe,
)
if cache is not None:
cache[key] = newType
return newType

to_json = to_json_pydantic
from_json: ClassVar = classmethod(from_json_pydantic)
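
type.py applies the same check-then-populate idiom in both direct() and from_simple(). Factored out, the idiom looks like this (a generic sketch with a hypothetical cached_construct helper; the commit itself inlines the pattern):

from contextvars import ContextVar
from typing import Callable, Hashable, TypeVar

T = TypeVar("T")

def cached_construct(
    cache_var: ContextVar[dict | None],
    key: Hashable,
    build: Callable[[], T],
) -> T:
    # With no active cache (the ContextVar holds None), just build the
    # object; otherwise return the shared instance recorded for this key.
    cache = cache_var.get()
    if cache is not None and (hit := cache.get(key)) is not None:
        return hit
    obj = build()
    if cache is not None:
        cache[key] = obj
    return obj

The key (simple.name, simple.storageClass or "") normalizes a missing storage class to an empty string, so the key is always a plain (str, str) tuple.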
17 changes: 16 additions & 1 deletion python/lsst/daf/butler/core/datastoreRecordData.py
@@ -36,6 +36,7 @@

from .datasets import DatasetId
from .dimensions import DimensionUniverse
from .persistenceContext import PersistenceContextVars
from .storedFileInfo import StoredDatastoreItemInfo

if TYPE_CHECKING:
@@ -70,6 +71,11 @@ def direct(
This method should only be called when the inputs are trusted.
"""
key = frozenset(dataset_ids)
cache = PersistenceContextVars.serializedDatastoreRecordMapping.get()
if cache is not None and (value := cache.get(key)) is not None:
return value

data = SerializedDatastoreRecordData.__new__(cls)
setter = object.__setattr__
# JSON makes strings out of UUIDs, need to convert them back
@@ -83,6 +89,8 @@
if (id := record.get("dataset_id")) is not None:
record["dataset_id"] = uuid.UUID(id) if isinstance(id, str) else id
setter(data, "records", records)
if cache is not None:
cache[key] = data
return data


@@ -204,6 +212,10 @@ def from_simple(
item_info : `StoredDatastoreItemInfo`
De-serialized instance of `StoredDatastoreItemInfo`.
"""
cache = PersistenceContextVars.dataStoreRecords.get()
key = frozenset(simple.dataset_ids)
if cache is not None and (record := cache.get(key)) is not None:
return record
records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {}
# make sure that all dataset IDs appear in the dict even if they don't
# have records.
@@ -216,4 +228,7 @@
info = klass.from_record(record)
dataset_type_records = records.setdefault(info.dataset_id, {})
dataset_type_records.setdefault(table_name, []).append(info)
return cls(records=records)
record = cls(records=records)
if cache is not None:
cache[key] = record
return record
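
datastoreRecordData.py keys its caches on frozenset(dataset_ids): a frozenset is hashable and order-independent, so the same batch of IDs maps to the same cache entry regardless of the order in which the IDs arrived. For example:

import uuid

ids_a = [uuid.UUID(int=1), uuid.UUID(int=2)]
ids_b = list(reversed(ids_a))

cache: dict[frozenset[uuid.UUID], str] = {}
cache[frozenset(ids_a)] = "records for this batch"
# The reversed list hashes to the same key, so the lookup hits.
assert cache[frozenset(ids_b)] == "records for this batch"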
13 changes: 13 additions & 0 deletions python/lsst/daf/butler/core/dimensions/_coordinate.py
@@ -39,6 +39,7 @@

from ..json import from_json_pydantic, to_json_pydantic
from ..named import NamedKeyDict, NamedKeyMapping, NamedValueAbstractSet, NameLookupMapping
from ..persistenceContext import PersistenceContextVars
from ..timespan import Timespan
from ._elements import Dimension, DimensionElement
from ._graph import DimensionGraph
@@ -76,6 +77,10 @@ def direct(cls, *, dataId: dict[str, DataIdValue], records: dict[str, dict]) ->
This method should only be called when the inputs are trusted.
"""
key = (frozenset(dataId.items()), records is not None)
cache = PersistenceContextVars.serializedDataCoordinateMapping.get()
if cache is not None and (result := cache.get(key)) is not None:
return result
node = SerializedDataCoordinate.__new__(cls)
setter = object.__setattr__
setter(node, "dataId", dataId)
@@ -87,6 +92,8 @@
else {k: SerializedDimensionRecord.direct(**v) for k, v in records.items()},
)
setter(node, "__fields_set__", {"dataId", "records"})
if cache is not None:
cache[key] = node
return node


@@ -730,6 +737,10 @@ def from_simple(
dataId : `DataCoordinate`
Newly-constructed object.
"""
key = (frozenset(simple.dataId.items()), simple.records is not None)
cache = PersistenceContextVars.dataCoordinates.get()
if cache is not None and (result := cache.get(key)) is not None:
return result
if universe is None and registry is None:
raise ValueError("One of universe or registry is required to convert a dict to a DataCoordinate")
if universe is None and registry is not None:
@@ -743,6 +754,8 @@
dataId = dataId.expanded(
{k: DimensionRecord.from_simple(v, universe=universe) for k, v in simple.records.items()}
)
if cache is not None:
cache[key] = dataId
return dataId

to_json = to_json_pydantic
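
_coordinate.py folds records is not None into the cache key, so a data ID deserialized without dimension records never satisfies a lookup for the expanded form, and vice versa. A sketch with hypothetical data ID values:

# The trailing bool separates minimal coordinates from expanded ones
# (those carrying dimension records) in the cache.
data_id = {"instrument": "DummyCam", "visit": 42}

key_minimal = (frozenset(data_id.items()), False)
key_expanded = (frozenset(data_id.items()), True)
assert key_minimal != key_expanded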
23 changes: 21 additions & 2 deletions python/lsst/daf/butler/core/dimensions/_records.py
@@ -30,6 +30,7 @@
from pydantic import BaseModel, Field, StrictBool, StrictFloat, StrictInt, StrictStr, create_model

from ..json import from_json_pydantic, to_json_pydantic
from ..persistenceContext import PersistenceContextVars
from ..timespan import Timespan, TimespanDatabaseRepresentation
from ._elements import Dimension, DimensionElement

@@ -166,7 +167,13 @@ def direct(
This method should only be called when the inputs are trusted.
"""
node = cls.construct(definition=definition, record=record)
key = (
definition,
frozenset((k, v if not isinstance(v, list) else tuple(v)) for k, v in record.items()),
)
cache = PersistenceContextVars.serializedDimensionRecordMapping.get()
if cache is not None and (result := cache.get(key)) is not None:
return result
node = SerializedDimensionRecord.__new__(cls)
setter = object.__setattr__
setter(node, "definition", definition)
@@ -177,6 +184,8 @@
node, "record", {k: v if type(v) != list else tuple(v) for k, v in record.items()} # type: ignore
)
setter(node, "__fields_set__", {"definition", "record"})
if cache is not None:
cache[key] = node
return node


@@ -367,6 +376,13 @@ def from_simple(
if universe is None:
# this is for mypy
raise ValueError("Unable to determine a usable universe")
key = (
simple.definition,
frozenset((k, v if not isinstance(v, list) else tuple(v)) for k, v in simple.record.items()),
)
cache = PersistenceContextVars.dimensionRecords.get()
if cache is not None and (result := cache.get(key)) is not None:
return result

definition = DimensionElement.from_simple(simple.definition, universe=universe)

@@ -389,7 +405,10 @@
if (hsh := "hash") in rec:
rec[hsh] = bytes.fromhex(rec[hsh].decode())

return _reconstructDimensionRecord(definition, rec)
dimRec = _reconstructDimensionRecord(definition, rec)
if cache is not None:
cache[key] = dimRec
return dimRec

to_json = to_json_pydantic
from_json: ClassVar = classmethod(from_json_pydantic)
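
_records.py has one extra wrinkle: record values can be lists, which are unhashable, so the key construction converts them to equal-by-value tuples before building the frozenset. For example:

record = {"name": "detector_0", "corners": [1, 2, 3, 4]}

# frozenset(record.items()) would raise TypeError on the list value,
# so lists become hashable tuples first.
key = frozenset(
    (k, tuple(v) if isinstance(v, list) else v) for k, v in record.items()
)
assert ("corners", (1, 2, 3, 4)) in key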