diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 55a31eb7..c9dc4747 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -61,9 +61,10 @@ jobs: run: | pytest -r a -v -n 3 --open-files --cov=lsst.ctrl.mpexec --cov=tests --cov-report=xml --cov-report=term --cov-branch - name: Upload coverage to codecov - uses: codecov/codecov-action@v2 + uses: codecov/codecov-action@v4 with: - file: ./coverage.xml + files: ./coverage.xml + token: ${{ secrets.CODECOV_TOKEN }} pypi: runs-on: ubuntu-latest diff --git a/doc/changes/DM-41542.feature.rst b/doc/changes/DM-41542.feature.rst new file mode 100644 index 00000000..d16cfce5 --- /dev/null +++ b/doc/changes/DM-41542.feature.rst @@ -0,0 +1 @@ +Added --summary option to ``pipetask qgraph``. diff --git a/python/lsst/ctrl/mpexec/cli/cmd/commands.py b/python/lsst/ctrl/mpexec/cli/cmd/commands.py index 39392807..70204f2c 100644 --- a/python/lsst/ctrl/mpexec/cli/cmd/commands.py +++ b/python/lsst/ctrl/mpexec/cli/cmd/commands.py @@ -35,6 +35,7 @@ import click import coverage import lsst.pipe.base.cli.opt as pipeBaseOpts +from lsst.ctrl.mpexec import Report from lsst.ctrl.mpexec.showInfo import ShowInfo from lsst.daf.butler.cli.opt import ( config_file_option, @@ -177,6 +178,7 @@ def coverage_context(kwargs: dict[str, Any]) -> Iterator[None]: def qgraph(ctx: click.Context, **kwargs: Any) -> None: """Build and optionally save quantum graph.""" kwargs = _collectActions(ctx, **kwargs) + summary = kwargs.pop("summary", None) with coverage_context(kwargs): show = ShowInfo(kwargs.pop("show", [])) pipeline = script.build(**kwargs, show=show) @@ -186,8 +188,16 @@ def qgraph(ctx: click.Context, **kwargs: Any) -> None: file=sys.stderr, ) return - if script.qgraph(pipelineObj=pipeline, **kwargs, show=show) is None: + if (qgraph := script.qgraph(pipelineObj=pipeline, **kwargs, show=show)) is None: raise click.ClickException("QuantumGraph was empty; CRITICAL logs above should provide details.") + # 
QuantumGraph-only summary call here since script.qgraph also called + # by run methods. + if summary: + report = Report(qgraphSummary=qgraph.getSummary()) + with open(summary, "w") as out: + # Do not save fields that are not set. + out.write(report.model_dump_json(exclude_none=True, indent=2)) + _unhandledShow(show, "qgraph") diff --git a/python/lsst/ctrl/mpexec/cli/opt/optionGroups.py b/python/lsst/ctrl/mpexec/cli/opt/optionGroups.py index e2255d57..d68cc3f9 100644 --- a/python/lsst/ctrl/mpexec/cli/opt/optionGroups.py +++ b/python/lsst/ctrl/mpexec/cli/opt/optionGroups.py @@ -109,6 +109,7 @@ def __init__(self) -> None: ctrlMpExecOpts.save_qgraph_option(), ctrlMpExecOpts.save_single_quanta_option(), ctrlMpExecOpts.qgraph_dot_option(), + ctrlMpExecOpts.summary_option(), ctrlMpExecOpts.save_execution_butler_option(), ctrlMpExecOpts.clobber_execution_butler_option(), ctrlMpExecOpts.target_datastore_root_option(), diff --git a/python/lsst/ctrl/mpexec/cmdLineFwk.py b/python/lsst/ctrl/mpexec/cmdLineFwk.py index 0af27d24..ff742ec7 100644 --- a/python/lsst/ctrl/mpexec/cmdLineFwk.py +++ b/python/lsst/ctrl/mpexec/cmdLineFwk.py @@ -35,8 +35,6 @@ import atexit import contextlib import copy -import datetime -import getpass import logging import shutil from collections.abc import Mapping, Sequence @@ -81,6 +79,7 @@ from .executionGraphFixup import ExecutionGraphFixup from .mpGraphExecutor import MPGraphExecutor from .preExecInit import PreExecInit, PreExecInitLimited +from .reports import Report from .singleQuantumExecutor import SingleQuantumExecutor # ---------------------------------- @@ -653,8 +652,6 @@ def makeGraph(self, pipeline: Pipeline, args: SimpleNamespace) -> QuantumGraph | "skip_existing_in": args.skip_existing_in, "skip_existing": args.skip_existing, "data_query": args.data_query, - "user": getpass.getuser(), - "time": f"{datetime.datetime.now()}", } assert run is not None, "Butler output run collection must be defined" qgraph = graph_builder.build(metadata, 
attach_datastore_records=args.qgraph_datastore_records) @@ -841,25 +838,20 @@ def runPipeline( # Do not save fields that are not set. out.write(report.model_dump_json(exclude_none=True, indent=2)) - def _generateTaskTable(self, qgraph: QuantumGraph) -> Table: + def _generateTaskTable(self) -> Table: """Generate astropy table listing the number of quanta per task for a given quantum graph. - Parameters - ---------- - qgraph : `lsst.pipe.base.graph.graph.QuantumGraph` - A QuantumGraph object. - Returns ------- qg_task_table : `astropy.table.table.Table` An astropy table containing columns: Quanta and Tasks. """ qg_quanta, qg_tasks = [], [] - for task_def in qgraph.iterTaskGraph(): - num_qnodes = qgraph.getNumberOfQuantaForTask(task_def) - qg_quanta.append(num_qnodes) - qg_tasks.append(task_def.label) + for task_label, task_info in self.report.qgraphSummary.qgraphTaskSummaries.items(): + qg_tasks.append(task_label) + qg_quanta.append(task_info.numQuanta) + qg_task_table = Table(dict(Quanta=qg_quanta, Tasks=qg_tasks)) return qg_task_table @@ -880,8 +872,9 @@ def _summarize_qgraph(self, qgraph: QuantumGraph) -> int: if n_quanta == 0: _LOG.info("QuantumGraph contains no quanta.") else: + self.report = Report(qgraphSummary=qgraph.getSummary()) if _LOG.isEnabledFor(logging.INFO): - qg_task_table = self._generateTaskTable(qgraph) + qg_task_table = self._generateTaskTable() qg_task_table_formatted = "\n".join(qg_task_table.pformat_all()) quanta_str = "quantum" if n_quanta == 1 else "quanta" n_tasks = len(qgraph.taskGraph) @@ -1026,4 +1019,4 @@ def runGraphQBB(self, task_factory: TaskFactory, args: SimpleNamespace) -> None: if report: with open(args.summary, "w") as out: # Do not save fields that are not set. 
- out.write(report.json(exclude_none=True, indent=2)) + out.write(report.model_dump_json(exclude_none=True, indent=2)) diff --git a/python/lsst/ctrl/mpexec/mpGraphExecutor.py b/python/lsst/ctrl/mpexec/mpGraphExecutor.py index 8a827f7b..bb9d8282 100644 --- a/python/lsst/ctrl/mpexec/mpGraphExecutor.py +++ b/python/lsst/ctrl/mpexec/mpGraphExecutor.py @@ -404,7 +404,7 @@ def __init__( def execute(self, graph: QuantumGraph) -> None: # Docstring inherited from QuantumGraphExecutor.execute graph = self._fixupQuanta(graph) - self.report = Report() + self.report = Report(qgraphSummary=graph.getSummary()) try: if self.numProc > 1: self._executeQuantaMP(graph, self.report) diff --git a/python/lsst/ctrl/mpexec/reports.py b/python/lsst/ctrl/mpexec/reports.py index e522d021..1d02dc01 100644 --- a/python/lsst/ctrl/mpexec/reports.py +++ b/python/lsst/ctrl/mpexec/reports.py @@ -35,6 +35,7 @@ import pydantic from lsst.daf.butler import DataCoordinate, DataId, DataIdValue +from lsst.pipe.base import QgraphSummary from lsst.utils.introspection import get_full_type_name @@ -195,6 +196,9 @@ def from_exit_code( class Report(pydantic.BaseModel): """Execution report for the whole job with one or few quanta.""" + qgraphSummary: QgraphSummary + """Summary report about QuantumGraph.""" + status: ExecutionStatus = ExecutionStatus.SUCCESS """Job status.""" diff --git a/tests/test_cliCmdQgraph.py b/tests/test_cliCmdQgraph.py new file mode 100644 index 00000000..8ca059b3 --- /dev/null +++ b/tests/test_cliCmdQgraph.py @@ -0,0 +1,82 @@ +# This file is part of ctrl_mpexec. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This software is dual licensed under the GNU General Public License and also +# under a 3-clause BSD license. 
Recipients may choose which of these licenses +# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, +# respectively. If you choose the GPL option then the following text applies +# (but note that there is still no warranty even if you opt for BSD instead): +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +"""Unit tests for ctrl_mpexec CLI qgraph subcommand.""" + +import os +import unittest + +from lsst.ctrl.mpexec import Report +from lsst.ctrl.mpexec.cli.pipetask import cli as pipetask_cli +from lsst.daf.butler.cli.utils import LogCliRunner, clickResultMsg +from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir +from lsst.pipe.base.tests.simpleQGraph import makeSimpleQGraph + +TESTDIR = os.path.abspath(os.path.dirname(__file__)) + + +class QgraphTest(unittest.TestCase): + """Test executing "pipetask qgraph" command.""" + + def setUp(self) -> None: + self.runner = LogCliRunner() + self.root = makeTestTempDir(TESTDIR) + + def tearDown(self) -> None: + removeTestTempDir(self.root) + + def test_qgraph_summary(self): + """Test for making a summary of a QuantumGraph.""" + metadata = {"output_run": "run"} + butler, qgraph = makeSimpleQGraph( + run="run", + root=self.root, + metadata=metadata, + ) + + graph_uri = os.path.join(self.root, "graph.qgraph") + qgraph.saveUri(graph_uri) + + test_filename = os.path.join(self.root, "summary.json") + + result = self.runner.invoke( +
pipetask_cli, + ["qgraph", "--butler-config", self.root, "--qgraph", graph_uri, "--summary", test_filename], + input="no", + ) + # Check that we can read from the command line + self.assertEqual(result.exit_code, 0, clickResultMsg(result)) + + # Check that we can open and read the file produced by make_reports + with open(test_filename) as f: + summary = Report.model_validate_json(f.read()) + self.assertEqual(summary.qgraphSummary.outputRun, "run") + self.assertEqual(len(summary.qgraphSummary.qgraphTaskSummaries), 5) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_executors.py b/tests/test_executors.py index ae0328fc..0c96cb40 100644 --- a/tests/test_executors.py +++ b/tests/test_executors.py @@ -52,7 +52,7 @@ ) from lsst.ctrl.mpexec.execFixupDataId import ExecFixupDataId from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir -from lsst.pipe.base import NodeId +from lsst.pipe.base import NodeId, QgraphSummary, QgraphTaskSummary from lsst.pipe.base.tests.simpleQGraph import AddTaskFactoryMock, makeSimpleQGraph logging.basicConfig(level=logging.DEBUG) @@ -209,6 +209,30 @@ def determineInputsToQuantumNode(self, node): result.add(otherNode) return result + def getSummary(self): + summary = QgraphSummary( + graphID="1712445133.605479-3902002", + cmdLine="mock_pipetask -a 1 -b 2 -c 3 4 5 6", + pipeBaseVersion="1.1.1", + creationUTC="", + inputCollection=["mock_input"], + outputCollection="mock_output", + outputRun="mock_run", + ) + for q in self: + qts = summary.qgraphTaskSummaries.setdefault( + q.taskDef.label, QgraphTaskSummary(taskLabel=q.taskDef.label) + ) + qts.numQuanta += 1 + + for k in ["in1", "in2", "in3"]: + qts.numInputs[k] += 1 + + for k in ["out1", "out2", "out3"]: + qts.numOutputs[k] += 1 + + return summary + class TaskMockMP: """Simple mock class for task supporting multiprocessing.""" diff --git a/tests/test_reports.py b/tests/test_reports.py index b704d7e2..7748688d 100644 --- a/tests/test_reports.py +++ 
b/tests/test_reports.py @@ -28,6 +28,7 @@ import unittest from lsst.ctrl.mpexec import ExecutionStatus, QuantumReport, Report +from lsst.pipe.base import QgraphSummary class ReportsTestCase(unittest.TestCase): @@ -74,7 +75,7 @@ def test_quantumReport(self): def test_report(self): """Test for Report class.""" - report = Report() + report = Report(qgraphSummary=QgraphSummary(graphID="uuid")) self.assertEqual(report.status, ExecutionStatus.SUCCESS) self.assertIsNotNone(report.cmdLine) self.assertIsNone(report.exitCode) @@ -86,7 +87,9 @@ def test_report(self): qr = QuantumReport.from_exception( exception=RuntimeError("runtime error"), dataId=dataId, taskLabel=taskLabel ) - report = Report(status=ExecutionStatus.FAILURE, exitCode=-1) + report = Report( + status=ExecutionStatus.FAILURE, exitCode=-1, qgraphSummary=QgraphSummary(graphID="uuid") + ) report.set_exception(RuntimeError("runtime error")) report.quantaReports.append(qr) self.assertEqual(report.status, ExecutionStatus.FAILURE) @@ -103,7 +106,9 @@ def test_json(self): qr = QuantumReport.from_exception( exception=RuntimeError("runtime error"), dataId=dataId, taskLabel=taskLabel ) - report = Report(status=ExecutionStatus.FAILURE, exitCode=-1) + report = Report( + status=ExecutionStatus.FAILURE, exitCode=-1, qgraphSummary=QgraphSummary(graphID="uuid") + ) report.set_exception(RuntimeError("runtime error")) report.quantaReports.append(qr) json = report.model_dump_json(exclude_none=True, indent=2)