Skip to content

Commit

Permalink
Merge pull request #288 from lsst/tickets/DM-41542
Browse files Browse the repository at this point in the history
DM-41542:  Add --summary option to pipetask qgraph.
  • Loading branch information
MichelleGower authored Apr 17, 2024
2 parents d04c916 + c9c5ed8 commit 9f21ca1
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 24 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ jobs:
run: |
pytest -r a -v -n 3 --open-files --cov=lsst.ctrl.mpexec --cov=tests --cov-report=xml --cov-report=term --cov-branch
- name: Upload coverage to codecov
uses: codecov/codecov-action@v2
uses: codecov/codecov-action@v4
with:
file: ./coverage.xml
files: ./coverage.xml
token: ${{ secrets.CODECOV_TOKEN }}

pypi:
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions doc/changes/DM-41542.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added --summary option to ``pipetask qgraph``.
12 changes: 11 additions & 1 deletion python/lsst/ctrl/mpexec/cli/cmd/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import click
import coverage
import lsst.pipe.base.cli.opt as pipeBaseOpts
from lsst.ctrl.mpexec import Report
from lsst.ctrl.mpexec.showInfo import ShowInfo
from lsst.daf.butler.cli.opt import (
config_file_option,
Expand Down Expand Up @@ -177,6 +178,7 @@ def coverage_context(kwargs: dict[str, Any]) -> Iterator[None]:
def qgraph(ctx: click.Context, **kwargs: Any) -> None:
"""Build and optionally save quantum graph."""
kwargs = _collectActions(ctx, **kwargs)
summary = kwargs.pop("summary", None)
with coverage_context(kwargs):
show = ShowInfo(kwargs.pop("show", []))
pipeline = script.build(**kwargs, show=show)
Expand All @@ -186,8 +188,16 @@ def qgraph(ctx: click.Context, **kwargs: Any) -> None:
file=sys.stderr,
)
return
if script.qgraph(pipelineObj=pipeline, **kwargs, show=show) is None:
if (qgraph := script.qgraph(pipelineObj=pipeline, **kwargs, show=show)) is None:
raise click.ClickException("QuantumGraph was empty; CRITICAL logs above should provide details.")
# QuantumGraph-only summary call here since script.qgraph also called
# by run methods.
if summary:
report = Report(qgraphSummary=qgraph.getSummary())
with open(summary, "w") as out:
# Do not save fields that are not set.
out.write(report.model_dump_json(exclude_none=True, indent=2))

_unhandledShow(show, "qgraph")


Expand Down
1 change: 1 addition & 0 deletions python/lsst/ctrl/mpexec/cli/opt/optionGroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def __init__(self) -> None:
ctrlMpExecOpts.save_qgraph_option(),
ctrlMpExecOpts.save_single_quanta_option(),
ctrlMpExecOpts.qgraph_dot_option(),
ctrlMpExecOpts.summary_option(),
ctrlMpExecOpts.save_execution_butler_option(),
ctrlMpExecOpts.clobber_execution_butler_option(),
ctrlMpExecOpts.target_datastore_root_option(),
Expand Down
25 changes: 9 additions & 16 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import atexit
import contextlib
import copy
import datetime
import getpass
import logging
import shutil
from collections.abc import Mapping, Sequence
Expand Down Expand Up @@ -81,6 +79,7 @@
from .executionGraphFixup import ExecutionGraphFixup
from .mpGraphExecutor import MPGraphExecutor
from .preExecInit import PreExecInit, PreExecInitLimited
from .reports import Report
from .singleQuantumExecutor import SingleQuantumExecutor

