Skip to content

Commit

Permalink
Use ResourcePath instead of ButlerURI throughout.
Browse files Browse the repository at this point in the history
Also replaces a few Union[str, ButlerURI] and similar with the new
ResourcePathExpression type alias.
  • Loading branch information
TallJimbo authored and timj committed Jan 11, 2022
1 parent bf0e57e commit 0d8c3c1
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 68 deletions.
12 changes: 6 additions & 6 deletions doc/lsst.pipe.base/creating-a-pipeline.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ s, and discussing common conventions when creating `Pipelines`.
A Basic Pipeline
----------------

`Pipeline` documents are written using yaml syntax. If you are unfamiliar with
`Pipeline` documents are written using yaml syntax. If you are unfamiliar with
yaml, there are many guides across the internet, but the basic idea is that it
is a simple markup language to describe key, value mappings, and lists of
values (which may be further mappings).
Expand Down Expand Up @@ -109,12 +109,12 @@ configuration options that alter the way the task executes. Because
description field) some tasks may need specific configurations set to
enable/disable behavior in the context of the specific `Pipeline`.

To configure a task associated with a particular label, the value associated
To configure a task associated with a particular label, the value associated
with the label must be changed from the qualified task name to a new
sub-mapping. This new sub mapping should have two keys, ``class`` and
``config``.

The ``class`` key should point to the same qualified task name as before. The
The ``class`` key should point to the same qualified task name as before. The
value associated with the ``config`` keyword is itself a mapping where
configuration overrides are declared. The example below shows this behavior
in action.
Expand Down Expand Up @@ -371,7 +371,7 @@ extend the total `Pipeline`.

If a ``label`` declared in the the ``tasks`` section was declared in one of
the imported ``Pipelines``, one of two things happen. If the label is
associated with the same `PipelineTask` that was declared in the imported
associated with the same `PipelineTask` that was declared in the imported
pipeline, this definition will be extended. This means that any configs
declared in the imported `Pipeline` will be merged with configs declared in
the current `Pipeline` with the current declaration taking config precedence.
Expand Down Expand Up @@ -421,7 +421,7 @@ is loaded.

The simplest form of a `Pipeline` specification is the URI at which the
`Pipeline` can be found. This URI may be any supported by
`lsst.daf.butler.ButlerURI`. In the case that the pipeline resides in a file
`lsst.resources.ResourcePath`. In the case that the pipeline resides in a file
located on a filesystem accessible by the machine that will be processing the
`Pipeline` (i.e. a file URI), there is no need to preface the URI with
``file://``, a bare file path is assumed to be a file based URI.
Expand Down Expand Up @@ -493,4 +493,4 @@ consistency throughout the software stack.
level of a package.
* Instrument packages should provide `Pipeline`\ s that override standard
`Pipeline`\ s and are specifically configured for that instrument (if
applicable).
applicable).
8 changes: 5 additions & 3 deletions python/lsst/pipe/base/configOverrides.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from enum import Enum
from operator import attrgetter

from lsst.resources import ResourcePath
from lsst.utils import doImport

OverrideTypes = Enum("OverrideTypes", "Value File Python Instrument")
Expand Down Expand Up @@ -147,10 +148,11 @@ def addFileOverride(self, filename):
Parameters
----------
filename : str
Path to the override file.
filename : convertible to `ResourcePath`
Path or URI to the override file. All URI schemes supported by
`ResourcePath` are supported.
"""
self._overrides.append((OverrideTypes.File, filename))
self._overrides.append((OverrideTypes.File, ResourcePath(filename)))

def addValueOverride(self, field, value):
"""Add override for a specific field.
Expand Down
13 changes: 7 additions & 6 deletions python/lsst/pipe/base/executionButlerBuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@
from collections import defaultdict
from typing import Callable, DefaultDict, Iterable, List, Mapping, Optional, Set, Tuple, Union

from lsst.daf.butler import Butler, ButlerURI, Config, DataCoordinate, DatasetRef, DatasetType
from lsst.daf.butler import Butler, Config, DataCoordinate, DatasetRef, DatasetType
from lsst.daf.butler.core.repoRelocation import BUTLER_ROOT_TAG
from lsst.daf.butler.transfers import RepoExportContext
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.utils.introspection import get_class_of

