
Commit: Clean up and document
eigerx committed Oct 15, 2024
1 parent 735294a commit aa98b41
Showing 4 changed files with 36 additions and 15 deletions.
6 changes: 6 additions & 0 deletions doc/changes/DM-41605.feature.md
@@ -0,0 +1,6 @@
+Aggregate multiple `pipetask report` outputs into one holistic `Summary`.
+
+While the `QuantumProvenanceGraph` was designed to resolve processing over
+dataquery-identified groups, `pipetask aggregate-reports` is designed to
+combine multiple group-level reports into one that totals the successes,
+issues, and failures over the same section of the pipeline.
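For illustration, here is a minimal sketch of the aggregation flow this feature describes, mirroring the script-level implementation further down in this commit. The import path for `Summary` (lsst.pipe.base's quantum provenance graph module) and the report filenames are assumptions for the sketch, not part of this change.

# Hedged sketch: combine per-group `pipetask report` summaries in Python.
from lsst.pipe.base.quantum_provenance_graph import Summary  # assumed import path

# Hypothetical JSON outputs, one per data-id group, written by `pipetask report`.
group_files = ["group1_report.json", "group2_report.json"]

summaries = []
for filename in group_files:
    with open(filename) as f:
        # Each file deserializes into a `QuantumProvenanceGraph.Summary`.
        summaries.append(Summary.model_validate_json(f.read()))

# Total the successes, issues, and failures across all groups.
aggregate = Summary.aggregate(summaries)
print(aggregate.model_dump_json(indent=2))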
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/cli/cmd/commands.py
@@ -433,7 +433,7 @@ def report(
" also printed to the screen when using the --full-output-filename option.",
)
def aggregate_reports(
filenames: Iterable[str], full_output_filename: str | None, brief: bool = False
filenames: Sequence[str], full_output_filename: str | None, brief: bool = False
) -> None:
"""Aggregate pipetask report output on disjoint data-id groups into one
Summary over common tasks and datasets. Intended for use when the same
31 changes: 23 additions & 8 deletions python/lsst/ctrl/mpexec/cli/script/report.py
@@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pprint
-from collections.abc import Iterable, Sequence
-from typing import Any
+from collections.abc import Sequence

from astropy.table import Table
from lsst.daf.butler import Butler
@@ -196,18 +195,34 @@ def report_v2(


def aggregate_reports(
-    filenames: Iterable[str], full_output_filename: str | None, brief: bool = False
+    filenames: Sequence[str], full_output_filename: str | None, brief: bool = False
) -> None:
-    """Docstring.
+    """Aggregate multiple `QuantumProvenanceGraph` summaries on separate
+    dataquery-identified groups into one holistic report. This is intended for
+    reports over the same tasks in the same pipeline, after `pipetask report`
+    has been resolved over all graphs associated with each group.

-    open a bunch of json files, call model_validate_json, call aggregrate,
-    print summary
+    Parameters
+    ----------
+    filenames : `Sequence[str]`
+        The paths to the JSON files produced by `pipetask report` (note: this
+        is only compatible with the multi-graph or `--force-v2` option). These
+        files correspond to the `QuantumProvenanceGraph.Summary` objects which
+        are produced for each group.
+    full_output_filename : `str | None`
+        The name of the JSON file in which to store the aggregate report, if
+        passed. This is passed to `print_summary` at the end of this function.
+    brief : `bool = False`
+        Only display short (counts-only) summary on stdout. This includes
+        counts and not error messages or data_ids (similar to BPS report).
+        This option will still report all `cursed` datasets and `wonky`
+        quanta. This is passed to `print_summary` at the end of this function.
    """
-    summaries: Iterable[Summary] = []
+    summaries: list[Summary] = []
    for filename in filenames:
        with open(filename) as f:
            model = Summary.model_validate_json(f.read())
-            summaries.append(model)
+            summaries.extend([model])
    result = Summary.aggregate(summaries)
    print_summary(result, full_output_filename, brief)

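As a usage illustration, here is a hedged sketch of driving the script-level function added above directly from Python; the report filenames are hypothetical, and the `pipetask aggregate-reports` command wraps this same call for command-line use.

# Hedged sketch: aggregate per-group report files via the script-level API.
from lsst.ctrl.mpexec.cli.script.report import aggregate_reports

# Hypothetical per-group outputs of `pipetask report --force-v2`.
group_reports = ["group1_report.json", "group2_report.json"]

# Prints a counts-only summary to stdout (brief=True) and, because
# full_output_filename is given, also writes the aggregate report as JSON.
aggregate_reports(
    group_reports,
    full_output_filename="aggregate_report.json",
    brief=True,
)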
12 changes: 6 additions & 6 deletions tests/test_cliCmdReport.py
@@ -231,7 +231,7 @@ def test_report(self):
for task_summary in model.tasks.values():
self.assertEqual(task_summary.n_successful, 0)
self.assertEqual(task_summary.n_blocked, 0)
-self.assertEqual(task_summary.n_not_attempted, 1)
+self.assertEqual(task_summary.n_unknown, 1)
self.assertEqual(task_summary.n_expected, 1)
self.assertListEqual(task_summary.failed_quanta, [])
self.assertListEqual(task_summary.recovered_quanta, [])
@@ -243,8 +243,8 @@ def test_report(self):
dataset_type_summary.unsuccessful_datasets,
[{"instrument": "INSTR", "detector": 0}],
)
-self.assertEqual(dataset_type_summary.n_published, 0)
-self.assertEqual(dataset_type_summary.n_unpublished, 0)
+self.assertEqual(dataset_type_summary.n_visible, 0)
+self.assertEqual(dataset_type_summary.n_shadowed, 0)
self.assertEqual(dataset_type_summary.n_predicted_only, 0)
self.assertEqual(dataset_type_summary.n_expected, 1)
self.assertEqual(dataset_type_summary.n_cursed, 0)
@@ -327,7 +327,7 @@ def test_aggregate_reports(self):
for task_label, task_summary in agg_sum.tasks.items():
self.assertEqual(task_summary.n_successful, 0)
self.assertEqual(task_summary.n_blocked, 0)
-self.assertEqual(task_summary.n_not_attempted, 2)
+self.assertEqual(task_summary.n_unknown, 2)
self.assertEqual(task_summary.n_expected, 2)
self.assertListEqual(task_summary.failed_quanta, [])
self.assertListEqual(task_summary.recovered_quanta, [])
@@ -339,8 +339,8 @@ def test_aggregate_reports(self):
dataset_type_summary.unsuccessful_datasets,
[{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}],
)
-self.assertEqual(dataset_type_summary.n_published, 0)
-self.assertEqual(dataset_type_summary.n_unpublished, 0)
+self.assertEqual(dataset_type_summary.n_visible, 0)
+self.assertEqual(dataset_type_summary.n_shadowed, 0)
self.assertEqual(dataset_type_summary.n_predicted_only, 0)
self.assertEqual(dataset_type_summary.n_expected, 2)
self.assertEqual(dataset_type_summary.n_cursed, 0)
