Skip to content

Commit

Permalink
WIP: make add_new_graph and resolve_duplicates private methods and re…
Browse files Browse the repository at this point in the history
…quire user to pass graphs in order
  • Loading branch information
eigerx committed Sep 16, 2024
1 parent 13b6372 commit 7937a63
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 4 deletions.
36 changes: 34 additions & 2 deletions python/lsst/pipe/base/quantum_provenance_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ def get_dataset_info(self, key: DatasetKey) -> DatasetInfo:
"""
return self._xgraph.nodes[key]

def add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpression) -> None:
def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpression) -> None:
"""Add a new quantum graph to the `QuantumProvenanceGraph`.
Step through the quantum graph. Annotate a `networkx.DiGraph`
Expand Down Expand Up @@ -831,7 +831,7 @@ def add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpre
# Update `QuantumInfo.status` for this quantum.
quantum_info["status"] = new_status

def resolve_duplicates(
def __resolve_duplicates(
self,
butler: Butler,
collections: Sequence[str] | None = None,
Expand Down Expand Up @@ -971,6 +971,38 @@ def resolve_duplicates(
# self._finalized = True so that it cannot be run again.
self._finalized = True

def assemble_quantum_provenance_graph(
self,
butler: Butler,
qgraphs: Sequence[QuantumGraph | ResourcePathExpression],
collections: Sequence[str] | None = None,
where: str = "",
curse_failed_logs: bool = False,
) -> None:
output_runs = []
# if not isinstance(qgraphs, Sequence):
# qgraphs = list(qgraphs)
for qgraph in qgraphs:
# for count, qgraph in enumerate(qgraphs):
# If the most recent graph's timestamp was earlier than any of the
# previous graphs, raise a RuntimeError.
# if len(qgraphs) > 1:
# breakpoint()
# for graph in qgraphs[: count - 1]:
# if qgraph.metadata["time"] < graph.metadata["time"]:
# raise RuntimeError(
# """add_new_graph may only be called on graphs
# which are passed in the order they were
# created. Please call again, passing your
# graphs in order."""
# )
self.__add_new_graph(butler, qgraph)
output_runs.append(qgraph.metadata["output_run"])
# If the user has not passed a `collections` variable
if not collections:
collections = list(reversed(output_runs))
self.__resolve_duplicates(butler, collections, where, curse_failed_logs)

def to_summary(self, butler: Butler, do_store_logs: bool = True) -> Summary:
"""Summarize the `QuantumProvenanceGraph`.
Expand Down
3 changes: 1 addition & 2 deletions tests/test_quantum_provenance_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ def test_qpg_reports(self) -> None:
# make a simple qgraph to make an execution report on
butler, qgraph = simpleQGraph.makeSimpleQGraph(root=root)
qpg = QuantumProvenanceGraph()
qpg.add_new_graph(butler, qgraph)
qpg.resolve_duplicates(butler)
qpg.assemble_quantum_provenance_graph(butler, qgraph)
summary = qpg.to_summary(butler)

for task_summary in summary.tasks.values():
Expand Down

0 comments on commit 7937a63

Please sign in to comment.