from .graph import QuantumGraph, QuantumNode
Expand Down Expand Up @@ -142,7 +143,7 @@ def _export(
return yamlBuffer


def _setupNewButler(butler: Butler, outputLocation: ButlerURI, dirExists: bool) -> Butler:
def _setupNewButler(butler: Butler, outputLocation: ResourcePath, dirExists: bool) -> Butler:
# Set up the new butler object at the specified location
if dirExists:
# Remove the existing table, if the code got this far and this exists
Expand Down Expand Up @@ -218,7 +219,7 @@ def _import(
def buildExecutionButler(
butler: Butler,
graph: QuantumGraph,
outputLocation: Union[str, ButlerURI],
outputLocation: ResourcePathExpression,
run: str,
*,
clobber: bool = False,
Expand All @@ -242,9 +243,9 @@ def buildExecutionButler(
graph : `QuantumGraph`
Graph containing nodes that are to be exported into an execution
butler
outputLocation : `str` or `~lsst.daf.butler.ButlerURI`
outputLocation : convertible to `ResourcePath
URI Location at which the execution butler is to be exported. May be
specified as a string or a ButlerURI instance.
specified as a string or a `ResourcePath` instance.
run : `str` optional
The run collection that the exported datasets are to be placed in. If
None, the default value in registry.defaults will be used.
Expand Down Expand Up @@ -282,7 +283,7 @@ def buildExecutionButler(
Raised if specified output URI does not correspond to a directory
"""
# We know this must refer to a directory.
outputLocation = ButlerURI(outputLocation, forceDirectory=True)
outputLocation = ResourcePath(outputLocation, forceDirectory=True)

# Do this first to Fail Fast if the output exists
if (dirExists := outputLocation.exists()) and not clobber:
Expand Down
28 changes: 14 additions & 14 deletions python/lsst/pipe/base/graph/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@
)

import networkx as nx
from lsst.daf.butler import ButlerURI, DatasetRef, DimensionRecordsAccumulator, DimensionUniverse, Quantum
from lsst.daf.butler import DatasetRef, DimensionRecordsAccumulator, DimensionUniverse, Quantum
from lsst.resources import ResourcePath, ResourcePathExpression
from networkx.drawing.nx_agraph import write_dot

from ..connections import iterConnections
Expand Down Expand Up @@ -717,14 +718,14 @@ def saveUri(self, uri):
Parameters
----------
uri : `ButlerURI` or `str`
uri : convertible to `ResourcePath`
URI to where the graph should be saved.
"""
buffer = self._buildSaveObject()
butlerUri = ButlerURI(uri)
if butlerUri.getExtension() not in (".qgraph"):
path = ResourcePath(uri)
if path.getExtension() not in (".qgraph"):
raise TypeError(f"Can currently only save a graph in qgraph format not {uri}")
butlerUri.write(buffer) # type: ignore # Ignore because bytearray is safe to use in place of bytes
path.write(buffer) # type: ignore # Ignore because bytearray is safe to use in place of bytes

@property
def metadata(self) -> Optional[MappingProxyType[str, Any]]:
Expand All @@ -736,7 +737,7 @@ def metadata(self) -> Optional[MappingProxyType[str, Any]]:
@classmethod
def loadUri(
cls,
uri: Union[ButlerURI, str],
uri: ResourcePathExpression,
universe: DimensionUniverse,
nodes: Optional[Iterable[Union[str, uuid.UUID]]] = None,
graphID: Optional[BuildId] = None,
Expand All @@ -746,7 +747,7 @@ def loadUri(
Parameters
----------
uri : `ButlerURI` or `str`
uri : convertible to `ResourcePath`
URI from where to load the graph.
universe: `~lsst.daf.butler.DimensionUniverse`
DimensionUniverse instance, not used by the method itself but
Expand Down Expand Up @@ -789,8 +790,8 @@ def loadUri(
initialization. To make sure that DimensionUniverse exists this method
accepts dummy DimensionUniverse argument.
"""
uri = ButlerURI(uri)
# With ButlerURI we have the choice of always using a local file
uri = ResourcePath(uri)
# With ResourcePath we have the choice of always using a local file
# or reading in the bytes directly. Reading in bytes can be more
# efficient for reasonably-sized pickle files when the resource
# is remote. For now use the local file variant. For a local file
Expand All @@ -810,16 +811,15 @@ def loadUri(
return qgraph

@classmethod
def readHeader(cls, uri: Union[ButlerURI, str], minimumVersion: int = 3) -> Optional[str]:
def readHeader(cls, uri: ResourcePathExpression, minimumVersion: int = 3) -> Optional[str]:
"""Read the header of a `QuantumGraph` pointed to by the uri parameter
and return it as a string.
Parameters
----------
uri : `~lsst.daf.butler.ButlerURI` or `str`
uri : convertible to `ResourcePath`
The location of the `QuantumGraph` to load. If the argument is a
string, it must correspond to a valid `~lsst.daf.butler.ButlerURI`
path.
string, it must correspond to a valid `ResourcePath` path.
minimumVersion : int
Minimum version of a save file to load. Set to -1 to load all
versions. Older versions may need to be loaded, and re-saved
Expand All @@ -839,7 +839,7 @@ def readHeader(cls, uri: Union[ButlerURI, str], minimumVersion: int = 3) -> Opti
Raised if the extention of the file specified by uri is not a
`QuantumGraph` extention.
"""
uri = ButlerURI(uri)
uri = ResourcePath(uri)
if uri.getExtension() in (".pickle", ".pkl"):
raise ValueError("Reading a header from a pickle save is not supported")
elif uri.getExtension() in (".qgraph"):
Expand Down
53 changes: 32 additions & 21 deletions python/lsst/pipe/base/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@

# -----------------------------
# Imports for other modules --
from lsst.daf.butler import ButlerURI, DatasetType, NamedValueSet, Registry, SkyPixDimension
from lsst.daf.butler import DatasetType, NamedValueSet, Registry, SkyPixDimension
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.utils import doImport

from . import pipelineIR, pipeTools
Expand Down Expand Up @@ -246,25 +247,27 @@ def fromFile(cls, filename: str) -> Pipeline:
return cls.from_uri(filename)

@classmethod
def from_uri(cls, uri: Union[str, ButlerURI]) -> Pipeline:
def from_uri(cls, uri: ResourcePathExpression) -> Pipeline:
"""Load a pipeline defined in a pipeline yaml file at a location
specified by a URI.
Parameters
----------
uri: `str` or `ButlerURI`
If a string is supplied this should be a URI path that points to a
pipeline defined in yaml format. This uri may also supply
additional labels to be used in subsetting the loaded Pipeline.
These labels are separated from the path by a \\#, and may be
specified as a comma separated list, or a range denoted as
beginning..end. Beginning or end may be empty, in which case the
range will be a half open interval. Unlike python iteration
bounds, end bounds are *INCLUDED*. Note that range based selection
is not well defined for pipelines that are not linear in nature,
and correct behavior is not guaranteed, or may vary from run to
run. The same specifiers can be used with a ButlerURI object, by
being the sole contents in the fragments attribute.
uri: convertible to `ResourcePath`
If a string is supplied this should be a URI path that points to a
pipeline defined in yaml format, either as a direct path to the
yaml file, or as a directory containing a "pipeline.yaml" file (the
form used by `write_to_uri` with ``expand=True``). This uri may
also supply additional labels to be used in subsetting the loaded
Pipeline. These labels are separated from the path by a \\#, and
may be specified as a comma separated list, or a range denoted as
beginning..end. Beginning or end may be empty, in which case the
range will be a half open interval. Unlike python iteration bounds,
end bounds are *INCLUDED*. Note that range based selection is not
well defined for pipelines that are not linear in nature, and
correct behavior is not guaranteed, or may vary from run to run.
The same specifiers can be used with a `ResourcePath` object, by
being the sole contents in the fragments attribute.
Returns
-------
Expand Down Expand Up @@ -356,7 +359,7 @@ def subsetFromLabels(self, labelSpecifier: LabelSpecifier) -> Pipeline:
return Pipeline.fromIR(self._pipelineIR.subset_from_labels(labelSet))

@staticmethod
def _parse_file_specifier(uri: Union[str, ButlerURI]) -> Tuple[ButlerURI, Optional[LabelSpecifier]]:
def _parse_file_specifier(uri: ResourcePathExpression) -> Tuple[ResourcePath, Optional[LabelSpecifier]]:
"""Split appart a uri and any possible label subsets"""
if isinstance(uri, str):
# This is to support legacy pipelines during transition
Expand All @@ -370,7 +373,8 @@ def _parse_file_specifier(uri: Union[str, ButlerURI]) -> Tuple[ButlerURI, Option
)
if uri.count("#") > 1:
raise ValueError("Only one set of labels is allowed when specifying a pipeline to load")
uri = ButlerURI(uri)
# Everything else can be converted directly to ResourcePath.
uri = ResourcePath(uri)
label_subset = uri.fragment or None

specifier: Optional[LabelSpecifier]
Expand Down Expand Up @@ -593,10 +597,17 @@ def _addConfigImpl(self, label: str, newConfig: pipelineIR.ConfigIR) -> None:
def toFile(self, filename: str) -> None:
self._pipelineIR.to_file(filename)

def write_to_uri(self, uri: Union[str, ButlerURI]) -> None:
# tasks need sorted each call because someone might have added or
# removed task, and caching changes does not seem worth the small
# overhead
def write_to_uri(self, uri: ResourcePathExpression) -> None:
"""Write the pipeline to a file or directory.
Parameters
----------
uri : convertible to `ResourcePath`
URI to write to; may have any scheme with `ResourcePath` write
support or no scheme for a local file/directory. Should have a
``.yaml`` extension if ``expand=False`` and a trailing slash
(indicating a directory-like URI) if ``expand=True``.
"""
labels = [td.label for td in self._toExpandedPipelineImpl(checkContracts=False)]
self._pipelineIR.reorder_tasks(labels)
self._pipelineIR.write_to_uri(uri)
Expand Down
30 changes: 14 additions & 16 deletions python/lsst/pipe/base/pipelineIR.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

import yaml
from deprecated.sphinx import deprecated
from lsst.daf.butler import ButlerURI
from lsst.resources import ResourcePath, ResourcePathExpression


class KeepInstrument:
Expand Down Expand Up @@ -859,31 +859,26 @@ def from_file(cls, filename: str) -> PipelineIR:
return cls.from_uri(filename)

@classmethod
def from_uri(cls, uri: Union[str, ButlerURI]) -> PipelineIR:
def from_uri(cls, uri: ResourcePathExpression) -> PipelineIR:
"""Create a `PipelineIR` object from the document specified by the
input uri.
Parameters
----------
uri: `str` or `ButlerURI`
uri: convertible to `ResourcePath`
Location of document to use in creating a `PipelineIR` object.
Returns
-------
pipelineIR : `PipelineIR`
The loaded pipeline
"""
loaded_uri = ButlerURI(uri)
# With ButlerURI we have the choice of always using a local file or
# reading in the bytes directly. Reading in bytes can be more
# efficient for reasonably-sized files when the resource is remote.
# For now use the local file variant. For a local file as_local() does
# nothing.
with loaded_uri.as_local() as local:
loaded_uri = ResourcePath(uri)
with loaded_uri.open("r") as buffer:
# explicitly read here, there was some issue with yaml trying
# to read the ButlerURI itself (I think because it only
# to read the ResourcePath itself (I think because it only
# pretends to be conformant to the io api)
loaded_yaml = yaml.load(local.read(), Loader=PipelineYamlLoader)
loaded_yaml = yaml.load(buffer.read(), Loader=PipelineYamlLoader)
return cls(loaded_yaml)

@deprecated(
Expand All @@ -902,17 +897,20 @@ def to_file(self, filename: str):
"""
self.write_to_uri(filename)

def write_to_uri(self, uri: Union[ButlerURI, str]):
def write_to_uri(
self,
uri: ResourcePathExpression,
):
"""Serialize this `PipelineIR` object into a yaml formatted string and
write the output to a file at the specified uri.
Parameters
----------
uri: `str` or `ButlerURI`
uri: convertible to `ResourcePath`
Location of document to write a `PipelineIR` object.
"""
butlerUri = ButlerURI(uri)
butlerUri.write(yaml.dump(self.to_primitives(), sort_keys=False).encode())
with ResourcePath(uri).open("w") as buffer:
yaml.dump(self.to_primitives(), buffer, sort_keys=False)

def to_primitives(self) -> Dict[str, Any]:
"""Convert to a representation used in yaml serialization"""
Expand Down
5 changes: 3 additions & 2 deletions python/lsst/pipe/base/tests/simpleQGraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@
import lsst.pex.config as pexConfig
import numpy
from lsst.base import Packages
from lsst.daf.butler import Butler, ButlerURI, Config, DatasetType
from lsst.daf.butler import Butler, Config, DatasetType
from lsst.daf.butler.core.logging import ButlerLogRecords
from lsst.resources import ResourcePath
from lsst.utils import doImport

from ... import base as pipeBase
Expand Down Expand Up @@ -236,7 +237,7 @@ def makeSimpleButler(root: str, run: str = "test", inMemory: bool = True) -> But
butler : `~lsst.daf.butler.Butler`
Data butler instance.
"""
root = ButlerURI(root, forceDirectory=True)
root = ResourcePath(root, forceDirectory=True)
if not root.isLocal:
raise ValueError(f"Only works with local root not {root}")
config = Config()
Expand Down

0 comments on commit 0d8c3c1

Please sign in to comment.