Skip to content

Commit

Permalink
Add butler zip-from-graph command
Browse files Browse the repository at this point in the history
  • Loading branch information
timj committed Oct 30, 2024
1 parent 01d4c0b commit bc4053c
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 2 deletions.
2 changes: 1 addition & 1 deletion python/lsst/pipe/base/cli/cmd/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@

__all__ = ["register_instrument", "transfer_from_graph"]

from .commands import register_instrument, transfer_from_graph
from .commands import register_instrument, transfer_from_graph, zip_from_graph
24 changes: 23 additions & 1 deletion python/lsst/pipe/base/cli/cmd/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import click
from lsst.daf.butler.cli.opt import (
dataset_type_option,
options_file_option,
register_dataset_types_option,
repo_argument,
Expand Down Expand Up @@ -62,10 +63,31 @@ def register_instrument(*args: Any, **kwargs: Any) -> None:
def transfer_from_graph(**kwargs: Any) -> None:
"""Transfer datasets from a quantum graph to a destination butler.
SOURCE is a URI to the Butler repository containing the RUN dataset.
SOURCE is a URI to the source quantum graph file.
DEST is a URI to the Butler repository that will receive copies of the
datasets.
"""
number = script.transfer_from_graph(**kwargs)
print(f"Number of datasets transferred: {number}")


@click.command(short_help="Make Zip archive from output files using graph.", cls=ButlerCommand)
@click.argument("graph", required=True)
@repo_argument(required=True)
@click.argument("dest", required=True)
@dataset_type_option(help="Dataset types to include in Zip archive.")
@options_file_option()
def zip_from_graph(**kwargs: Any) -> None:
"""Transfer datasets from a quantum graph to a destination butler.
SOURCE is a URI to the source quantum graph file to use when building the
Zip archive.
REPO is a URI to a butler configuration that is used to configure the
datastore of the quantum-backed butler.
DEST is a directory to write the Zip archive.
"""
zip = script.zip_from_graph(**kwargs)
print(f"Zip archive written to {zip}")

Check warning on line 93 in python/lsst/pipe/base/cli/cmd/commands.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/cli/cmd/commands.py#L92-L93

Added lines #L92 - L93 were not covered by tests
1 change: 1 addition & 0 deletions python/lsst/pipe/base/cli/resources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ cmd:
commands:
- register-instrument
- transfer-from-graph
- zip-from-graph
1 change: 1 addition & 0 deletions python/lsst/pipe/base/script/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@

from .register_instrument import register_instrument
from .transfer_from_graph import transfer_from_graph
from .zip_from_graph import zip_from_graph
121 changes: 121 additions & 0 deletions python/lsst/pipe/base/script/zip_from_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

__all__ = ["zip_from_graph"]

import logging
import re

from lsst.daf.butler import DatasetRef, QuantumBackedButler
from lsst.daf.butler.utils import globToRegex
from lsst.pipe.base import QuantumGraph
from lsst.resources import ResourcePath

_LOG = logging.getLogger(__name__)


def zip_from_graph(
graph: str,
repo: str,
dest: str,
dataset_type: tuple[str, ...],
) -> ResourcePath:
"""Create Zip export file from graph outputs.
Parameters
----------
graph : `str`
URI string of the quantum graph.
repo : `str`
URI to a butler configuration used to define the datastore associated
with the graph.
dest : `str`
Path to the destination directory for the Zip file.
dataset_type : `tuple` of `str`
Dataset type names. An empty tuple implies all dataset types.
Can include globs.
Returns
-------
zip_path : `lsst.resources.ResourcePath`
Path to the Zip file.
"""
# Read whole graph into memory
qgraph = QuantumGraph.loadUri(graph)

Check warning on line 68 in python/lsst/pipe/base/script/zip_from_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/script/zip_from_graph.py#L68

Added line #L68 was not covered by tests

# Collect output refs that could be created by this graph.
original_output_refs: set[DatasetRef] = set(qgraph.globalInitOutputRefs())

Check warning on line 71 in python/lsst/pipe/base/script/zip_from_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/script/zip_from_graph.py#L71

Added line #L71 was not covered by tests
for task_def in qgraph.iterTaskGraph():
if refs := qgraph.initOutputRefs(task_def):
original_output_refs.update(refs)

Check warning on line 74 in python/lsst/pipe/base/script/zip_from_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/script/zip_from_graph.py#L74

Added line #L74 was not covered by tests
for qnode in qgraph:
for otherRefs in qnode.quantum.outputs.values():
original_output_refs.update(otherRefs)

Check warning on line 77 in python/lsst/pipe/base/script/zip_from_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/script/zip_from_graph.py#L77

Added line #L77 was not covered by tests

# Get data repository definitions from the QuantumGraph; these can have
# different storage classes than those in the quanta.
dataset_types = {dstype.name: dstype for dstype in qgraph.registryDatasetTypes()}

Check warning on line 81 in python/lsst/pipe/base/script/zip_from_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/script/zip_from_graph.py#L81

Added line #L81 was not covered by tests

# Convert output_refs to the data repository storage classes, too.
output_refs = set()

Check warning on line 84 in python/lsst/pipe/base/script/zip_from_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/script/zip_from_graph.py#L84

Added line #L84 was not covered by tests
for ref in original_output_refs:
internal_dataset_type = dataset_types.get(ref.datasetType.name, ref.datasetType)

Check warning on line 86 in python/lsst/pipe/base/script/zip_from_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/script/zip_from_graph.py#L86

Added line #L86 was not covered by tests
if internal_dataset_type.storageClass_name != ref.datasetType.storageClass_name:
output_refs.add(ref.overrideStorageClass(internal_dataset_type.storageClass_name))

Check warning on line 88 in python/lsst/pipe/base/script/zip_from_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/script/zip_from_graph.py#L88

Added line #L88 was not covered by tests
else:
output_refs.add(ref)

Check warning on line 90 in python/lsst/pipe/base/script/zip_from_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/script/zip_from_graph.py#L90

Added line #L90 was not covered by tests

# Make QBB, its config is the same as output Butler.
qbb = QuantumBackedButler.from_predicted(

Check warning on line 93 in python/lsst/pipe/base/script/zip_from_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/script/zip_from_graph.py#L93

Added line #L93 was not covered by tests
config=repo,
predicted_inputs=[ref.id for ref in output_refs],
predicted_outputs=[],
dimensions=qgraph.universe,
datastore_records={},
dataset_types=dataset_types,
)

# Filter the refs based on requested dataset types.
regexes = globToRegex(dataset_type)

Check warning on line 103 in python/lsst/pipe/base/script/zip_from_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/script/zip_from_graph.py#L103

Added line #L103 was not covered by tests
if regexes is ...:
filtered_refs = output_refs

Check warning on line 105 in python/lsst/pipe/base/script/zip_from_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/script/zip_from_graph.py#L105

Added line #L105 was not covered by tests
else:

def _matches(dataset_type_name: str, regexes: list[str | re.Pattern]) -> bool:

Check warning on line 108 in python/lsst/pipe/base/script/zip_from_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/script/zip_from_graph.py#L108

Added line #L108 was not covered by tests
for regex in regexes:
if isinstance(regex, str):
if dataset_type_name == regex:
return True

Check warning on line 112 in python/lsst/pipe/base/script/zip_from_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/script/zip_from_graph.py#L112

Added line #L112 was not covered by tests
elif regex.search(dataset_type_name):
return True
return False

Check warning on line 115 in python/lsst/pipe/base/script/zip_from_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/script/zip_from_graph.py#L114-L115

Added lines #L114 - L115 were not covered by tests

filtered_refs = {ref for ref in output_refs if _matches(ref.datasetType.name, regexes)}

Check warning on line 117 in python/lsst/pipe/base/script/zip_from_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/script/zip_from_graph.py#L117

Added line #L117 was not covered by tests

_LOG.info("Retrieving artifacts for %d datasets and storing in Zip file.", len(output_refs))
zip = qbb.retrieve_artifacts_zip(filtered_refs, dest)
return zip

Check warning on line 121 in python/lsst/pipe/base/script/zip_from_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/script/zip_from_graph.py#L119-L121

Added lines #L119 - L121 were not covered by tests

0 comments on commit bc4053c

Please sign in to comment.