
Commit

Respond to review feedback
eigerx committed Sep 21, 2024
1 parent 853b1da commit e5d3d75
Showing 2 changed files with 106 additions and 76 deletions.
131 changes: 83 additions & 48 deletions python/lsst/pipe/base/quantum_provenance_graph.py
@@ -85,7 +85,7 @@ class QuantumKey(NamedTuple):
 class DatasetKey(NamedTuple):
     """Identifier type for dataset keys in a `QuantumProvenanceGraph`."""

-    parent_dataset_type_name: str
+    dataset_type_name: str
     """Name of the dataset type (never a component)."""

     data_id_values: tuple[DataIdValue, ...]
@@ -117,7 +117,7 @@ class PrerequisiteDatasetKey(NamedTuple):
     dataset ID (UUID) instead.
     """

-    parent_dataset_type_name: str
+    dataset_type_name: str
     """Name of the dataset type (never a component)."""

     dataset_id_bytes: bytes
@@ -180,8 +180,9 @@ class QuantumInfoStatus(Enum):
     state; it is impossible to exit and requires human intervention to
     proceed with processing.
     Currently, a quantum enters a wonky state for one of three reasons:
-    - Its `QuantumInfoStatus` exits a successful state. Something that
-      initially succeeded fails on subsequent attempts.
+    - Its overall `QuantumInfoStatus` moves from a successful state (as a
+      result of a successful run) to any other state. In other words,
+      something that initially succeeded fails on subsequent attempts.
     - A `QuantumRun` is missing logs.
     - There are multiple runs associated with a dataset, and this comes up
       in a findFirst search. This means that a dataset which will be used
@@ -258,7 +259,7 @@ class DatasetRun(pydantic.BaseModel):
     """

     produced: bool = False
-    """Whether the specific run produced the dataset.
+    """Whether the specific run wrote the dataset.
     """

     visible: bool = False
@@ -271,13 +272,20 @@ def _validate(self) -> DatasetRun:
     def _validate(self) -> DatasetRun:
         """Validate the model for `DatasetRun` by asserting that no visible
         `DatasetRun` is also not produced (this should be impossible).
+
+        Returns
+        -------
+        self : `DatasetRun`
+            The `DatasetRun` object, validated.
         """
         assert not (self.visible and not self.produced)
         return self


 class DatasetInfoStatus(Enum):
-    """Status of the the DatasetType-dataID pair over all runs.
+    """Status of the DatasetType-dataID pair over all runs. This depends
+    not only on the presence of the dataset itself, but also on metadata,
+    logs and the state of its producer quantum.

     Possible Statuses
     -----------------
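
The validator pattern above is easiest to see in isolation. Here is a minimal, self-contained sketch of the same idea, assuming pydantic v2; the class name and example values are illustrative and not part of this commit:

import pydantic


class DatasetRunSketch(pydantic.BaseModel):
    produced: bool = False
    visible: bool = False

    @pydantic.model_validator(mode="after")
    def _validate(self) -> "DatasetRunSketch":
        # a dataset can never be visible without having been produced
        assert not (self.visible and not self.produced)
        return self


DatasetRunSketch(produced=True, visible=True)  # passes validation
# DatasetRunSketch(visible=True)  # would trip the assertion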
@@ -341,12 +349,13 @@ class UnsuccessfulQuantumSummary(pydantic.BaseModel):
     """The data_id of the unsuccessful quantum.
     """

     runs: dict[str, str]
-    """A dictionary including the enum name of the `QuantumRunStatus` of each
-    run associated with an attempt to process the unsuccessful quantum.
+    """A dictionary (key: output run collection name) with the value of the
+    enum name of the `QuantumRunStatus` of each run associated with an attempt
+    to process the unsuccessful quantum.
     """

     messages: list[str]
     """Any messages associated with the unsuccessful quantum (any clues as to
-    why the quantum may be in a `FAILED` or `WONKY` state).
+    why the quantum may be in a FAILED or WONKY state).
     """

     @classmethod
@@ -358,6 +367,17 @@ def from_info(cls, info: QuantumInfo) -> UnsuccessfulQuantumSummary:
         ----------
         info : `QuantumInfo`
             The `QuantumInfo` object for the unsuccessful quantum.
+
+        Returns
+        -------
+        summary : `UnsuccessfulQuantumSummary`
+            A Pydantic model containing the dataID, run collection names (and
+            each of their `QuantumRunStatus` enum names) as well as messages
+            which may offer clues about the nature of the problem. For
+            failed quanta, these are usually error messages from the butler
+            logs. For wonky quanta, these can be messages generated during the
+            assembly of the `QuantumProvenanceGraph` that describe why it was
+            marked as wonky.
         """
         return cls(
             data_id=dict(info["data_id"].required),
@@ -404,8 +424,8 @@ def n_failed(self) -> int:
     associated with the failures when applicable.
     """
     recovered_quanta: list[dict[str, DataIdValue]] = pydantic.Field(default_factory=list)
-    """A list of the quanta which moved from an unsuccessful to SUCCESSFUL
-    state.
+    """A list of dataIDs (key->value) which moved from an unsuccessful to
+    successful state.
     """
     wonky_quanta: list[UnsuccessfulQuantumSummary] = pydantic.Field(default_factory=list)
     """A list of all `UnsuccessfulQuantumSummary` objects associated with the
@@ -450,9 +470,7 @@ def add_quantum_info(self, info: QuantumInfo, butler: Butler, do_store_logs: boo
                 try:

Check warning on line 470 in python/lsst/pipe/base/quantum_provenance_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/quantum_provenance_graph.py#L470

Added line #L470 was not covered by tests
                     # should probably upgrade this to use a dataset
                     # ref
-                    log = butler.get(
-                        log_key.parent_dataset_type_name, info["data_id"], collections=run
-                    )
+                    log = butler.get(log_key.dataset_type_name, info["data_id"], collections=run)

Check warning on line 473 in python/lsst/pipe/base/quantum_provenance_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/quantum_provenance_graph.py#L473

Added line #L473 was not covered by tests
                 except LookupError:
                     failed_quantum_summary.messages.append(f"Logs not ingested for {run!r}")
                 except FileNotFoundError:
@@ -503,6 +521,17 @@ def from_info(cls, info: DatasetInfo, producer_info: QuantumInfo) -> CursedDatas
         producer_info : `QuantumInfo`
             All relevant information on the producer task. This is used to
             report the data_id of the producer task.
+
+        Returns
+        -------
+        summary : `CursedDatasetSummary`
+            A Pydantic model containing the dataID of the task which produced
+            this cursed dataset, the dataID associated with the cursed dataset,
+            run collection names (and their `DatasetRun` information) as well
+            as any messages which may offer clues about the nature of
+            the problem. These are messages generated during the assembly of
+            the `QuantumProvenanceGraph` that describe why it was marked as
+            cursed.
         """
         runs_visible = {k for k, v in info["runs"].items() if v.visible}
         return cls(
@@ -650,6 +679,12 @@ def get_quantum_info(self, key: QuantumKey) -> QuantumInfo:
         ----------
         key : `QuantumKey`
             The key used to refer to the node on the graph.
+
+        Returns
+        -------
+        quantum_info : `QuantumInfo`
+            The `TypedDict` with information on the task label-dataID pair
+            across all runs.
         """
         return self._xgraph.nodes[key]
@@ -661,15 +696,23 @@ def get_dataset_info(self, key: DatasetKey) -> DatasetInfo:
         ----------
         key : `DatasetKey`
             The key used to refer to the node on the graph.
+
+        Returns
+        -------
+        dataset_info : `DatasetInfo`
+            The `TypedDict` with information about the `DatasetType`-dataID
+            pair across all runs.
         """
         return self._xgraph.nodes[key]

     def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpression) -> None:
         """Add a new quantum graph to the `QuantumProvenanceGraph`.
-        Step through the quantum graph. Annotate a `networkx.DiGraph`
-        (`QuantumProvenanceGraph._xgraph`) with all of the relevant
-        information: quanta, dataset types and their associated run
+
+        Notes
+        -----
+        The algorithm: step through the quantum graph. Annotate a
+        `networkx.DiGraph` (`QuantumProvenanceGraph._xgraph`) with all of the
+        relevant information: quanta, dataset types and their associated run
         collections (these unique quanta- and dataset type-run
         collection combinations are encapsulated in the classes
         `DatasetRun` and `QuantumRun`). For each new quantum, annotate
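
To make the annotation strategy in the new Notes section concrete, here is a small hand-rolled sketch of the same idea; the tuple keys stand in for QuantumKey and DatasetKey, and the real implementation stores richer QuantumRun/DatasetRun records rather than plain dicts:

import networkx as nx

xgraph = nx.DiGraph()
quantum_key = ("task0", ("visit_1",))         # stands in for a QuantumKey
dataset_key = ("add_dataset1", ("visit_1",))  # stands in for a DatasetKey

# nodes carry per-run annotations; edges encode producer/consumer relations
xgraph.add_node(quantum_key, runs={}, messages=[])
xgraph.add_node(dataset_key, runs={}, status="predicted_only")
xgraph.add_edge(quantum_key, dataset_key)  # the quantum produces the dataset

# a second attempt annotates the same node instead of adding a new one
xgraph.nodes[dataset_key]["runs"]["u/user/attempt2"] = {"produced": False}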
@@ -702,7 +745,8 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp
         output_run = qgraph.metadata["output_run"]
         new_quanta = []
         for node in qgraph:
-            # make a key to refer to the quantum and add it to the graph.
+            # make a key to refer to the quantum and add it to the quantum
+            # provenance graph.
             quantum_key = QuantumKey(
                 node.taskDef.label, cast(DataCoordinate, node.quantum.dataId).required_values
             )
@@ -737,15 +781,15 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp
                 dataset_info.setdefault("data_id", ref.dataId)
                 dataset_info.setdefault("status", DatasetInfoStatus.PREDICTED_ONLY)
                 dataset_info.setdefault("messages", [])
-                self._datasets.setdefault(dataset_key.parent_dataset_type_name, set()).add(dataset_key)
+                self._datasets.setdefault(dataset_key.dataset_type_name, set()).add(dataset_key)
                 dataset_runs = dataset_info.setdefault("runs", {})
                 # make a `DatasetRun` for the specific dataset-run
                 # collection combination.
                 dataset_runs[output_run] = DatasetRun(id=ref.id)
                 # save metadata and logs for easier status interpretation later
-                if dataset_key.parent_dataset_type_name.endswith("_metadata"):
+                if dataset_key.dataset_type_name.endswith("_metadata"):
                     quantum_info["metadata"] = dataset_key
-                if dataset_key.parent_dataset_type_name.endswith("_log"):
+                if dataset_key.dataset_type_name.endswith("_log"):
                     quantum_info["log"] = dataset_key
             for ref in itertools.chain.from_iterable(node.quantum.inputs.values()):
                 dataset_key = DatasetKey(ref.datasetType.nameAndComponent()[0], ref.dataId.required_values)
@@ -776,12 +820,12 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExp
                     # if we also have logs, this is a success.
                     if log_dataset_run.produced:
                         quantum_run.status = QuantumRunStatus.SUCCESSFUL

Check warning on line 822 in python/lsst/pipe/base/quantum_provenance_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/quantum_provenance_graph.py#L822

Added line #L822 was not covered by tests
-                    # if we have metadata and no logs, this is a very rare
-                    # case. either the task ran successfully and the datastore
-                    # died immediately afterwards, or some supporting
-                    # infrastructure for transferring the logs to the datastore
-                    # failed.
                     else:
+                        # if we have metadata and no logs, this is a very rare
+                        # case. either the task ran successfully and the datastore
+                        # died immediately afterwards, or some supporting
+                        # infrastructure for transferring the logs to the datastore
+                        # failed.
                         quantum_run.status = QuantumRunStatus.LOGS_MISSING

Check warning on line 829 in python/lsst/pipe/base/quantum_provenance_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/quantum_provenance_graph.py#L829

Added line #L829 was not covered by tests
                 # missing metadata means that the task did not finish.
                 else:
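
The branch rearranged above implements a small decision table over the quantum's metadata and log datasets. A flat, hedged restatement (the status names follow the enum in the diff; the full implementation also distinguishes blocked and wonky cases, which this sketch omits):

def run_status_sketch(metadata_produced: bool, logs_produced: bool) -> str:
    if metadata_produced and logs_produced:
        return "SUCCESSFUL"
    if metadata_produced:
        # metadata but no logs: either the datastore died right after the
        # task succeeded, or log-transfer infrastructure failed
        return "LOGS_MISSING"
    # missing metadata means the task did not finish
    return "FAILED"

assert run_status_sketch(True, False) == "LOGS_MISSING"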
@@ -889,7 +933,7 @@ def __resolve_duplicates(
             The Butler used for this report. This should match the Butler used
             for the run associated with the executed quantum graph.
-        collections : `Sequence[str]` | `None`
+        collections : `Sequence` [`str`] | `None`
             Collections to use in `lsst.daf.butler.registry.queryDatasets` if
             paring down the query would be useful.
@@ -995,7 +1039,7 @@ def __resolve_duplicates(
                 for dataset_key in self.iter_outputs_of(quantum_key):
                     dataset_info = self.get_dataset_info(dataset_key)
                     quantum_info["messages"].append(

Check warning on line 1041 in python/lsst/pipe/base/quantum_provenance_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/quantum_provenance_graph.py#L1040-L1041

Added lines #L1040 - L1041 were not covered by tests
f"{dataset_key.parent_dataset_type_name}"
f"{dataset_key.dataset_type_name}"
+ f"from {str(dataset_info['runs'])};"
+ f"{str(dataset_info['status'])}"
)
Expand Down Expand Up @@ -1024,10 +1068,10 @@ def assemble_quantum_provenance_graph(
butler : `lsst.daf.butler.Butler`
The Butler used for this report. This should match the Butler used
for the run associated with the executed quantum graph.
qgraphs : `Sequence`[`QuantumGraph` | `ResourcePathExpression`]
qgraphs : `Sequence` [`QuantumGraph` | `ResourcePathExpression`]
A list of either quantum graph objects or their uri's, to be used
to assemble the `QuantumProvenanceGraph`.
collections : `Sequence[str]` | `None`
collections : `Sequence` [`str`] | `None`
Collections to use in `lsst.daf.butler.registry.queryDatasets` if
paring down the query would be useful.
where : `str`
@@ -1041,29 +1085,15 @@ def assemble_quantum_provenance_graph(
             visible and datasets from others will be marked as cursed.
         """
         output_runs = []
-        for count, graph in enumerate(qgraphs):
+        for graph in qgraphs:
             qgraph = graph if isinstance(graph, QuantumGraph) else QuantumGraph.loadUri(graph)
             assert qgraph.metadata is not None, "Saved QGs always have metadata."
-            # If the most recent graph's timestamp was earlier than any of the
-            # previous graphs, raise a RuntimeError.
-            if len(qgraphs) > 1:
-                for previous_graph in qgraphs[: count - 1]:
-                    previous_graph = (
-                        previous_graph
-                        if isinstance(previous_graph, QuantumGraph)
-                        else QuantumGraph.loadUri(previous_graph)
-                    )
-                    if qgraph.metadata["time"] < previous_graph.metadata["time"]:
-                        raise RuntimeError(
-                            """add_new_graph may only be called on graphs
-                            which are passed in the order they were
-                            created. Please call again, passing your
-                            graphs in order."""
-                        )
             self.__add_new_graph(butler, qgraph)
             output_runs.append(qgraph.metadata["output_run"])
         # If the user has not passed a `collections` variable
         if not collections:
+            # We reverse the order of the associated output runs because the
+            # query in __resolve_duplicates must be done most recent-first.
             collections = list(reversed(output_runs))
         assert (
             not curse_failed_logs
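
The reversal noted in the new comment matters because __resolve_duplicates performs a find-first style query, which must see the most recent attempt first. A self-contained illustration with made-up run names:

output_runs = ["u/user/attempt1", "u/user/attempt2", "u/user/attempt3"]  # oldest first
collections = list(reversed(output_runs))
assert collections[0] == "u/user/attempt3"  # newest attempt is searched first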
@@ -1082,7 +1112,7 @@ def to_summary(self, butler: Butler, do_store_logs: bool = True) -> Summary:
         Returns
         -------
-        summary : `Summary`
+        result : `Summary`
             A struct containing counts of quanta and datasets in each of
             the overall states defined in `QuantumInfo` and `DatasetInfo`,
             as well as diagnostic information and error messages for failed
@@ -1137,6 +1167,11 @@ def get_producer_of(self, dataset_key: DatasetKey) -> QuantumKey:
         ----------
         dataset_key : `DatasetKey`
             The key for the dataset whose producer quantum is needed.
+
+        Returns
+        -------
+        result : `QuantumKey`
+            The key for the quantum which produced the dataset.
         """
         (result,) = self._xgraph.predecessors(dataset_key)
         return result
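
get_producer_of relies on the invariant that every dataset node has exactly one predecessor quantum in the provenance graph; unpacking a one-element tuple enforces it at runtime. A small sketch with the same stand-in keys as earlier:

import networkx as nx

g = nx.DiGraph()
g.add_edge(("task0", ("visit_1",)), ("add_dataset1", ("visit_1",)))

(producer,) = g.predecessors(("add_dataset1", ("visit_1",)))  # raises if not exactly one
assert producer == ("task0", ("visit_1",))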
51 changes: 23 additions & 28 deletions tests/test_quantum_provenance_graph.py
@@ -31,7 +31,7 @@

 import unittest

-from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph, TaskSummary
+from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph
 from lsst.pipe.base.tests import simpleQGraph
 from lsst.utils.tests import temporaryDirectory
@@ -61,20 +61,15 @@ def test_qpg_reports(self) -> None:
             # We know that we have one expected task that was not run.
             # As such, the following dictionary should describe all of
             # the mock tasks.
-            self.assertEqual(
-                task_summary,
-                TaskSummary(
-                    n_successful=0,
-                    n_blocked=0,
-                    n_unknown=1,
-                    n_expected=1,
-                    failed_quanta=[],
-                    recovered_quanta=[],
-                    wonky_quanta=[],
-                    n_wonky=0,
-                    n_failed=0,
-                ),
-            )
+            self.assertEqual(task_summary.n_successful, 0)
+            self.assertEqual(task_summary.n_blocked, 0)
+            self.assertEqual(task_summary.n_unknown, 1)
+            self.assertEqual(task_summary.n_expected, 1)
+            self.assertListEqual(task_summary.failed_quanta, [])
+            self.assertListEqual(task_summary.recovered_quanta, [])
+            self.assertListEqual(task_summary.wonky_quanta, [])
+            self.assertEqual(task_summary.n_wonky, 0)
+            self.assertEqual(task_summary.n_failed, 0)
             expected_mock_datasets = [
                 "add_dataset1",
                 "add2_dataset1",
@@ -115,16 +110,16 @@ def test_qpg_reports(self) -> None:
             self.assertListEqual(dataset_type_summary.cursed_datasets, [])
             # Make sure we have the right datasets based on our mock
             self.assertIn(dataset_type_name, expected_mock_datasets)
-                # Make sure the expected datasets were produced by the expected
-                # tasks
-                match dataset_type_name:
-                    case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]:
-                        self.assertEqual(dataset_type_summary.producer, "task0")
-                    case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]:
-                        self.assertEqual(dataset_type_summary.producer, "task1")
-                    case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]:
-                        self.assertEqual(dataset_type_summary.producer, "task2")
-                    case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]:
-                        self.assertEqual(dataset_type_summary.producer, "task3")
-                    case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]:
-                        self.assertEqual(dataset_type_summary.producer, "task4")
+            # Make sure the expected datasets were produced by the expected
+            # tasks
+            match dataset_type_name:
+                case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]:
+                    self.assertEqual(dataset_type_summary.producer, "task0")
+                case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]:
+                    self.assertEqual(dataset_type_summary.producer, "task1")
+                case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]:
+                    self.assertEqual(dataset_type_summary.producer, "task2")
+                case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]:
+                    self.assertEqual(dataset_type_summary.producer, "task3")
+                case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]:
+                    self.assertEqual(dataset_type_summary.producer, "task4")
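
The re-indented block above uses match statement guards (case name if name in [...]) to map each dataset type to its producer task. A stripped-down equivalent of the pattern, covering only two of the mock tasks:

def producer_of(dataset_type_name: str) -> str | None:
    match dataset_type_name:
        case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]:
            return "task0"
        case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]:
            return "task1"
        case _:
            return None

assert producer_of("task1_log") == "task1"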
