Skip to content

Commit

Permalink
Use ResourcePath and ResourcePathExpression in Pipeline methods.
Browse files Browse the repository at this point in the history
  • Loading branch information
TallJimbo committed Dec 21, 2021
1 parent 347f216 commit 35fd6a1
Showing 1 changed file with 19 additions and 18 deletions.
37 changes: 19 additions & 18 deletions python/lsst/pipe/base/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@

# -----------------------------
# Imports for other modules --
from lsst.daf.butler import ButlerURI, DatasetType, NamedValueSet, Registry, SkyPixDimension
from lsst.daf.butler import DatasetType, NamedValueSet, Registry, SkyPixDimension
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.utils import doImport
from lsst.utils.introspection import get_full_type_name

Expand Down Expand Up @@ -245,13 +246,13 @@ def fromFile(cls, filename: str) -> Pipeline:
return cls.from_uri(filename)

@classmethod
def from_uri(cls, uri: Union[str, ButlerURI]) -> Pipeline:
def from_uri(cls, uri: ResourcePathExpression) -> Pipeline:
"""Load a pipeline defined in a pipeline yaml file at a location
specified by a URI.
Parameters
----------
uri: `str` or `ButlerURI`
uri: convertible to `ResourcePath`
If a string is supplied this should be a URI path that points to a
pipeline defined in yaml format, either as a direct path to the yaml
file, or as a directory containing a "pipeline.yaml" file (the form
Expand Down Expand Up @@ -359,7 +360,7 @@ def subsetFromLabels(self, labelSpecifier: LabelSpecifier) -> Pipeline:
return Pipeline.fromIR(self._pipelineIR.subset_from_labels(labelSet))

@staticmethod
def _parse_file_specifier(uri: Union[str, ButlerURI]) -> Tuple[ButlerURI, Optional[LabelSpecifier]]:
def _parse_file_specifier(uri: ResourcePathExpression) -> Tuple[ResourcePath, Optional[LabelSpecifier]]:
"""Split appart a uri and any possible label subsets"""
if isinstance(uri, str):
# This is to support legacy pipelines during transition
Expand All @@ -373,9 +374,9 @@ def _parse_file_specifier(uri: Union[str, ButlerURI]) -> Tuple[ButlerURI, Option
)
if uri.count("#") > 1:
raise ValueError("Only one set of labels is allowed when specifying a pipeline to load")
uri = ButlerURI(uri)
uri = ResourcePath(uri)
elif isinstance(uri, Path):
uri = ButlerURI(uri)
uri = ResourcePath(uri)
label_subset = uri.fragment or None

specifier: Optional[LabelSpecifier]
Expand Down Expand Up @@ -595,19 +596,19 @@ def toFile(self, filename: str) -> None:

def write_to_uri(
self,
uri: Union[str, ButlerURI],
uri: ResourcePathExpression,
expand: bool = False,
task_defs: Optional[Iterable[TaskDef]] = None,
) -> None:
"""Write the pipeline to a file or directory.
Parameters
----------
uri : `str` or `ButlerURI`
URI to write to; may have any scheme with `ButlerURI` write
or no scheme for a local file/directory. Should have a ``.yaml``
extension if ``expand=False`` and a trailing slash (indicating
a directory-like URI) if ``expand=True``.
uri : convertible to `ResourcePath`
URI to write to; may have any scheme with `ResourcePath` write
support or no scheme for a local file/directory. Should have a
``.yaml`` extension if ``expand=False`` and a trailing slash
(indicating a directory-like URI) if ``expand=True``.
expand : `bool`, optional
If `False`, write the pipeline to a single YAML file with
references to configuration files and other config overrides
Expand All @@ -627,7 +628,7 @@ def write_to_uri(
raise RuntimeError(
f"Expanded pipelines are written to directories, not YAML files like {uri}."
)
self._write_expanded_dir(ButlerURI(uri, forceDirectory=True), task_defs=task_defs)
self._write_expanded_dir(ResourcePath(uri, forceDirectory=True), task_defs=task_defs)
else:
self._pipelineIR.write_to_uri(uri)

Expand Down Expand Up @@ -741,16 +742,16 @@ def description(self) -> str:
"""The string description of the pipeline."""
return self._pipelineIR.description

def _write_expanded_dir(self, uri: ButlerURI, task_defs: Optional[Iterable[TaskDef]] = None) -> None:
def _write_expanded_dir(self, uri: ResourcePath, task_defs: Optional[Iterable[TaskDef]] = None) -> None:
"""Internal implementation of `write_to_uri` with ``expand=True`` and
a directory-like URI.
Parameters
----------
uri : `str` or `ButlerURI`
URI to write to; may have any scheme with `ButlerURI` write or no
scheme for a local file/directory. Should have a trailing slash
(indicating a directory-like URI).
uri : `ResourcePath`
URI to write to; may have any scheme with `ResourcePath` write
support or no scheme for a local file/directory. Should have a
trailing slash (indicating a directory-like URI).
task_defs : `Iterable` [ `TaskDef` ], optional
Output of `toExpandedPipeline`; may be passed to avoid a second
call to that method internally.
Expand Down

0 comments on commit 35fd6a1

Please sign in to comment.