Add collections.x_query_info and simplify some usage of x_query
timj committed Aug 16, 2024
1 parent 4bbbddd commit d7dcd22
Showing 8 changed files with 147 additions and 52 deletions.
60 changes: 59 additions & 1 deletion python/lsst/daf/butler/_butler_collections.py
@@ -210,7 +210,6 @@ def remove_from_chain(
"""
raise NotImplementedError()

@abstractmethod
def x_query(
self,
expression: str | Iterable[str],
@@ -220,6 +219,8 @@ def x_query(
) -> Sequence[str]:
"""Query the butler for collections matching an expression.

**This is an experimental interface that can change at any time.**

Parameters
----------
expression : `str` or `~collections.abc.Iterable` [ `str` ]
@@ -240,6 +241,61 @@ def x_query(
collections : `~collections.abc.Sequence` [ `str` ]
The names of collections that match ``expression``.

Notes
-----
The order in which collections are returned is unspecified, except that
the children of a `~CollectionType.CHAINED` collection are guaranteed
to be in the order in which they are searched. When multiple parent
`~CollectionType.CHAINED` collections match the same criteria, the
order in which the two lists appear is unspecified, and the lists of
children may be incomplete if a child has multiple parents.

The default implementation is a wrapper around `x_query_info`.
"""
collections_info = self.x_query_info(
expression,
collection_types=collection_types,
flatten_chains=flatten_chains,
include_chains=include_chains,
)
return [info.name for info in collections_info]
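
As an illustrative aside (not part of this file's diff), a minimal usage sketch of the name-only query; the repository path and collection glob below are hypothetical:

# Minimal usage sketch for x_query; the repo path and glob are hypothetical.
from lsst.daf.butler import Butler, CollectionType

butler = Butler.from_config("/repo/example")
# x_query returns plain collection names, dropping the extra detail
# that x_query_info provides.
names = butler.collections.x_query(
    "HSC/RC2/*", collection_types={CollectionType.CHAINED}
)
print(names)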

@abstractmethod
def x_query_info(
self,
expression: str | Iterable[str],
collection_types: Set[CollectionType] | CollectionType | None = None,
flatten_chains: bool = False,
include_chains: bool | None = None,
include_parents: bool = False,
) -> Sequence[CollectionInfo]:
"""Query the butler for collections matching an expression and
return detailed information about those collections.

**This is an experimental interface that can change at any time.**

Parameters
----------
expression : `str` or `~collections.abc.Iterable` [ `str` ]
One or more collection names or globs to include in the search.
collection_types : `set` [`CollectionType`], `CollectionType` or `None`
Restrict the types of collections to be searched. If `None` all
collection types are searched.
flatten_chains : `bool`, optional
If `True` (`False` is default), recursively yield the child
collections of matching `~CollectionType.CHAINED` collections.
include_chains : `bool` or `None`, optional
If `True`, yield records for matching `~CollectionType.CHAINED`
collections. Default is the opposite of ``flatten_chains``:
include either CHAINED collections or their children, but not both.
include_parents : `bool`, optional
Whether the returned information includes parents.

Returns
-------
collections : `~collections.abc.Sequence` [ `CollectionInfo` ]
Information about the collections that match ``expression``.

Notes
-----
The order in which collections are returned is unspecified, except that
@@ -302,6 +358,8 @@ def register(self, name: str, type: CollectionType = CollectionType.RUN, doc: st
def x_remove(self, name: str) -> None:
"""Remove the given collection from the registry.

**This is an experimental interface that can change at any time.**

Parameters
----------
name : `str`
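
For comparison, a usage sketch of the new x_query_info interface introduced in this file; the repository path and glob are hypothetical, and the printed fields are only those the interface documents:

# Usage sketch for x_query_info; repo path and glob are hypothetical.
from lsst.daf.butler import Butler, CollectionType

butler = Butler.from_config("/repo/example")
for info in butler.collections.x_query_info(
    "u/someone/*",
    collection_types={CollectionType.RUN},
    include_parents=True,
):
    # Each result is a CollectionInfo; parents are populated because
    # include_parents=True was requested.
    print(info.name, info.type.name, sorted(info.parents or []))
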
23 changes: 23 additions & 0 deletions python/lsst/daf/butler/direct_butler/_direct_butler_collections.py
@@ -88,13 +88,36 @@ def x_query(
) -> Sequence[str]:
if collection_types is None:
collection_types = CollectionType.all()
# Do not use base implementation for now to avoid the additional
# unused queries.
return self._registry.queryCollections(
expression,
collectionTypes=collection_types,
flattenChains=flatten_chains,
includeChains=include_chains,
)

def x_query_info(
self,
expression: str | Iterable[str],
collection_types: Set[CollectionType] | CollectionType | None = None,
flatten_chains: bool = False,
include_chains: bool | None = None,
include_parents: bool = False,
) -> Sequence[CollectionInfo]:
info = []
with self._registry.caching_context():
if collection_types is None:
collection_types = CollectionType.all()
for name in self._registry.queryCollections(
expression,
collectionTypes=collection_types,
flattenChains=flatten_chains,
includeChains=include_chains,
):
info.append(self.get_info(name, include_parents=include_parents))
return info

def get_info(self, name: str, include_doc: bool = False, include_parents: bool = False) -> CollectionInfo:
record = self._registry.get_collection_record(name)
doc = ""
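
The script changes below follow the same simplification that this implementation enables: one x_query_info call replaces an x_query call followed by a per-name get_info lookup. A hedged sketch of that call-site change (the glob is hypothetical):

# Sketch of the call-site simplification; the glob is hypothetical.
from lsst.daf.butler import Butler, CollectionType

butler = Butler.from_config("/repo/example")

# Before: one query for the names plus one get_info() lookup per name.
names = butler.collections.x_query("refcats*")
infos_old = [butler.collections.get_info(n, include_parents=True) for n in names]

# After: a single call returning the same information.
infos_new = butler.collections.x_query_info("refcats*", include_parents=True)
chained = [i.name for i in infos_new if i.type == CollectionType.CHAINED]

In the direct-Butler implementation the per-name lookups still happen internally, but they run inside a single caching context, so repeated registry round trips are avoided.
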
14 changes: 10 additions & 4 deletions python/lsst/daf/butler/remote_butler/_remote_butler_collections.py
@@ -71,21 +71,27 @@ def remove_from_chain(
) -> None:
raise NotImplementedError("Not yet available")

def x_query(
def x_query_info(
self,
expression: str | Iterable[str],
collection_types: Set[CollectionType] | CollectionType | None = None,
flatten_chains: bool = False,
include_chains: bool | None = None,
) -> Sequence[str]:
include_parents: bool = False,
) -> Sequence[CollectionInfo]:
# This should become a single call on the server in the future.
if collection_types is None:
collection_types = CollectionType.all()
return self._registry.queryCollections(

info = []
for name in self._registry.queryCollections(
expression,
collectionTypes=collection_types,
flattenChains=flatten_chains,
includeChains=include_chains,
)
):
info.append(self.get_info(name, include_parents=include_parents))
return info

def get_info(self, name: str, include_doc: bool = False, include_parents: bool = False) -> CollectionInfo:
info = self._registry._get_collection_info(
22 changes: 11 additions & 11 deletions python/lsst/daf/butler/script/exportCalibs.py
@@ -35,6 +35,7 @@
from astropy.table import Table

from .._butler import Butler
from .._butler_collections import CollectionInfo
from .._collection_type import CollectionType

if TYPE_CHECKING:
@@ -44,15 +45,15 @@


def find_calibration_datasets(
butler: Butler, collection: str, datasetTypes: Iterable[DatasetType]
butler: Butler, collection: CollectionInfo, datasetTypes: Iterable[DatasetType]
) -> list[DatasetRef]:
"""Search a calibration collection for calibration datasets.

Parameters
----------
butler : `lsst.daf.butler.Butler`
Butler to use.
collection : `str`
collection : `CollectionInfo`
Collection to search. This should be a CALIBRATION
collection.
datasetTypes : `list` [`lsst.daf.Butler.DatasetType`]
@@ -68,18 +69,18 @@ def find_calibration_datasets(
RuntimeError
Raised if the collection to search is not a CALIBRATION collection.
"""
if butler.collections.get_info(collection).type != CollectionType.CALIBRATION:
raise RuntimeError(f"Collection {collection} is not a CALIBRATION collection.")
if collection.type != CollectionType.CALIBRATION:
raise RuntimeError(f"Collection {collection.name} is not a CALIBRATION collection.")

exportDatasets = []
for calibType in datasetTypes:
with butler._query() as query:
results = query.datasets(calibType, collections=collection, find_first=False)
results = query.datasets(calibType, collections=collection.name, find_first=False)

try:
refs = list(results.with_dimension_records())
except Exception as e:
e.add_note(f"Error from querying dataset type {calibType} and collection {collection}")
e.add_note(f"Error from querying dataset type {calibType} and collection {collection.name}")
raise
exportDatasets.extend(refs)

@@ -133,18 +134,17 @@ def exportCalibs(
collectionsToExport = []
datasetsToExport = []

for collection in butler.collections.x_query(
for collection in butler.collections.x_query_info(
collections_query,
flatten_chains=True,
include_chains=True,
collection_types={CollectionType.CALIBRATION, CollectionType.CHAINED},
):
log.info("Checking collection: %s", collection)
log.info("Checking collection: %s", collection.name)

# Get collection information.
collectionsToExport.append(collection)
info = butler.collections.get_info(collection)
if info.type == CollectionType.CALIBRATION:
collectionsToExport.append(collection.name)
if collection.type == CollectionType.CALIBRATION:
exportDatasets = find_calibration_datasets(butler, collection, calibTypes)
datasetsToExport.extend(exportDatasets)

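A hedged sketch of the updated calling convention: find_calibration_datasets now receives a CollectionInfo rather than a collection name. The repository path and dataset-type expression are hypothetical, and the import path for the script function is assumed from the file shown above:

# Sketch of calling find_calibration_datasets with CollectionInfo objects;
# the repo path and dataset type below are hypothetical.
from lsst.daf.butler import Butler, CollectionType
from lsst.daf.butler.script.exportCalibs import find_calibration_datasets

butler = Butler.from_config("/repo/example")
calib_types = list(butler.registry.queryDatasetTypes("bias"))
for info in butler.collections.x_query_info(
    "HSC/calib", collection_types={CollectionType.CALIBRATION}
):
    refs = find_calibration_datasets(butler, info, calib_types)
    print(info.name, len(refs))
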
48 changes: 27 additions & 21 deletions python/lsst/daf/butler/script/queryCollections.py
@@ -32,6 +32,7 @@
from astropy.table import Table

from .._butler import Butler
from .._butler_collections import CollectionInfo
from .._collection_type import CollectionType


@@ -71,34 +72,36 @@ def _getTable(
dtype=(str, str, str),
)
butler = Butler.from_config(repo)
names = sorted(butler.collections.x_query(glob or "*", collection_types=frozenset(collection_type)))
collections = sorted(
butler.collections.x_query_info(
glob or "*", collection_types=frozenset(collection_type), include_parents=inverse
)
)
if inverse:
for name in names:
info = butler.collections.get_info(name, include_parents=True)
for info in collections:
if info.parents:
first = True
for parentName in sorted(info.parents):
table.add_row((name if first else "", info.type.name if first else "", parentName))
table.add_row((info.name if first else "", info.type.name if first else "", parentName))
first = False
else:
table.add_row((name, info.type.name, ""))
table.add_row((info.name, info.type.name, ""))
# If none of the collections has a parent collection then remove the
# description column.
if not any(c for c in table[descriptionCol]):
del table[descriptionCol]
else:
for name in names:
info = butler.collections.get_info(name)
for info in collections:
if info.type == CollectionType.CHAINED:
if info.children:
first = True
for child in info.children:
table.add_row((name if first else "", info.type.name if first else "", child))
table.add_row((info.name if first else "", info.type.name if first else "", child))
first = False
else:
table.add_row((name, info.type.name, ""))
table.add_row((info.name, info.type.name, ""))
else:
table.add_row((name, info.type.name, ""))
table.add_row((info.name, info.type.name, ""))
# If there aren't any CHAINED collections in the results then remove the
# description column.
if not any(columnVal == CollectionType.CHAINED.name for columnVal in table[typeCol]):
@@ -142,18 +145,21 @@ def _getTree(
)
butler = Butler.from_config(repo, without_datastore=True)

def addCollection(name: str, level: int = 0) -> None:
info = butler.collections.get_info(name, include_parents=inverse)
table.add_row((" " * level + name, info.type.name))
def addCollection(info: CollectionInfo, level: int = 0) -> None:
table.add_row((" " * level + info.name, info.type.name))
if inverse:
for pname in sorted(info.parents):
addCollection(pname, level + 1)
pinfo = butler.collections.get_info(pname, include_parents=inverse)
addCollection(pinfo, level + 1)
else:
if info.type == CollectionType.CHAINED:
for name in info.children:
addCollection(name, level + 1)
cinfo = butler.collections.get_info(name)
addCollection(cinfo, level + 1)

collections = butler.collections.x_query(glob or "*", collection_types=frozenset(collection_type))
collections = butler.collections.x_query_info(
glob or "*", collection_types=frozenset(collection_type), include_parents=inverse
)
for collection in sorted(collections):
addCollection(collection)
return table
@@ -165,14 +171,14 @@ def _getFlatten(
collection_type: Iterable[CollectionType],
) -> Table:
butler = Butler.from_config(repo)
collectionNames = list(
butler.collections.x_query(
collections = list(
butler.collections.x_query_info(
glob or "*", collection_types=frozenset(collection_type), flatten_chains=True
)
)

collectionTypes = [butler.collections.get_info(c).type.name for c in collectionNames]
return Table((collectionNames, collectionTypes), names=("Name", "Type"))
names = [c.name for c in collections]
types = [c.type.name for c in collections]
return Table((names, types), names=("Name", "Type"))


def queryCollections(
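The flat name/type table built in _getFlatten above reduces to two comprehensions over the CollectionInfo results; a minimal sketch (the repository path is hypothetical):

# Sketch mirroring _getFlatten: build a flat name/type table from
# x_query_info results; the repo path is hypothetical.
from astropy.table import Table
from lsst.daf.butler import Butler

butler = Butler.from_config("/repo/example")
infos = butler.collections.x_query_info("*", flatten_chains=True)
table = Table(
    ([c.name for c in infos], [c.type.name for c in infos]),
    names=("Name", "Type"),
)
print(table)
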
11 changes: 5 additions & 6 deletions python/lsst/daf/butler/script/removeCollections.py
@@ -85,18 +85,17 @@ def _getCollectionInfo(
"""
butler = Butler.from_config(repo, without_datastore=True)
try:
names = sorted(butler.collections.x_query(collection, include_chains=True))
collections_info = sorted(butler.collections.x_query_info(collection, include_chains=True))
except MissingCollectionError:
# Hide the error and act like no collections should be removed.
names = []
collections_info = []
collections = Table(names=("Collection", "Collection Type"), dtype=(str, str))
runCollections = Table(names=("Collection",), dtype=(str,))
for name in names:
collection_info = butler.collections.get_info(name)
for collection_info in collections_info:
if collection_info.type == CollectionType.RUN:
runCollections.add_row((name,))
runCollections.add_row((collection_info.name,))
else:
collections.add_row((name, collection_info.type.name))
collections.add_row((collection_info.name, collection_info.type.name))

return CollectionInfo(collections, runCollections)

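
Both removal scripts use the same guard: a MissingCollectionError from the query is treated as "nothing to remove". A hedged sketch of that pattern (the collection name is hypothetical):

# Sketch of the shared error-handling pattern in the removal scripts;
# the collection name is hypothetical.
from lsst.daf.butler import Butler
from lsst.daf.butler.registry import MissingCollectionError

butler = Butler.from_config("/repo/example", without_datastore=True)
try:
    infos = butler.collections.x_query_info("u/someone/old-run", include_chains=True)
except MissingCollectionError:
    # Hide the error and act as if no collections matched.
    infos = []
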
13 changes: 7 additions & 6 deletions python/lsst/daf/butler/script/removeRuns.py
@@ -87,20 +87,21 @@ def _getCollectionInfo(
"""
butler = Butler.from_config(repo)
try:
collectionNames = butler.collections.x_query(collection, CollectionType.RUN, include_chains=False)
collections = butler.collections.x_query_info(
collection, CollectionType.RUN, include_chains=False, include_parents=True
)
except MissingCollectionError:
# Act as if no collections matched.
collectionNames = []
collections = []
dataset_types = butler.registry.queryDatasetTypes(...)
runs = []
datasets: dict[str, int] = defaultdict(int)
for collectionName in collectionNames:
collection_info = butler.collections.get_info(collectionName, include_parents=True)
for collection_info in collections:
assert collection_info.type == CollectionType.RUN
runs.append(RemoveRun(collectionName, list(collection_info.parents)))
runs.append(RemoveRun(collection_info.name, list(collection_info.parents)))
with butler._query() as query:
for dt in dataset_types:
results = query.datasets(dt, collections=collectionName)
results = query.datasets(dt, collections=collection_info.name)
count = results.count(exact=False)
if count:
datasets[dt.name] += count