Add support for writing expanded pipelines to directories.
TallJimbo committed Dec 2, 2021
1 parent e718706 commit d18110f
Showing 3 changed files with 309 additions and 27 deletions.
98 changes: 86 additions & 12 deletions python/lsst/pipe/base/pipeline.py
@@ -232,17 +232,19 @@ def from_uri(cls, uri: Union[str, ButlerURI]) -> Pipeline:
----------
uri: `str` or `ButlerURI`
If a string is supplied this should be a URI path that points to a
pipeline defined in yaml format. This uri may also supply
additional labels to be used in subsetting the loaded Pipeline.
These labels are separated from the path by a \\#, and may be
specified as a comma separated list, or a range denoted as
pipeline defined in yaml format, either as a direct path to the yaml
file, or as a directory containing a "pipeline.yaml" file (the form
used by `write_to_uri` with ``expand=True``). This uri may also
supply additional labels to be used in subsetting the loaded
Pipeline. These labels are separated from the path by a \\#, and
may be specified as a comma separated list, or a range denoted as
beginning..end. Beginning or end may be empty, in which case the
range will be a half open interval. Unlike python iteration
bounds, end bounds are *INCLUDED*. Note that range based selection
is not well defined for pipelines that are not linear in nature,
and correct behavior is not guaranteed, or may vary from run to
run. The same specifiers can be used with a ButlerURI object, by
being the sole contents in the fragments attribute.
range will be a half open interval. Unlike python iteration bounds,
end bounds are *INCLUDED*. Note that range based selection is not
well defined for pipelines that are not linear in nature, and
correct behavior is not guaranteed, or may vary from run to run. The
same specifiers can be used with a ButlerURI object, by being the
sole contents in the fragments attribute.
Returns
-------
@@ -259,6 +261,8 @@ def from_uri(cls, uri: Union[str, ButlerURI]) -> Pipeline:
"""
# Split up the uri and any labels that were supplied
uri, label_specifier = cls._parse_file_specifier(uri)
if uri.dirLike:
uri = uri.join("pipeline.yaml")
pipeline: Pipeline = cls.fromIR(pipelineIR.PipelineIR.from_uri(uri))

# If there are labels supplied, only keep those
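For illustration, a minimal usage sketch of the loading behaviour described above; the paths and labels ("DRP.yaml", "DRP-expanded/", "isr", "calibrate") are hypothetical, and it assumes `Pipeline` is importable from `lsst.pipe.base`:

from lsst.pipe.base import Pipeline

# Plain YAML file (existing behaviour).
p1 = Pipeline.from_uri("DRP.yaml")

# Directory written by write_to_uri(..., expand=True); the trailing slash
# makes the URI directory-like, so from_uri appends "pipeline.yaml".
p2 = Pipeline.from_uri("DRP-expanded/")

# Label specifiers after '#' still apply: a comma-separated list, or an
# inclusive begin..end range.
p3 = Pipeline.from_uri("DRP.yaml#isr,calibrate")
p4 = Pipeline.from_uri("DRP.yaml#isr..calibrate")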
@@ -560,8 +564,43 @@ def _addConfigImpl(self, label: str, newConfig: pipelineIR.ConfigIR) -> None:
def toFile(self, filename: str) -> None:
self._pipelineIR.to_file(filename)

def write_to_uri(self, uri: Union[str, ButlerURI]) -> None:
self._pipelineIR.write_to_uri(uri)
def write_to_uri(
self,
uri: Union[str, ButlerURI],
expand: bool = False,
task_defs: Optional[Iterable[TaskDef]] = None,
) -> None:
"""Write the pipeline to a file or directory.
Parameters
----------
uri : `str` or `ButlerURI`
URI to write to; may have any scheme supported by `ButlerURI`, or
no scheme for a local file or directory. Should have a ``.yaml``
extension if ``expand=False`` and a trailing slash (indicating a
directory-like URI) if ``expand=True``.
expand : `bool`, optional
If `False`, write the pipeline to a single YAML file with
references to configuration files and other config overrides
unexpanded and unapplied, with tasks and subsets only minimally
validated (and not imported). If `True`, import all tasks, apply
all configuration overrides (including those supplied by an
instrument), resolve parameters, sort all sections
deterministically, and write the pipeline to a directory with
separate config files for each task as well as a single
``pipeline.yaml`` file.
task_defs : `Iterable` [ `TaskDef` ], optional
Output of `toExpandedPipeline`; may be passed to avoid a second
call to that method internally. Ignored unless ``expand=True``.
"""
if expand:
if str(uri).endswith(".yaml"):
raise RuntimeError(
f"Expanded pipelines are written to directories, not YAML files like {uri}."
)
self._write_expanded_dir(ButlerURI(uri, forceDirectory=True), task_defs=task_defs)
else:
self._pipelineIR.write_to_uri(uri)
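A hedged usage sketch of the two modes (file names are placeholders, not from the commit):

from lsst.pipe.base import Pipeline

pipeline = Pipeline.from_uri("DRP.yaml")

# expand=False (default): one YAML file, config overrides kept as references.
pipeline.write_to_uri("DRP-copy.yaml")

# expand=True: tasks imported, overrides and parameters applied, output
# written as a directory with pipeline.yaml plus per-task config files.
pipeline.write_to_uri("DRP-expanded/", expand=True)

# Passing a .yaml path with expand=True raises RuntimeError, per the check
# above.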

def toExpandedPipeline(self) -> Generator[TaskDef, None, None]:
"""Returns a generator of TaskDefs which can be used to create quantum
@@ -653,6 +692,41 @@ def __eq__(self, other: object):
return False
return self._pipelineIR == other._pipelineIR

def _write_expanded_dir(self, uri: ButlerURI, task_defs: Optional[Iterable[TaskDef]] = None) -> None:
"""Internal implementation of `write_to_uri` with ``expand=True`` and
a directory-like URI.
Parameters
----------
uri : `ButlerURI`
Directory-like URI to write to; may have any scheme supported by
`ButlerURI`, or no scheme for a local directory. Should have a
trailing slash (indicating a directory-like URI).
task_defs : `Iterable` [ `TaskDef` ], optional
Output of `toExpandedPipeline`; may be passed to avoid a second
call to that method internally.
"""
assert uri.dirLike, f"{uri} is not a directory-like URI."
# Expand the pipeline. This applies all config overrides, applies all
# parameters, checks contracts, and sorts tasks topologically with
# lexicographical (on label) tiebreaking.
if task_defs is not None:
task_defs = list(task_defs)
else:
task_defs = list(self.toExpandedPipeline())
uri.mkdir()
config_dir_uri = uri.join("config/")
config_dir_uri.mkdir()
expanded_tasks: Dict[str, pipelineIR.TaskIR] = {}
for task_def in task_defs:
task_ir = pipelineIR.TaskIR(label=task_def.label, klass=task_def.taskName, config=[])
config_uri = config_dir_uri.join(f"{task_def.label}.py")
with config_uri.open("w") as buffer:
task_def.config.saveToStream(buffer)
task_ir.config.append(pipelineIR.ConfigIR(file=[f"config/{task_def.label}.py"]))
expanded_tasks[task_def.label] = task_ir
self._pipelineIR.write_to_uri(uri.join("pipeline.yaml"), expanded_tasks=expanded_tasks)
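With hypothetical task labels ``isr`` and ``calibrate``, the directory written above would look roughly like this, with ``pipeline.yaml`` referencing each per-task config file via a ``ConfigIR(file=[...])`` entry:

DRP-expanded/
    pipeline.yaml
    config/
        isr.py
        calibrate.py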


@dataclass(frozen=True)
class TaskDatasetTypes:
159 changes: 145 additions & 14 deletions python/lsst/pipe/base/pipelineIR.py
@@ -35,7 +35,7 @@
from collections.abc import Iterable as abcIterable
from dataclasses import dataclass, field
from deprecated.sphinx import deprecated
from typing import Any, List, Set, Union, Generator, MutableMapping, Optional, Dict, Type
from typing import Any, Mapping, List, Set, Union, Generator, MutableMapping, Optional, Dict, Type

import copy
import re
@@ -192,14 +192,77 @@ def from_primitives(label: str, value: Union[List[str], dict]) -> LabeledSubset:
"associated with a string")
return LabeledSubset(label, set(subset), description)

def to_primitives(self) -> Dict[str, Union[List[str], str]]:
def to_primitives(self, sorter: Optional[Mapping[str, Any]] = None) -> Dict[str, Union[List[str], str]]:
"""Convert to a representation used in yaml serialization
Parameters
----------
sorter : `Mapping` [ `str`, `object` ]
Mapping from task or subset label to a comparable object that
should be used to sort those labels.
"""
accumulate: Dict[str, Union[List[str], str]] = {"subset": list(self.subset)}
contents = list(self.subset)
if sorter is not None:
contents.sort(key=lambda label: sorter[label])
accumulate: Dict[str, Union[List[str], str]] = {"subset": contents}
if self.description is not None:
accumulate["description"] = self.description
return accumulate

@staticmethod
def expand_nested(original_subsets: Mapping[str, LabeledSubset]) -> Dict[str, LabeledSubset]:
"""Recursively expand subsets that contain the labels of other subsets.
Parameters
----------
original_subsets : `Mapping` [ `str`, `LabeledSubset` ]
Mapping from string label to a labeled subset definition; keys must
match the `label` attribute of the corresponding value.
Returns
-------
new_subsets : `Dict` [ `str`, `LabeledSubset` ]
Mapping of new labeled subset definitions. Guaranteed to contain
new instances (even when an old instance did not contain any subset
labels) with only task labels.
Raises
------
RuntimeError
Raised if a reference cycle is detected.
"""
done: Dict[str, LabeledSubset] = {}
in_progress: Set[str] = set()

def expand_one_subset(subset_label: str) -> Set[str]:
"""Expand the subset with the given label if it has not already
been done, returning its recursively-expanded (i.e. task label
only) contents.
This updates `done` in-place, and uses `in_progress` to detect
cycles.
"""
if (already_expanded := done.get(subset_label)) is not None:
return already_expanded.subset
if subset_label in in_progress:
raise RuntimeError(f"Cycle detected in subset definitions involving {subset_label}.")
in_progress.add(subset_label)
original = original_subsets[subset_label]
new = LabeledSubset(
label=subset_label,
subset=set(original.subset - original_subsets.keys()),
description=original.description,
)
for nested_subset_label in original.subset & original_subsets.keys():
new.subset.update(expand_one_subset(nested_subset_label))
in_progress.remove(subset_label)
done[subset_label] = new
return new.subset

for k in original_subsets.keys():
expand_one_subset(k)
return done


@dataclass
class ParametersIR:
@@ -918,31 +981,99 @@ def to_file(self, filename: str):
"""
self.write_to_uri(filename)

def write_to_uri(self, uri: Union[ButlerURI, str]):
def write_to_uri(
self,
uri: Union[ButlerURI, str],
*,
expanded_tasks: Optional[Mapping[str, TaskIR]] = None,
):
"""Serialize this `PipelineIR` object into a yaml formatted string and
write the output to a file at the specified uri.
Parameters
----------
uri: `str` or `ButlerURI`
Location of document to write a `PipelineIR` object.
expanded_tasks : `Mapping` [ `str`, `TaskIR` ], optional
Mapping containing replacement `TaskIR` objects that capture the
fully-expanded configuration rather than a set of overrides, in
deterministic order. When this is not `None`, the ``instrument``
and ``parameters`` sections are not written and all other sections
are sorted (using the order of the given tasks) to maximize the
extent to which equivalent pipelines will be written identically.
"""
with ButlerURI(uri).open("w") as buffer:
yaml.dump(self.to_primitives(), buffer, sort_keys=False)
yaml.dump(self.to_primitives(expanded_tasks=expanded_tasks), buffer, sort_keys=False)

def to_primitives(self) -> Dict[str, Any]:
def to_primitives(self, expanded_tasks: Optional[Mapping[str, TaskIR]] = None) -> Dict[str, Any]:
"""Convert to a representation used in yaml serialization
Parameters
----------
expanded_tasks : `Mapping` [ `str`, `TaskIR` ], optional
Mapping containing replacement `TaskIR` objects that capture the
fully-expanded configuration rather than a set of overrides, in
deterministic order. When this is not `None`, the ``instrument``
and ``parameters`` sections are not written and all other sections
are sorted (using the order of the given tasks) to maximize the
extent to which equivalent pipelines will be written identically.
Returns
-------
primitives : `dict`
Dictionary that maps directly to the serialized YAML form.
"""
accumulate = {"description": self.description}
if self.instrument is not None:
accumulate['instrument'] = self.instrument
if self.parameters:
accumulate['parameters'] = self.parameters.to_primitives()
accumulate['tasks'] = {m: t.to_primitives() for m, t in self.tasks.items()}
sorter: Optional[Dict[str, int]] = None
if expanded_tasks is None:
tasks = self.tasks
labeled_subsets = self.labeled_subsets
contracts = self.contracts
# Instrument and parameters are only included in non-expanded form,
# because they'll have already been applied to configs in expanded
# form. It might be nice to include the instrument to check that
# the expanded pipeline is only used on data from that instrument,
# but we can't risk having the instrument's config overrides
# applied (again) on top of other configs that should supersede
# them.
if self.instrument is not None:
accumulate['instrument'] = self.instrument
if self.parameters:
accumulate['parameters'] = self.parameters.to_primitives()
else:
tasks = expanded_tasks
# Make a dict that maps task labels to their position in the
# (presumably ordered) mapping we were given.
sorter = {label: n for n, label in enumerate(expanded_tasks.keys())}
# Expand out subsets that reference other subsets, so that they
# all only reference task labels directly.
labeled_subsets = LabeledSubset.expand_nested(self.labeled_subsets)
# Sort the labeled subsets themselves by the position of their
# first task in the overall pipeline, followed by their own label
# to break ties. Note that we sort the tasks within them only
# later when we convert them to primitives, because at this point
# they hold `set` objects, not lists.
labeled_subsets = {
subset.label: subset
for subset in sorted(
labeled_subsets.values(),
key=lambda s: (min(sorter[t] for t in s.subset), s.label)
)
}
# Sort contracts by the string expression itself, just for as much
# determinism as we can manage; can't help it if someone rewrites
# an expression to something equivalent that changes the sort
# order.
contracts = list(self.contracts)
contracts.sort(key=lambda c: c.contract)

# Get primitives for sections common to expanded and non-expanded
# forms.
accumulate['tasks'] = {m: t.to_primitives() for m, t in tasks.items()}
if len(self.contracts) > 0:
accumulate['contracts'] = [c.to_primitives() for c in self.contracts]
if self.labeled_subsets:
accumulate['subsets'] = {k: v.to_primitives() for k, v in self.labeled_subsets.items()}
accumulate['contracts'] = [c.to_primitives() for c in contracts]
if labeled_subsets:
accumulate['subsets'] = {k: v.to_primitives(sorter) for k, v in labeled_subsets.items()}
return accumulate
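As a standalone illustration of that subset ordering (pure Python, not the library API; the labels are made up): subsets are sorted by the position of their earliest task in the expanded pipeline, with the subset label as a tiebreaker.

task_order = ["isr", "characterize", "calibrate", "coadd"]
sorter = {label: n for n, label in enumerate(task_order)}
subsets = {
    "coaddition": {"coadd"},
    "single_frame": {"isr", "characterize", "calibrate"},
}
ordered = sorted(subsets, key=lambda label: (min(sorter[t] for t in subsets[label]), label))
print(ordered)  # ['single_frame', 'coaddition']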

def __str__(self) -> str:
