diff --git a/doc/lsst.pipe.base/creating-a-pipeline.rst b/doc/lsst.pipe.base/creating-a-pipeline.rst
index de99b9c98..8c9bbae94 100644
--- a/doc/lsst.pipe.base/creating-a-pipeline.rst
+++ b/doc/lsst.pipe.base/creating-a-pipeline.rst
@@ -27,7 +27,7 @@ s, and discussing common conventions when creating `Pipelines`.
 A Basic Pipeline
 ----------------
 
-`Pipeline` documents are written using yaml syntax. If you are unfamiliar with
+`Pipeline` documents are written using yaml syntax. If you are unfamiliar with
 yaml, there are many guides across the internet, but the basic idea is that it
 is a simple markup language to describe key, value mappings, and lists of
 values (which may be further mappings).
@@ -109,12 +109,12 @@ configuration options that alter the way the task executes. Because
 description field) some tasks may need specific configurations set to
 enable/disable behavior in the context of the specific `Pipeline`.
 
-To configure a task associated with a particular label, the value associated
+To configure a task associated with a particular label, the value associated
 with the label must be changed from the qualified task name to a new
 sub-mapping. This new sub-mapping should have two keys, ``class`` and
 ``config``.
 
-The ``class`` key should point to the same qualified task name as before. The
+The ``class`` key should point to the same qualified task name as before. The
 value associated with the ``config`` keyword is itself a mapping where
 configuration overrides are declared. The example below shows this behavior
 in action.
@@ -371,7 +371,7 @@ extend the total `Pipeline`.
 
 If a ``label`` declared in the ``tasks`` section was declared in one of
 the imported ``Pipelines``, one of two things happens. If the label is
-associated with the same `PipelineTask` that was declared in the imported
+associated with the same `PipelineTask` that was declared in the imported
 pipeline, this definition will be extended. This means that any configs
 declared in the imported `Pipeline` will be merged with configs declared in
 the current `Pipeline` with the current declaration taking config precedence.
@@ -421,7 +421,7 @@ is loaded.
 
 The simplest form of a `Pipeline` specification is the URI at which the
 `Pipeline` can be found. This URI may be any form supported by
-`lsst.daf.butler.ButlerURI`. In the case that the pipeline resides in a file
+`lsst.resources.ResourcePath`. In the case that the pipeline resides in a file
 located on a filesystem accessible by the machine that will be processing the
 `Pipeline` (i.e. a file URI), there is no need to preface the URI with
 ``file://``; a bare file path is assumed to be a file-based URI.
@@ -493,4 +493,4 @@ consistency throughout the software stack.
   level of a package.
 * Instrument packages should provide `Pipeline`\ s that override standard
   `Pipeline`\ s and are specifically configured for that instrument (if
-  applicable).
\ No newline at end of file
+  applicable).
diff --git a/python/lsst/pipe/base/configOverrides.py b/python/lsst/pipe/base/configOverrides.py
index 671c3773d..a4a88e77f 100644
--- a/python/lsst/pipe/base/configOverrides.py
+++ b/python/lsst/pipe/base/configOverrides.py
@@ -29,6 +29,7 @@
 from enum import Enum
 from operator import attrgetter
 
+from lsst.resources import ResourcePath
 from lsst.utils import doImport
 
 OverrideTypes = Enum("OverrideTypes", "Value File Python Instrument")
@@ -147,10 +148,11 @@ def addFileOverride(self, filename):
 
         Parameters
         ----------
-        filename : str
-            Path to the override file.
+        filename : convertible to `ResourcePath`
+            Path or URI to the override file. All URI schemes supported by
+            `ResourcePath` are supported.
         """
-        self._overrides.append((OverrideTypes.File, filename))
+        self._overrides.append((OverrideTypes.File, ResourcePath(filename)))
 
     def addValueOverride(self, field, value):
         """Add override for a specific field.
diff --git a/python/lsst/pipe/base/executionButlerBuilder.py b/python/lsst/pipe/base/executionButlerBuilder.py
index 5e1db7fae..22909dfe1 100644
--- a/python/lsst/pipe/base/executionButlerBuilder.py
+++ b/python/lsst/pipe/base/executionButlerBuilder.py
@@ -27,9 +27,10 @@
 from collections import defaultdict
 from typing import Callable, DefaultDict, Iterable, List, Mapping, Optional, Set, Tuple, Union
 
-from lsst.daf.butler import Butler, ButlerURI, Config, DataCoordinate, DatasetRef, DatasetType
+from lsst.daf.butler import Butler, Config, DataCoordinate, DatasetRef, DatasetType
 from lsst.daf.butler.core.repoRelocation import BUTLER_ROOT_TAG
 from lsst.daf.butler.transfers import RepoExportContext
+from lsst.resources import ResourcePath, ResourcePathExpression
 from lsst.utils.introspection import get_class_of
 
 from .graph import QuantumGraph, QuantumNode
@@ -142,7 +143,7 @@ def _export(
     return yamlBuffer
 
 
-def _setupNewButler(butler: Butler, outputLocation: ButlerURI, dirExists: bool) -> Butler:
+def _setupNewButler(butler: Butler, outputLocation: ResourcePath, dirExists: bool) -> Butler:
     # Set up the new butler object at the specified location
     if dirExists:
         # Remove the existing table, if the code got this far and this exists
@@ -218,7 +219,7 @@ def _import(
 def buildExecutionButler(
     butler: Butler,
     graph: QuantumGraph,
-    outputLocation: Union[str, ButlerURI],
+    outputLocation: ResourcePathExpression,
     run: str,
     *,
     clobber: bool = False,
@@ -242,9 +243,9 @@ def buildExecutionButler(
     graph : `QuantumGraph`
         Graph containing nodes that are to be exported into an execution
         butler
-    outputLocation : `str` or `~lsst.daf.butler.ButlerURI`
+    outputLocation : convertible to `ResourcePath`
         Location at which the execution butler is to be exported. May be
-        specified as a string or a ButlerURI instance.
+        specified as a string or a `ResourcePath` instance.
     run : `str`, optional
         The run collection that the exported datasets are to be placed in. If
         None, the default value in registry.defaults will be used.
@@ -282,7 +283,7 @@ def buildExecutionButler(
         Raised if specified output URI does not correspond to a directory
     """
     # We know this must refer to a directory.
-    outputLocation = ButlerURI(outputLocation, forceDirectory=True)
+    outputLocation = ResourcePath(outputLocation, forceDirectory=True)
 
     # Do this first to Fail Fast if the output exists
     if (dirExists := outputLocation.exists()) and not clobber:
diff --git a/python/lsst/pipe/base/graph/graph.py b/python/lsst/pipe/base/graph/graph.py
index f54c79ddf..4e4bdd665 100644
--- a/python/lsst/pipe/base/graph/graph.py
+++ b/python/lsst/pipe/base/graph/graph.py
@@ -54,7 +54,8 @@
 )
 
 import networkx as nx
-from lsst.daf.butler import ButlerURI, DatasetRef, DimensionRecordsAccumulator, DimensionUniverse, Quantum
+from lsst.daf.butler import DatasetRef, DimensionRecordsAccumulator, DimensionUniverse, Quantum
+from lsst.resources import ResourcePath, ResourcePathExpression
 from networkx.drawing.nx_agraph import write_dot
 
 from ..connections import iterConnections
@@ -717,14 +718,14 @@ def saveUri(self, uri):
 
         Parameters
         ----------
-        uri : `ButlerURI` or `str`
+        uri : convertible to `ResourcePath`
             URI to where the graph should be saved.
         """
         buffer = self._buildSaveObject()
-        butlerUri = ButlerURI(uri)
-        if butlerUri.getExtension() not in (".qgraph"):
+        path = ResourcePath(uri)
+        if path.getExtension() not in (".qgraph",):
             raise TypeError(f"Can currently only save a graph in qgraph format not {uri}")
-        butlerUri.write(buffer)  # type: ignore # Ignore because bytearray is safe to use in place of bytes
+        path.write(buffer)  # type: ignore # Ignore because bytearray is safe to use in place of bytes
 
     @property
     def metadata(self) -> Optional[MappingProxyType[str, Any]]:
@@ -736,7 +737,7 @@ def metadata(self) -> Optional[MappingProxyType[str, Any]]:
     @classmethod
     def loadUri(
         cls,
-        uri: Union[ButlerURI, str],
+        uri: ResourcePathExpression,
         universe: DimensionUniverse,
         nodes: Optional[Iterable[Union[str, uuid.UUID]]] = None,
         graphID: Optional[BuildId] = None,
@@ -746,7 +747,7 @@ def loadUri(
 
         Parameters
         ----------
-        uri : `ButlerURI` or `str`
+        uri : convertible to `ResourcePath`
             URI from where to load the graph.
         universe: `~lsst.daf.butler.DimensionUniverse`
             DimensionUniverse instance, not used by the method itself but
@@ -789,8 +790,8 @@ def loadUri(
         initialization. To make sure that DimensionUniverse exists this method
         accepts dummy DimensionUniverse argument.
         """
-        uri = ButlerURI(uri)
-        # With ButlerURI we have the choice of always using a local file
+        uri = ResourcePath(uri)
+        # With ResourcePath we have the choice of always using a local file
         # or reading in the bytes directly. Reading in bytes can be more
         # efficient for reasonably-sized pickle files when the resource
         # is remote. For now use the local file variant. For a local file
@@ -810,16 +811,15 @@ def loadUri(
         return qgraph
 
     @classmethod
-    def readHeader(cls, uri: Union[ButlerURI, str], minimumVersion: int = 3) -> Optional[str]:
+    def readHeader(cls, uri: ResourcePathExpression, minimumVersion: int = 3) -> Optional[str]:
         """Read the header of a `QuantumGraph` pointed to by the uri parameter
         and return it as a string.
 
         Parameters
         ----------
-        uri : `~lsst.daf.butler.ButlerURI` or `str`
+        uri : convertible to `ResourcePath`
             The location of the `QuantumGraph` to load. If the argument is a
-            string, it must correspond to a valid `~lsst.daf.butler.ButlerURI`
-            path.
+            string, it must correspond to a valid `ResourcePath` path.
         minimumVersion : int
             Minimum version of a save file to load. Set to -1 to load all
             versions. Older versions may need to be loaded, and re-saved
@@ -839,7 +839,7 @@ def readHeader(cls, uri: Union[ButlerURI, str], minimumVersion: int = 3) -> Opti
             Raised if the extension of the file specified by uri is not a
             `QuantumGraph` extension.
         """
-        uri = ButlerURI(uri)
+        uri = ResourcePath(uri)
         if uri.getExtension() in (".pickle", ".pkl"):
             raise ValueError("Reading a header from a pickle save is not supported")
         elif uri.getExtension() in (".qgraph"):
diff --git a/python/lsst/pipe/base/pipeline.py b/python/lsst/pipe/base/pipeline.py
index 370317de6..9d04f6386 100644
--- a/python/lsst/pipe/base/pipeline.py
+++ b/python/lsst/pipe/base/pipeline.py
@@ -53,7 +53,8 @@
 # -----------------------------
 #  Imports for other modules --
-from lsst.daf.butler import ButlerURI, DatasetType, NamedValueSet, Registry, SkyPixDimension
+from lsst.daf.butler import DatasetType, NamedValueSet, Registry, SkyPixDimension
+from lsst.resources import ResourcePath, ResourcePathExpression
 from lsst.utils import doImport
 
 from . import pipelineIR, pipeTools
@@ -246,25 +247,27 @@ def fromFile(cls, filename: str) -> Pipeline:
         return cls.from_uri(filename)
 
     @classmethod
-    def from_uri(cls, uri: Union[str, ButlerURI]) -> Pipeline:
+    def from_uri(cls, uri: ResourcePathExpression) -> Pipeline:
         """Load a pipeline defined in a pipeline yaml file at a location
         specified by a URI.
 
         Parameters
         ----------
-        uri: `str` or `ButlerURI`
-            If a string is supplied this should be a URI path that points to a
-            pipeline defined in yaml format. This uri may also supply
-            additional labels to be used in subsetting the loaded Pipeline.
-            These labels are separated from the path by a \\#, and may be
-            specified as a comma separated list, or a range denoted as
-            beginning..end. Beginning or end may be empty, in which case the
-            range will be a half open interval. Unlike python iteration
-            bounds, end bounds are *INCLUDED*. Note that range based selection
-            is not well defined for pipelines that are not linear in nature,
-            and correct behavior is not guaranteed, or may vary from run to
-            run. The same specifiers can be used with a ButlerURI object, by
-            being the sole contents in the fragments attribute.
+        uri : convertible to `ResourcePath`
+            If a string is supplied this should be a URI path that points to a
+            pipeline defined in yaml format, either as a direct path to the
+            yaml file, or as a directory containing a "pipeline.yaml" file (the
+            form used by `write_to_uri` with ``expand=True``). This uri may
+            also supply additional labels to be used in subsetting the loaded
+            Pipeline. These labels are separated from the path by a \\#, and
+            may be specified as a comma separated list, or a range denoted as
+            beginning..end. Beginning or end may be empty, in which case the
+            range will be a half open interval. Unlike python iteration bounds,
+            end bounds are *INCLUDED*. Note that range based selection is not
+            well defined for pipelines that are not linear in nature, and
+            correct behavior is not guaranteed, or may vary from run to run.
+            The same specifiers can be used with a `ResourcePath` object, by
+            being the sole contents in the fragments attribute.
 
         Returns
         -------
@@ -356,7 +359,7 @@ def subsetFromLabels(self, labelSpecifier: LabelSpecifier) -> Pipeline:
         return Pipeline.fromIR(self._pipelineIR.subset_from_labels(labelSet))
 
     @staticmethod
-    def _parse_file_specifier(uri: Union[str, ButlerURI]) -> Tuple[ButlerURI, Optional[LabelSpecifier]]:
+    def _parse_file_specifier(uri: ResourcePathExpression) -> Tuple[ResourcePath, Optional[LabelSpecifier]]:
         """Split apart a uri and any possible label subsets"""
         if isinstance(uri, str):
             # This is to support legacy pipelines during transition
@@ -370,7 +373,8 @@ def _parse_file_specifier(uri: Union[str, ButlerURI]) -> Tuple[ButlerURI, Option
             )
             if uri.count("#") > 1:
                 raise ValueError("Only one set of labels is allowed when specifying a pipeline to load")
-        uri = ButlerURI(uri)
+        # Everything else can be converted directly to ResourcePath.
+        uri = ResourcePath(uri)
         label_subset = uri.fragment or None
 
         specifier: Optional[LabelSpecifier]
@@ -593,10 +597,17 @@ def _addConfigImpl(self, label: str, newConfig: pipelineIR.ConfigIR) -> None:
     def toFile(self, filename: str) -> None:
         self._pipelineIR.to_file(filename)
 
-    def write_to_uri(self, uri: Union[str, ButlerURI]) -> None:
-        # tasks need sorted each call because someone might have added or
-        # removed task, and caching changes does not seem worth the small
-        # overhead
+    def write_to_uri(self, uri: ResourcePathExpression) -> None:
+        """Write the pipeline to a file or directory.
+
+        Parameters
+        ----------
+        uri : convertible to `ResourcePath`
+            URI to write to; may have any scheme with `ResourcePath` write
+            support or no scheme for a local file/directory. Should have a
+            ``.yaml`` extension if ``expand=False`` and a trailing slash
+            (indicating a directory-like URI) if ``expand=True``.
+        """
         labels = [td.label for td in self._toExpandedPipelineImpl(checkContracts=False)]
         self._pipelineIR.reorder_tasks(labels)
         self._pipelineIR.write_to_uri(uri)
diff --git a/python/lsst/pipe/base/pipelineIR.py b/python/lsst/pipe/base/pipelineIR.py
index 4431e4f43..c235e5713 100644
--- a/python/lsst/pipe/base/pipelineIR.py
+++ b/python/lsst/pipe/base/pipelineIR.py
@@ -33,7 +33,7 @@
 import yaml
 from deprecated.sphinx import deprecated
 
-from lsst.daf.butler import ButlerURI
+from lsst.resources import ResourcePath, ResourcePathExpression
 
 
 class KeepInstrument:
@@ -859,13 +859,13 @@ def from_file(cls, filename: str) -> PipelineIR:
         return cls.from_uri(filename)
 
     @classmethod
-    def from_uri(cls, uri: Union[str, ButlerURI]) -> PipelineIR:
+    def from_uri(cls, uri: ResourcePathExpression) -> PipelineIR:
         """Create a `PipelineIR` object from the document specified by the
         input uri.
 
         Parameters
         ----------
-        uri: `str` or `ButlerURI`
+        uri : convertible to `ResourcePath`
             Location of document to use in creating a `PipelineIR` object.
 
         Returns
@@ -873,17 +873,12 @@
         pipelineIR : `PipelineIR`
             The loaded pipeline
         """
-        loaded_uri = ButlerURI(uri)
-        # With ButlerURI we have the choice of always using a local file or
-        # reading in the bytes directly. Reading in bytes can be more
-        # efficient for reasonably-sized files when the resource is remote.
-        # For now use the local file variant. For a local file as_local() does
-        # nothing.
-        with loaded_uri.as_local() as local:
+        loaded_uri = ResourcePath(uri)
+        with loaded_uri.open("r") as buffer:
             # explicitly read here, there was some issue with yaml trying
-            # to read the ButlerURI itself (I think because it only
+            # to read the ResourcePath itself (I think because it only
             # pretends to be conformant to the io api)
-            loaded_yaml = yaml.load(local.read(), Loader=PipelineYamlLoader)
+            loaded_yaml = yaml.load(buffer.read(), Loader=PipelineYamlLoader)
         return cls(loaded_yaml)
 
     @deprecated(
@@ -902,17 +897,20 @@ def to_file(self, filename: str):
         """
         self.write_to_uri(filename)
 
-    def write_to_uri(self, uri: Union[ButlerURI, str]):
+    def write_to_uri(
+        self,
+        uri: ResourcePathExpression,
+    ):
         """Serialize this `PipelineIR` object into a yaml formatted string and
         write the output to a file at the specified uri.
 
         Parameters
         ----------
-        uri: `str` or `ButlerURI`
+        uri : convertible to `ResourcePath`
             Location of document to write a `PipelineIR` object.
         """
-        butlerUri = ButlerURI(uri)
-        butlerUri.write(yaml.dump(self.to_primitives(), sort_keys=False).encode())
+        with ResourcePath(uri).open("w") as buffer:
+            yaml.dump(self.to_primitives(), buffer, sort_keys=False)
 
     def to_primitives(self) -> Dict[str, Any]:
         """Convert to a representation used in yaml serialization"""
diff --git a/python/lsst/pipe/base/tests/simpleQGraph.py b/python/lsst/pipe/base/tests/simpleQGraph.py
index 7706b4d1b..9ece00f0d 100644
--- a/python/lsst/pipe/base/tests/simpleQGraph.py
+++ b/python/lsst/pipe/base/tests/simpleQGraph.py
@@ -31,8 +31,9 @@
 import lsst.pex.config as pexConfig
 import numpy
 from lsst.base import Packages
-from lsst.daf.butler import Butler, ButlerURI, Config, DatasetType
+from lsst.daf.butler import Butler, Config, DatasetType
 from lsst.daf.butler.core.logging import ButlerLogRecords
+from lsst.resources import ResourcePath
 from lsst.utils import doImport
 
 from ... import base as pipeBase
@@ -236,7 +237,7 @@ def makeSimpleButler(root: str, run: str = "test", inMemory: bool = True) -> But
     butler : `~lsst.daf.butler.Butler`
         Data butler instance.
     """
-    root = ButlerURI(root, forceDirectory=True)
+    root = ResourcePath(root, forceDirectory=True)
     if not root.isLocal:
         raise ValueError(f"Only works with local root not {root}")
     config = Config()
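
A minimal sketch of the usage this migration enables, assuming a stack
environment with lsst.pipe.base and lsst.resources set up; the pipeline
URIs, task labels, and graph file below are illustrative only and are not
part of this change:

    from lsst.pipe.base import Pipeline, QuantumGraph
    from lsst.resources import ResourcePath

    # Anything convertible to ResourcePath is accepted: a bare local path,
    # a file:// URI, or any remote scheme that lsst.resources supports.
    pipeline = Pipeline.from_uri("pipelines/example.yaml#isr..calibrate")

    # An explicit ResourcePath works too; a label subset rides along as
    # the sole contents of the URI fragment, as with the string form.
    uri = ResourcePath("s3://bucket/pipelines/example.yaml#isr,characterizeImage")
    pipeline = Pipeline.from_uri(uri)
    pipeline.write_to_uri("modified.yaml")

    # QuantumGraph I/O follows the same pattern; saveUri still requires
    # the .qgraph extension, and both ends now go through ResourcePath.
    # (universe is a DimensionUniverse, e.g. butler.registry.dimensions.)
    # qgraph.saveUri("graphs/example.qgraph")
    # qgraph = QuantumGraph.loadUri("graphs/example.qgraph", universe)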