From 7937a63f778591b04f215b1bc68439a70530d250 Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Fri, 13 Sep 2024 17:06:57 -0700 Subject: [PATCH] WIP: make add_new_graph and resolve_duplicates private methods and require user to pass graphs in order --- .../pipe/base/quantum_provenance_graph.py | 36 +++++++++++++++++-- tests/test_quantum_provenance_graph.py | 3 +- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py index 4545f728..7a9c4127 100644 --- a/python/lsst/pipe/base/quantum_provenance_graph.py +++ b/python/lsst/pipe/base/quantum_provenance_graph.py @@ -634,7 +634,7 @@ def get_dataset_info(self, key: DatasetKey) -> DatasetInfo: """ return self._xgraph.nodes[key] - def add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpression) -> None: + def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpression) -> None: """Add a new quantum graph to the `QuantumProvenanceGraph`. Step through the quantum graph. Annotate a `networkx.DiGraph` @@ -831,7 +831,7 @@ def add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpre # Update `QuantumInfo.status` for this quantum. quantum_info["status"] = new_status - def resolve_duplicates( + def __resolve_duplicates( self, butler: Butler, collections: Sequence[str] | None = None, @@ -971,6 +971,38 @@ def resolve_duplicates( # self._finalized = True so that it cannot be run again. 
self._finalized = True

+    def assemble_quantum_provenance_graph(
+        self,
+        butler: Butler,
+        qgraphs: Sequence[QuantumGraph | ResourcePathExpression],
+        collections: Sequence[str] | None = None,
+        where: str = "",
+        curse_failed_logs: bool = False,
+    ) -> None:
+        """Add each graph, in order, then resolve duplicates across them.
+
+        Parameters
+        ----------
+        butler : `Butler`
+            Forwarded to both the add-graph and resolve-duplicates steps.
+        qgraphs : `Sequence` [`QuantumGraph` or `ResourcePathExpression`]
+            Graphs, or URIs of saved graphs, in the order they were created.
+        collections : `Sequence` [`str`] or `None`, optional
+            Forwarded to the resolve step; if empty, the output runs, reversed.
+        where : `str`, optional
+            Forwarded to the resolve-duplicates step.
+        curse_failed_logs : `bool`, optional
+            Forwarded to the resolve-duplicates step.
+        """
+        output_runs = []
+        for graph in qgraphs:
+            # A URI carries no ``metadata``; load it into a graph first.
+            qgraph = graph if isinstance(graph, QuantumGraph) else QuantumGraph.loadUri(graph)
+            self.__add_new_graph(butler, qgraph)
+            output_runs.append(qgraph.metadata["output_run"])
+        collections = collections or list(reversed(output_runs))
+        self.__resolve_duplicates(butler, collections, where, curse_failed_logs)
+
     def to_summary(self, butler: Butler, do_store_logs: bool = True) -> Summary:
         """Summarize the `QuantumProvenanceGraph`.

diff --git a/tests/test_quantum_provenance_graph.py b/tests/test_quantum_provenance_graph.py
index eccda207..2d031c90 100644
--- a/tests/test_quantum_provenance_graph.py
+++ b/tests/test_quantum_provenance_graph.py
@@ -54,8 +54,7 @@ def test_qpg_reports(self) -> None:
             # make a simple qgraph to make an execution report on
             butler, qgraph = simpleQGraph.makeSimpleQGraph(root=root)
             qpg = QuantumProvenanceGraph()
-            qpg.add_new_graph(butler, qgraph)
-            qpg.resolve_duplicates(butler)
+            qpg.assemble_quantum_provenance_graph(butler, [qgraph])
             summary = qpg.to_summary(butler)

             for task_summary in summary.tasks.values():