diff --git a/python/lsst/pipe/base/pipeline.py b/python/lsst/pipe/base/pipeline.py index 9faab74bc..70eceb7b2 100644 --- a/python/lsst/pipe/base/pipeline.py +++ b/python/lsst/pipe/base/pipeline.py @@ -232,17 +232,19 @@ def from_uri(cls, uri: Union[str, ButlerURI]) -> Pipeline: ---------- uri: `str` or `ButlerURI` If a string is supplied this should be a URI path that points to a - pipeline defined in yaml format. This uri may also supply - additional labels to be used in subsetting the loaded Pipeline. - These labels are separated from the path by a \\#, and may be - specified as a comma separated list, or a range denoted as + pipeline defined in yaml format, either as a direct path to the yaml + file, or as a directory containing a "pipeline.yaml" file (the form + used by `write_to_uri` with ``expand=True``). This uri may also + supply additional labels to be used in subsetting the loaded + Pipeline. These labels are separated from the path by a \\#, and + may be specified as a comma separated list, or a range denoted as beginning..end. Beginning or end may be empty, in which case the - range will be a half open interval. Unlike python iteration - bounds, end bounds are *INCLUDED*. Note that range based selection - is not well defined for pipelines that are not linear in nature, - and correct behavior is not guaranteed, or may vary from run to - run. The same specifiers can be used with a ButlerURI object, by - being the sole contents in the fragments attribute. + range will be a half open interval. Unlike python iteration bounds, + end bounds are *INCLUDED*. Note that range based selection is not + well defined for pipelines that are not linear in nature, and + correct behavior is not guaranteed, or may vary from run to run. The + same specifiers can be used with a ButlerURI object, by being the + sole contents in the fragments attribute. Returns ------- @@ -259,6 +261,8 @@ def from_uri(cls, uri: Union[str, ButlerURI]) -> Pipeline: """ # Split up the uri and any labels that were supplied uri, label_specifier = cls._parse_file_specifier(uri) + if uri.dirLike: + uri = uri.join("pipeline.yaml") pipeline: Pipeline = cls.fromIR(pipelineIR.PipelineIR.from_uri(uri)) # If there are labels supplied, only keep those @@ -560,8 +564,43 @@ def _addConfigImpl(self, label: str, newConfig: pipelineIR.ConfigIR) -> None: def toFile(self, filename: str) -> None: self._pipelineIR.to_file(filename) - def write_to_uri(self, uri: Union[str, ButlerURI]) -> None: - self._pipelineIR.write_to_uri(uri) + def write_to_uri( + self, + uri: Union[str, ButlerURI], + expand: bool = False, + task_defs: Optional[Iterable[TaskDef]] = None, + ) -> None: + """Write the pipeline to a file or directory. + + Parameters + ---------- + uri : `str` or `ButlerURI` + URI to write to; may have any scheme with `ButlerURI` write + or no scheme for a local file/directory. Should have a ``.yaml`` + extension if ``expand=False`` and a trailing slash (indicating + a directory-like URI) if ``expand=True``. + expand : `bool`, optional + If `False`, write the pipeline to a single YAML file with + references to configuration files and other config overrides + unexpanded and unapplied, with tasks and subsets only minimally + validated (and not imported). 
If `True`, import all tasks, apply + all configuration overrides (including those supplied by an + instrument), resolve parameters, sort all sections + deterministically, and write the pipeline to a directory with + separate config files for each task as well as a single + ``pipeline.yaml`` file. + task_defs : `Iterable` [ `TaskDef` ], optional + Output of `toExpandedPipeline`; may be passed to avoid a second + call to that method internally. Ignored unless ``expand=True``. + """ + if expand: + if str(uri).endswith(".yaml"): + raise RuntimeError( + f"Expanded pipelines are written to directories, not YAML files like {uri}." + ) + self._write_expanded_dir(ButlerURI(uri, forceDirectory=True), task_defs=task_defs) + else: + self._pipelineIR.write_to_uri(uri) def toExpandedPipeline(self) -> Generator[TaskDef, None, None]: """Returns a generator of TaskDefs which can be used to create quantum @@ -653,6 +692,41 @@ def __eq__(self, other: object): return False return self._pipelineIR == other._pipelineIR + def _write_expanded_dir(self, uri: ButlerURI, task_defs: Optional[Iterable[TaskDef]] = None) -> None: + """Internal implementation of `write_to_uri` with ``expand=True`` and + a directory-like URI. + + Parameters + ---------- + uri : `str` or `ButlerURI` + URI to write to; may have any scheme with `ButlerURI` write or no + scheme for a local file/directory. Should have a trailing slash + (indicating a directory-like URI). + task_defs : `Iterable` [ `TaskDef` ], optional + Output of `toExpandedPipeline`; may be passed to avoid a second + call to that method internally. + """ + assert uri.dirLike, f"{uri} is not a directory-like URI." + # Expand the pipeline. This applies all config overrides, applies all + # parameters, checks contracts, and sorts tasks topologically with + # lexicographical (on label) tiebreaking. 
+ if task_defs is not None: + task_defs = list(task_defs) + else: + task_defs = list(self.toExpandedPipeline()) + uri.mkdir() + config_dir_uri = uri.join("config/") + config_dir_uri.mkdir() + expanded_tasks: Dict[str, pipelineIR.TaskIR] = {} + for task_def in task_defs: + task_ir = pipelineIR.TaskIR(label=task_def.label, klass=task_def.taskName, config=[]) + config_uri = config_dir_uri.join(f"{task_def.label}.py") + with config_uri.open("w") as buffer: + task_def.config.saveToStream(buffer) + task_ir.config.append(pipelineIR.ConfigIR(file=[f"config/{task_def.label}.py"])) + expanded_tasks[task_def.label] = task_ir + self._pipelineIR.write_to_uri(uri.join("pipeline.yaml"), expanded_tasks=expanded_tasks) + @dataclass(frozen=True) class TaskDatasetTypes: diff --git a/python/lsst/pipe/base/pipelineIR.py b/python/lsst/pipe/base/pipelineIR.py index 7f775376d..32af85bdb 100644 --- a/python/lsst/pipe/base/pipelineIR.py +++ b/python/lsst/pipe/base/pipelineIR.py @@ -35,7 +35,7 @@ from collections.abc import Iterable as abcIterable from dataclasses import dataclass, field from deprecated.sphinx import deprecated -from typing import Any, List, Set, Union, Generator, MutableMapping, Optional, Dict, Type +from typing import Any, Mapping, List, Set, Union, Generator, MutableMapping, Optional, Dict, Type import copy import re @@ -192,14 +192,77 @@ def from_primitives(label: str, value: Union[List[str], dict]) -> LabeledSubset: "associated with a string") return LabeledSubset(label, set(subset), description) - def to_primitives(self) -> Dict[str, Union[List[str], str]]: + def to_primitives(self, sorter: Optional[Mapping[str, Any]] = None) -> Dict[str, Union[List[str], str]]: """Convert to a representation used in yaml serialization + + Parameters + ---------- + sorter : `Mapping` [ `str`, `object` ] + Mapping from task or subset label to a comparable object that + should be used to sort those labels. """ - accumulate: Dict[str, Union[List[str], str]] = {"subset": list(self.subset)} + contents = list(self.subset) + if sorter is not None: + contents.sort(key=lambda label: sorter[label]) + accumulate: Dict[str, Union[List[str], str]] = {"subset": contents} if self.description is not None: accumulate["description"] = self.description return accumulate + @staticmethod + def expand_nested(original_subsets: Mapping[str, LabeledSubset]) -> Dict[str, LabeledSubset]: + """Recursively expand subsets that contain the labels of other subsets. + + Parameters + ---------- + original_subsets : `Mapping` [ `str`, `LabeledSubset` ] + Mapping from string label to a labeled subset definition; keys must + match the `label` attribute of the corresponding value. + + Returns + ------- + new_subsets : `Dict` [ `str`, `LabeledSubset` ] + Mapping of new labeled subset definitions. Guaranteed to contain + new instances (even when an old instance did not contain any subset + labels) with only task labels. + + Raises + ------ + RuntimeError + Raised if a reference cycle is detected. + """ + done: Dict[str, LabeledSubset] = {} + in_progress: Set[str] = set() + + def expand_one_subset(subset_label: str) -> Set[str]: + """Expand the subset with the given label if it has not already + been done, returning its recursively-expanded (i.e. task label + only) contents. + + This updates `done` in-place, and uses `in_progress` to detect + cycles. 
+ """ + if (already_expanded := done.get(subset_label)) is not None: + return already_expanded.subset + if subset_label in in_progress: + raise RuntimeError(f"Cycle detected in subset definitions involving {subset_label}.") + in_progress.add(subset_label) + original = original_subsets[subset_label] + new = LabeledSubset( + label=subset_label, + subset=set(original.subset - original_subsets.keys()), + description=original.description, + ) + for nested_subset_label in original.subset & original_subsets.keys(): + new.subset.update(expand_one_subset(original_subsets[nested_subset_label])) + in_progress.remove(subset_label) + done[subset_label] = new + return new.subset + + for k in original_subsets.keys(): + expand_one_subset(k) + return done + @dataclass class ParametersIR: @@ -918,7 +981,12 @@ def to_file(self, filename: str): """ self.write_to_uri(filename) - def write_to_uri(self, uri: Union[ButlerURI, str]): + def write_to_uri( + self, + uri: Union[ButlerURI, str], + *, + expanded_tasks: Optional[Mapping[str, TaskIR]] = None, + ): """Serialize this `PipelineIR` object into a yaml formatted string and write the output to a file at the specified uri. @@ -926,23 +994,86 @@ def write_to_uri(self, uri: Union[ButlerURI, str]): ---------- uri: `str` or `ButlerURI` Location of document to write a `PipelineIR` object. + expanded_tasks : `Mapping` [ `str`, `TaskIR` ], optional + Mapping containing replacement `TaskIR` objects that capture the + fully-expanded configuration rather than a set of overrides, in + deterministic order. When this is not `None`, the ``instrument`` + and ``parameters`` sections are not written and all other sections + are sorted (using the order of the given tasks) to maximize the + extent to which equivalent pipelines will be written identically. """ with ButlerURI(uri).open("w") as buffer: - yaml.dump(self.to_primitives(), buffer, sort_keys=False) + yaml.dump(self.to_primitives(expanded_tasks=expanded_tasks), buffer, sort_keys=False) - def to_primitives(self) -> Dict[str, Any]: + def to_primitives(self, expanded_tasks: Optional[Mapping[str, TaskIR]] = None) -> Dict[str, Any]: """Convert to a representation used in yaml serialization + + Parameters + ---------- + expanded_tasks : `Mapping` [ `str`, `TaskIR` ], optional + Mapping containing replacement `TaskIR` objects that capture the + fully-expanded configuration rather than a set of overrides, in + deterministic order. When this is not `None`, the ``instrument`` + and ``parameters`` sections are not written and all other sections + are sorted (using the order of the given tasks) to maximize the + extent to which equivalent pipelines will be written identically. + + Returns + ------- + primitives : `dict` + Dictionary that maps directly to the serialized YAML form. """ accumulate = {"description": self.description} - if self.instrument is not None: - accumulate['instrument'] = self.instrument - if self.parameters: - accumulate['parameters'] = self.parameters.to_primitives() - accumulate['tasks'] = {m: t.to_primitives() for m, t in self.tasks.items()} + sorter: Optional[Dict[str, int]] = None + if expanded_tasks is None: + tasks = self.tasks + labeled_subsets = self.labeled_subsets + contracts = self.contracts + # Instrument and parameters are only included in non-expanded form, + # because they'll have already been applied to configs in expanded + # form. 
It might be nice to include the instrument to check that + # the expanded pipeline is only used on data from that instrument, + # but we can't risk having the instrument's config overrides + # applied (again) on top of other configs that should supersede + # them. + if self.instrument is not None: + accumulate['instrument'] = self.instrument + if self.parameters: + accumulate['parameters'] = self.parameters.to_primitives() + else: + tasks = expanded_tasks + # Make a dict that maps task labels to their position in the + # (presumably ordered) mapping we were given. + sorter = {label: n for n, label in enumerate(expanded_tasks.keys())} + # Expand out subsets that reference other subsets, so that they + # all only reference task labels directly. + labeled_subsets = LabeledSubset.expand_nested(self.labeled_subsets) + # Sort the labeled subsets themselves by the position of their + # first task in the overall pipeline, followed by their own label + # to break ties. Note that we sort the tasks within them only + # later when we convert them to primitives, because at this point + # they hold `set` objects, not lists. + labeled_subsets = { + subset.label: subset + for subset in sorted( + labeled_subsets.values(), + key=lambda s: (min(sorter[t] for t in s.subset), s.label) + ) + } + # Sort contracts by the string expression itself, just for as much + # determinism as we can manage; can't help it if someone rewrites + # an expression to something equivalent that changes the sort + # order. + contracts = list(self.contracts) + contracts.sort(key=lambda c: c.contract) + + # Get primitives for sections common to expanded and non-expanded + # forms. + accumulate['tasks'] = {m: t.to_primitives() for m, t in tasks.items()} if len(self.contracts) > 0: - accumulate['contracts'] = [c.to_primitives() for c in self.contracts] - if self.labeled_subsets: - accumulate['subsets'] = {k: v.to_primitives() for k, v in self.labeled_subsets.items()} + accumulate['contracts'] = [c.to_primitives() for c in contracts] + if labeled_subsets: + accumulate['subsets'] = {k: v.to_primitives(sorter) for k, v in labeled_subsets.items()} return accumulate def __str__(self) -> str: diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 8b8911992..8d3a0e770 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -25,8 +25,9 @@ import os import textwrap import unittest +import yaml -from lsst.pipe.base import (Pipeline, TaskDef, PipelineDatasetTypes) +from lsst.pipe.base import (LabelSpecifier, Pipeline, TaskDef, PipelineDatasetTypes) from lsst.pipe.base.tests.simpleQGraph import ( AddTask, makeSimplePipeline, @@ -154,6 +155,82 @@ def test_relative_config_file_overrides(self): self.assertEqual(p2_loaded["task1"].config.addend, 3) self.assertEqual(p2_loaded["task2"].config.addend, 1) + def test_expanded_io(self): + """Test writing and reading a pipeline with expand=True. + """ + original_pipeline_str = textwrap.dedent(""" + description: Test Pipeline + parameters: + testValue: 5 + tasks: + # Note that topological ordering is addB, then a tie between + # addA and addC. 
+ addC: + class: lsst.pipe.base.tests.simpleQGraph.AddTask + config: + connections.in_tmpl: _1 + connections.out_tmpl: _2c + addB: + class: lsst.pipe.base.tests.simpleQGraph.AddTask + config: + addend: parameters.testValue + connections.in_tmpl: _0 + connections.out_tmpl: _1 + addA: + class: lsst.pipe.base.tests.simpleQGraph.AddTask + config: + addend: 8 + connections.in_tmpl: _1 + connections.out_tmpl: _2a + subsets: + things: [addC, addB] + contracts: + addB.connections.out_tmpl == addA.connections.in_tmpl + """) + original_pipeline = Pipeline.fromString(original_pipeline_str) + + def check(pipeline): + """Check that a pipeline has the test content defined above. + + This does not include checks for contracts (no public API for + that) or the presence or absence of parameters or instruments, + because an expanded pipeline should not have those anymore. + """ + expanded = list(pipeline) + self.assertEqual([t.label for t in expanded], ["addB", "addA", "addC"]) + self.assertEqual( + [t.taskName for t in expanded], + ["lsst.pipe.base.tests.simpleQGraph.AddTask"] * 3, + ) + self.assertEqual([t.config.addend for t in expanded], [5, 8, 3]) + subset = pipeline.subsetFromLabels(LabelSpecifier(labels={"things"})) + self.assertEqual(len(subset), 2) + + check(original_pipeline) + + with lsst.utils.tests.temporaryDirectory() as root: + with self.assertRaises(RuntimeError): + original_pipeline.write_to_uri(os.path.join(root, "foo.yaml"), expand=True) + # Expansion writes to directories, not yaml files. + # But if we don't have a .yaml extension, we assume the caller + # wants a directory, even without a trailing slash. + original_pipeline.write_to_uri(os.path.join(root, "test_pipeline"), expand=True) + self.assertTrue(os.path.isdir(os.path.join(root, "test_pipeline"))) + self.assertTrue(os.path.isdir(os.path.join(root, "test_pipeline/config"))) + self.assertTrue(os.path.isfile(os.path.join(root, "test_pipeline/pipeline.yaml"))) + self.assertTrue(os.path.isfile(os.path.join(root, "test_pipeline/config/addA.py"))) + self.assertTrue(os.path.isfile(os.path.join(root, "test_pipeline/config/addB.py"))) + self.assertTrue(os.path.isfile(os.path.join(root, "test_pipeline/config/addC.py"))) + with open(os.path.join(root, "test_pipeline/pipeline.yaml"), "r") as buffer: + primitives = yaml.load(buffer) + self.assertEqual(primitives.keys(), {"description", "tasks", "subsets", "contracts"}) + self.assertEqual(len(primitives["contracts"]), 1) + self.assertEqual(primitives["subsets"], {"things": {"subset": ["addB", "addC"]}}) + read_pipeline_1 = Pipeline.from_uri(os.path.join(root, "test_pipeline")) + read_pipeline_2 = Pipeline.from_uri(os.path.join(root, "test_pipeline/pipeline.yaml")) + check(read_pipeline_1) + check(read_pipeline_2) + class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase): pass
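Reading back a pipeline written in the expanded form is a matter of pointing `from_uri` at either the directory or the `pipeline.yaml` inside it. A minimal sketch, with hypothetical paths and task labels:

```python
from lsst.pipe.base import Pipeline

# Both forms load the same pipeline after this change; the trailing slash (or
# an existing local directory) marks the URI as directory-like.
p1 = Pipeline.from_uri("/data/pipelines/DRP/")
p2 = Pipeline.from_uri("/data/pipelines/DRP/pipeline.yaml")

# Fragment-based label subsetting still applies to either form; "isr" and
# "calibrate" are illustrative labels, and the range bounds are inclusive.
p3 = Pipeline.from_uri("/data/pipelines/DRP/#isr..calibrate")
```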
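Writing the two forms from the same `Pipeline` object, as a sketch; the pipeline string and output paths are hypothetical, but any importable `PipelineTask` should behave the same way:

```python
from lsst.pipe.base import Pipeline

pipeline = Pipeline.fromString("""\
description: Example
tasks:
  add:
    class: lsst.pipe.base.tests.simpleQGraph.AddTask
    config:
      addend: 10
""")

# Default (expand=False): a single YAML file; config overrides and file
# references are left unexpanded and tasks are not imported.
pipeline.write_to_uri("/tmp/example.yaml")

# expand=True: tasks are imported, instrument/parameter overrides are applied,
# and a directory is written instead:
#   /tmp/example/pipeline.yaml   -- tasks reference config/<label>.py
#   /tmp/example/config/add.py   -- full config dump via saveToStream
# A path without a .yaml extension is treated as a directory even without a
# trailing slash; a .yaml path with expand=True raises RuntimeError.
pipeline.write_to_uri("/tmp/example/", expand=True)
```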
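`toExpandedPipeline` (and therefore the expanded write) orders tasks topologically with a lexicographical tie-break on label, which is why the test expects `addB, addA, addC`. The actual implementation works from dataset connections; the sketch below only illustrates the ordering rule on an explicit, made-up dependency mapping:

```python
import heapq
from typing import Dict, List, Set

def toposort_labels(dependencies: Dict[str, Set[str]]) -> List[str]:
    """Topologically sort labels, breaking ties lexicographically.

    ``dependencies[label]`` holds the labels that must come before ``label``.
    This is a sketch of the ordering contract, not the pipe_base code.
    """
    remaining = {label: set(deps) for label, deps in dependencies.items()}
    ready = [label for label, deps in remaining.items() if not deps]
    heapq.heapify(ready)  # the heap always yields the smallest ready label
    order: List[str] = []
    while ready:
        label = heapq.heappop(ready)
        order.append(label)
        for other, deps in remaining.items():
            if label in deps:
                deps.remove(label)
                if not deps:
                    heapq.heappush(ready, other)
    if len(order) != len(remaining):
        raise RuntimeError("Cycle detected among tasks.")
    return order

# addB feeds both addA and addC (via the _1 connection in the test pipeline),
# so addB comes first and the addA/addC tie is broken by label.
assert toposort_labels({"addB": set(), "addA": {"addB"}, "addC": {"addB"}}) == ["addB", "addA", "addC"]
```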
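`LabeledSubset.expand_nested` flattens subsets that name other subsets into task labels only, raising on reference cycles. The same idea on plain dicts of sets, as a standalone sketch (illustrative names, not the pipe_base API):

```python
from typing import Dict, Set

def expand_subsets(subsets: Dict[str, Set[str]]) -> Dict[str, Set[str]]:
    """Expand subsets that reference other subsets into task labels only."""
    done: Dict[str, Set[str]] = {}
    in_progress: Set[str] = set()

    def expand_one(label: str) -> Set[str]:
        if label in done:
            return done[label]
        if label in in_progress:
            raise RuntimeError(f"Cycle detected in subset definitions involving {label}.")
        in_progress.add(label)
        contents = subsets[label]
        expanded = contents - subsets.keys()          # task labels pass straight through
        for nested in contents & subsets.keys():      # subset labels recurse
            expanded |= expand_one(nested)
        in_progress.remove(label)
        done[label] = expanded
        return expanded

    for label in subsets:
        expand_one(label)
    return done

# "nightly" references the "step1" subset, so it expands to step1's tasks plus
# its own direct task labels.
assert expand_subsets({"step1": {"isr"}, "nightly": {"step1", "calibrate"}}) == {
    "step1": {"isr"},
    "nightly": {"isr", "calibrate"},
}
```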
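The deterministic ordering used by `to_primitives` when `expanded_tasks` is given sorts whole subsets by the pipeline position of their earliest task, breaking ties by subset label, and sorts the labels within each subset by task position. A small sketch of that sort key, assuming an already-expanded (topologically ordered) label order and made-up subset names:

```python
# Position of each task label in the expanded pipeline (label -> index).
sorter = {"addB": 0, "addA": 1, "addC": 2}

subsets = {
    "late": {"addC"},
    "things": {"addC", "addB"},
    "also_things": {"addB", "addA"},
}

# Subsets ordered by (position of earliest member task, subset label); task
# labels within each subset ordered by their pipeline position.
ordered = {
    label: sorted(members, key=sorter.__getitem__)
    for label, members in sorted(
        subsets.items(), key=lambda kv: (min(sorter[t] for t in kv[1]), kv[0])
    )
}
assert list(ordered) == ["also_things", "things", "late"]
assert ordered["things"] == ["addB", "addC"]
```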