# ----------------------------------
Expand Down Expand Up @@ -653,8 +652,6 @@ def makeGraph(self, pipeline: Pipeline, args: SimpleNamespace) -> QuantumGraph |
"skip_existing_in": args.skip_existing_in,
"skip_existing": args.skip_existing,
"data_query": args.data_query,
"user": getpass.getuser(),
"time": f"{datetime.datetime.now()}",
}
assert run is not None, "Butler output run collection must be defined"
qgraph = graph_builder.build(metadata, attach_datastore_records=args.qgraph_datastore_records)
Expand Down Expand Up @@ -841,25 +838,20 @@ def runPipeline(
# Do not save fields that are not set.
out.write(report.model_dump_json(exclude_none=True, indent=2))

def _generateTaskTable(self, qgraph: QuantumGraph) -> Table:
def _generateTaskTable(self) -> Table:
"""Generate astropy table listing the number of quanta per task for a
given quantum graph.
Parameters
----------
qgraph : `lsst.pipe.base.graph.graph.QuantumGraph`
A QuantumGraph object.
Returns
-------
qg_task_table : `astropy.table.table.Table`
An astropy table containing columns: Quanta and Tasks.
"""
qg_quanta, qg_tasks = [], []
for task_def in qgraph.iterTaskGraph():
num_qnodes = qgraph.getNumberOfQuantaForTask(task_def)
qg_quanta.append(num_qnodes)
qg_tasks.append(task_def.label)
for task_label, task_info in self.report.qgraphSummary.qgraphTaskSummaries.items():
qg_tasks.append(task_label)
qg_quanta.append(task_info.numQuanta)

qg_task_table = Table(dict(Quanta=qg_quanta, Tasks=qg_tasks))
return qg_task_table

Expand All @@ -880,8 +872,9 @@ def _summarize_qgraph(self, qgraph: QuantumGraph) -> int:
if n_quanta == 0:
_LOG.info("QuantumGraph contains no quanta.")
else:
self.report = Report(qgraphSummary=qgraph.getSummary())
if _LOG.isEnabledFor(logging.INFO):
qg_task_table = self._generateTaskTable(qgraph)
qg_task_table = self._generateTaskTable()
qg_task_table_formatted = "\n".join(qg_task_table.pformat_all())
quanta_str = "quantum" if n_quanta == 1 else "quanta"
n_tasks = len(qgraph.taskGraph)
Expand Down Expand Up @@ -1026,4 +1019,4 @@ def runGraphQBB(self, task_factory: TaskFactory, args: SimpleNamespace) -> None:
if report:
with open(args.summary, "w") as out:
# Do not save fields that are not set.
out.write(report.json(exclude_none=True, indent=2))
out.write(report.model_dump_json(exclude_none=True, indent=2))
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/mpGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ def __init__(
def execute(self, graph: QuantumGraph) -> None:
# Docstring inherited from QuantumGraphExecutor.execute
graph = self._fixupQuanta(graph)
self.report = Report()
self.report = Report(qgraphSummary=graph.getSummary())
try:
if self.numProc > 1:
self._executeQuantaMP(graph, self.report)
Expand Down
4 changes: 4 additions & 0 deletions python/lsst/ctrl/mpexec/reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import pydantic
from lsst.daf.butler import DataCoordinate, DataId, DataIdValue
from lsst.pipe.base import QgraphSummary
from lsst.utils.introspection import get_full_type_name


Expand Down Expand Up @@ -195,6 +196,9 @@ def from_exit_code(
class Report(pydantic.BaseModel):
"""Execution report for the whole job with one or few quanta."""

qgraphSummary: QgraphSummary
"""Summary report about QuantumGraph."""

status: ExecutionStatus = ExecutionStatus.SUCCESS
"""Job status."""

Expand Down
82 changes: 82 additions & 0 deletions tests/test_cliCmdQgraph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# This file is part of ctrl_mpexec.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Unit tests for ctrl_mpexec CLI qgraph subcommand."""

import os
import unittest

from lsst.ctrl.mpexec import Report
from lsst.ctrl.mpexec.cli.pipetask import cli as pipetask_cli
from lsst.daf.butler.cli.utils import LogCliRunner, clickResultMsg
from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir
from lsst.pipe.base.tests.simpleQGraph import makeSimpleQGraph

TESTDIR = os.path.abspath(os.path.dirname(__file__))


class QgraphTest(unittest.TestCase):
"""Test executing "pipetask qgraph" command."""

def setUp(self) -> None:
self.runner = LogCliRunner()
self.root = makeTestTempDir(TESTDIR)

def tearDown(self) -> None:
removeTestTempDir(self.root)

def test_qgraph_summary(self):
"""Test for making a summary of a QuantumGraph."""
metadata = {"output_run": "run"}
butler, qgraph = makeSimpleQGraph(
run="run",
root=self.root,
metadata=metadata,
)

graph_uri = os.path.join(self.root, "graph.qgraph")
qgraph.saveUri(graph_uri)

test_filename = os.path.join(self.root, "summary.json")

result = self.runner.invoke(
pipetask_cli,
["qgraph", "--butler-config", self.root, "--qgraph", graph_uri, "--summary", test_filename],
input="no",
)
# Check that we can read from the command line
self.assertEqual(result.exit_code, 0, clickResultMsg(result))

# Check that we can open and read the file produced by make_reports
with open(test_filename) as f:
summary = Report.model_validate_json(f.read())
self.assertEqual(summary.qgraphSummary.outputRun, "run")
self.assertEqual(len(summary.qgraphSummary.qgraphTaskSummaries), 5)


if __name__ == "__main__":
unittest.main()
26 changes: 25 additions & 1 deletion tests/test_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
)
from lsst.ctrl.mpexec.execFixupDataId import ExecFixupDataId
from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir
from lsst.pipe.base import NodeId
from lsst.pipe.base import NodeId, QgraphSummary, QgraphTaskSummary
from lsst.pipe.base.tests.simpleQGraph import AddTaskFactoryMock, makeSimpleQGraph

logging.basicConfig(level=logging.DEBUG)
Expand Down Expand Up @@ -209,6 +209,30 @@ def determineInputsToQuantumNode(self, node):
result.add(otherNode)
return result

def getSummary(self):
summary = QgraphSummary(
graphID="1712445133.605479-3902002",
cmdLine="mock_pipetask -a 1 -b 2 -c 3 4 5 6",
pipeBaseVersion="1.1.1",
creationUTC="",
inputCollection=["mock_input"],
outputCollection="mock_output",
outputRun="mock_run",
)
for q in self:
qts = summary.qgraphTaskSummaries.setdefault(
q.taskDef.label, QgraphTaskSummary(taskLabel=q.taskDef.label)
)
qts.numQuanta += 1

for k in ["in1", "in2", "in3"]:
qts.numInputs[k] += 1

for k in ["out1", "out2", "out3"]:
qts.numOutputs[k] += 1

return summary


class TaskMockMP:
"""Simple mock class for task supporting multiprocessing."""
Expand Down
11 changes: 8 additions & 3 deletions tests/test_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import unittest

from lsst.ctrl.mpexec import ExecutionStatus, QuantumReport, Report
from lsst.pipe.base import QgraphSummary


class ReportsTestCase(unittest.TestCase):
Expand Down Expand Up @@ -74,7 +75,7 @@ def test_quantumReport(self):

def test_report(self):
"""Test for Report class."""
report = Report()
report = Report(qgraphSummary=QgraphSummary(graphID="uuid"))
self.assertEqual(report.status, ExecutionStatus.SUCCESS)
self.assertIsNotNone(report.cmdLine)
self.assertIsNone(report.exitCode)
Expand All @@ -86,7 +87,9 @@ def test_report(self):
qr = QuantumReport.from_exception(
exception=RuntimeError("runtime error"), dataId=dataId, taskLabel=taskLabel
)
report = Report(status=ExecutionStatus.FAILURE, exitCode=-1)
report = Report(
status=ExecutionStatus.FAILURE, exitCode=-1, qgraphSummary=QgraphSummary(graphID="uuid")
)
report.set_exception(RuntimeError("runtime error"))
report.quantaReports.append(qr)
self.assertEqual(report.status, ExecutionStatus.FAILURE)
Expand All @@ -103,7 +106,9 @@ def test_json(self):
qr = QuantumReport.from_exception(
exception=RuntimeError("runtime error"), dataId=dataId, taskLabel=taskLabel
)
report = Report(status=ExecutionStatus.FAILURE, exitCode=-1)
report = Report(
status=ExecutionStatus.FAILURE, exitCode=-1, qgraphSummary=QgraphSummary(graphID="uuid")
)
report.set_exception(RuntimeError("runtime error"))
report.quantaReports.append(qr)
json = report.model_dump_json(exclude_none=True, indent=2)
Expand Down

0 comments on commit 9f21ca1

Please sign in to comment.