Skip to content

Commit

Permalink
Merge pull request #256 from lsst/tickets/DM-39779
Browse files Browse the repository at this point in the history
DM-39779: text-based graph visualization of pipelines
  • Loading branch information
TallJimbo authored Sep 13, 2023
2 parents 5122111 + 01b3285 commit 67eca24
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 40 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-39779.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `pipeline-graph` and `task-graph` options for `pipetask build --show`, which provide text-art visualization of pipeline graphs.
5 changes: 5 additions & 0 deletions python/lsst/ctrl/mpexec/cli/cmd/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ def build(ctx: click.Context, **kwargs: Any) -> None:
"""
kwargs = _collectActions(ctx, **kwargs)
show = ShowInfo(kwargs.pop("show", []))
if kwargs.get("butler_config") is not None and {"pipeline-graph", "task-graph"}.isdisjoint(show.commands):
raise click.ClickException(
"--butler-config was provided but nothing uses it "
"(only --show pipeline-graph and --show task-graph do)."
)
script.build(**kwargs, show=show)
_unhandledShow(show, "build")

Expand Down
1 change: 1 addition & 0 deletions python/lsst/ctrl/mpexec/cli/opt/optionGroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def __init__(self) -> None:
ctrlMpExecOpts.save_pipeline_option(),
ctrlMpExecOpts.pipeline_dot_option(),
pipeBaseOpts.instrument_option(help=instrumentOptionHelp, metavar="instrument", multiple=True),
ctrlMpExecOpts.butler_config_option(required=False),
]


Expand Down
6 changes: 5 additions & 1 deletion python/lsst/ctrl/mpexec/cli/opt/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,11 @@
composition; ``graph`` to show information about quanta;
``workflow`` to show information about quanta and their
dependency; ``tasks`` to show task composition; ``uri`` to show
predicted dataset URIs of quanta."""
predicted dataset URIs of quanta; ``pipeline-graph`` for a
text-based visualization of the pipeline (tasks and dataset types);
``task-graph`` for a text-based visualization of just the tasks.
With -b, pipeline-graph and task-graph include additional information.
"""
),
metavar="ITEM|ITEM=VALUE",
multiple=True,
Expand Down
27 changes: 24 additions & 3 deletions python/lsst/ctrl/mpexec/cli/script/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,21 @@

from types import SimpleNamespace

from lsst.daf.butler import Butler

from ... import CmdLineFwk
from ..utils import _PipelineAction


def build( # type: ignore
order_pipeline, pipeline, pipeline_actions, pipeline_dot, save_pipeline, show, **kwargs
order_pipeline,
pipeline,
pipeline_actions,
pipeline_dot,
save_pipeline,
show,
butler_config=None,
**kwargs,
):
"""Implement the command line interface `pipetask build` subcommand.
Expand All @@ -59,7 +68,14 @@ def build( # type: ignore
Path location for storing resulting pipeline definition in YAML format.
show : `lsst.ctrl.mpexec.showInfo.ShowInfo`
Descriptions of what to dump to stdout.
kwargs : `dict` [`str`, `str`]
butler_config : `str`, `dict`, or `lsst.daf.butler.Config`, optional
If `str`, `butler_config` is the path location of the gen3
butler/registry config file. If `dict`, `butler_config` is key value
pairs used to init or update the `lsst.daf.butler.Config` instance. If
`Config`, it is the object used to configure a Butler.
Only used to resolve pipeline graphs for --show pipeline-graph and
--show task-graph.
**kwargs
Ignored; click commands may accept options for more than one script
function and pass all the option kwargs to each of the script functions
which ingore these unused kwargs.
Expand Down Expand Up @@ -93,6 +109,11 @@ def build( # type: ignore
# Will raise an exception if it fails to build the pipeline.
pipeline = f.makePipeline(args)

show.show_pipeline_info(pipeline)
if butler_config is not None:
butler = Butler(butler_config, writeable=False)
else:
butler = None

show.show_pipeline_info(pipeline, butler=butler)

return pipeline
75 changes: 47 additions & 28 deletions python/lsst/ctrl/mpexec/showInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@

import lsst.pex.config as pexConfig
import lsst.pex.config.history as pexConfigHistory
from lsst.daf.butler import DatasetRef, DatasetType, DatastoreRecordData, NamedKeyMapping
from lsst.daf.butler import Butler, DatasetRef, DatasetType, DatastoreRecordData, NamedKeyMapping
from lsst.pipe.base import Pipeline, QuantumGraph
from lsst.pipe.base.pipeline_graph import visualization

from . import util
from .cmdLineFwk import _ButlerFactory
Expand Down Expand Up @@ -101,7 +102,15 @@ class ShowInfo:
Raised if some show commands are not recognized.
"""

pipeline_commands = {"pipeline", "config", "history", "tasks", "dump-config"}
pipeline_commands = {
"pipeline",
"config",
"history",
"tasks",
"dump-config",
"pipeline-graph",
"task-graph",
}
graph_commands = {"graph", "workflow", "uri"}

def __init__(self, show: list[str], stream: Any = None) -> None:
Expand All @@ -128,34 +137,43 @@ def unhandled(self) -> frozenset[str]:
"""Return the commands that have not yet been processed."""
return frozenset(set(self.commands) - self.handled)

def show_pipeline_info(self, pipeline: Pipeline) -> None:
def show_pipeline_info(self, pipeline: Pipeline, butler: Butler | None) -> None:
"""Display useful information about the pipeline.
Parameters
----------
pipeline : `lsst.pipe.base.Pipeline`
The pipeline to use when reporting information.
"""
if butler is not None:
registry = butler.registry
else:
registry = None
for command in self.pipeline_commands:
if command not in self.commands:
continue
args = self.commands[command]

if command == "pipeline":
print(pipeline, file=self.stream)
elif command == "config":
for arg in args:
self._showConfig(pipeline, arg, False)
elif command == "dump-config":
for arg in args:
self._showConfig(pipeline, arg, True)
elif command == "history":
for arg in args:
self._showConfigHistory(pipeline, arg)
elif command == "tasks":
self._showTaskHierarchy(pipeline)
else:
raise RuntimeError(f"Unexpectedly tried to process command {command!r}.")
match command:
case "pipeline":
print(pipeline, file=self.stream)
case "config":
for arg in args:
self._showConfig(pipeline, arg, False)
case "dump-config":
for arg in args:
self._showConfig(pipeline, arg, True)
case "history":
for arg in args:
self._showConfigHistory(pipeline, arg)
case "tasks":
self._showTaskHierarchy(pipeline)
case "pipeline-graph":
visualization.show(pipeline.to_graph(registry), self.stream, dataset_types=True)
case "task-graph":
visualization.show(pipeline.to_graph(registry), self.stream, dataset_types=False)
case _:
raise RuntimeError(f"Unexpectedly tried to process command {command!r}.")
self.handled.add(command)

def show_graph_info(self, graph: QuantumGraph, args: SimpleNamespace | None = None) -> None:
Expand All @@ -172,16 +190,17 @@ def show_graph_info(self, graph: QuantumGraph, args: SimpleNamespace | None = No
for command in self.graph_commands:
if command not in self.commands:
continue
if command == "graph":
self._showGraph(graph)
elif command == "uri":
if args is None:
raise ValueError("The uri option requires additional command line arguments.")
self._showUri(graph, args)
elif command == "workflow":
self._showWorkflow(graph)
else:
raise RuntimeError(f"Unexpectedly tried to process command {command!r}.")
match command:
case "graph":
self._showGraph(graph)
case "uri":
if args is None:
raise ValueError("The uri option requires additional command line arguments.")
self._showUri(graph, args)
case "workflow":
self._showWorkflow(graph)
case _:
raise RuntimeError(f"Unexpectedly tried to process command {command!r}.")
self.handled.add(command)

def _showConfig(self, pipeline: Pipeline, showArgs: str, dumpFullConfig: bool) -> None:
Expand Down
41 changes: 33 additions & 8 deletions tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ def testShowPipeline(self):
["pipeline", "config", "history=task::addend", "tasks", "dump-config", "config=task::add*"],
stream=stream,
)
show.show_pipeline_info(pipeline)
show.show_pipeline_info(pipeline, None)
self.assertEqual(show.unhandled, frozenset({}))
stream.seek(0)
output = stream.read()
Expand All @@ -438,44 +438,69 @@ def testShowPipeline(self):
self.assertIn("class: lsst.pipe.base.tests.simpleQGraph.AddTask", output) # pipeline

show = ShowInfo(["pipeline", "uri"], stream=stream)
show.show_pipeline_info(pipeline)
show.show_pipeline_info(pipeline, None)
self.assertEqual(show.unhandled, frozenset({"uri"}))
self.assertEqual(show.handled, {"pipeline"})

stream = StringIO()
show = ShowInfo(["config=task::addend.missing"], stream=stream) # No match
show.show_pipeline_info(pipeline)
show.show_pipeline_info(pipeline, None)
stream.seek(0)
output = stream.read().strip()
self.assertEqual("### Configuration for task `task'", output)

stream = StringIO()
show = ShowInfo(["config=task::addEnd:NOIGNORECASE"], stream=stream) # No match
show.show_pipeline_info(pipeline)
show.show_pipeline_info(pipeline, None)
stream.seek(0)
output = stream.read().strip()
self.assertEqual("### Configuration for task `task'", output)

stream = StringIO()
show = ShowInfo(["pipeline-graph"], stream=stream) # No match
show.show_pipeline_info(pipeline, None)
stream.seek(0)
output = stream.read().strip()
self.assertEqual(
"\n".join(
[
"○ add_dataset_in",
"│",
"■ task",
"│",
"◍ add_dataset_out, add2_dataset_out",
]
),
output,
)

stream = StringIO()
show = ShowInfo(["task-graph"], stream=stream) # No match
show.show_pipeline_info(pipeline, None)
stream.seek(0)
output = stream.read().strip()
self.assertEqual("■ task", output)

stream = StringIO()
show = ShowInfo(["config=task::addEnd"], stream=stream) # Match but warns
show.show_pipeline_info(pipeline)
show.show_pipeline_info(pipeline, None)
stream.seek(0)
output = stream.read().strip()
self.assertIn("NOIGNORECASE", output)

show = ShowInfo(["dump-config=notask"])
with self.assertRaises(ValueError) as cm:
show.show_pipeline_info(pipeline)
show.show_pipeline_info(pipeline, None)
self.assertIn("Pipeline has no tasks named notask", str(cm.exception))

show = ShowInfo(["history"])
with self.assertRaises(ValueError) as cm:
show.show_pipeline_info(pipeline)
show.show_pipeline_info(pipeline, None)
self.assertIn("Please provide a value", str(cm.exception))

show = ShowInfo(["history=notask::param"])
with self.assertRaises(ValueError) as cm:
show.show_pipeline_info(pipeline)
show.show_pipeline_info(pipeline, None)
self.assertIn("Pipeline has no tasks named notask", str(cm.exception))

def test_execution_resources_parameters(self) -> None:
Expand Down

0 comments on commit 67eca24

Please sign in to comment.