Clean up documentation
eigerx committed Sep 21, 2024
1 parent 2adfc32 commit 853b1da
Showing 1 changed file with 75 additions and 40 deletions.
115 changes: 75 additions & 40 deletions python/lsst/pipe/base/quantum_provenance_graph.py
@@ -181,12 +181,13 @@ class QuantumInfoStatus(Enum):
proceed with processing.
Currently, a quantum enters a wonky state for one of three reasons:
- Its `QuantumInfoStatus` exits a successful state. Something that
initially succeeded fails on subsequent attempts.
- A `QuantumRun` is missing logs.
- There are multiple runs associated with a dataset, and this comes up
in a findFirst search. This means that a dataset which will be used
as an input data product for further processing has heterogeneous
inputs, which may have had different inputs or a different
data-query.
FAILED = -2: These quanta were attempted and failed. Failed quanta have
logs and no metadata.
UNKNOWN = -1: These are quanta which do not have any metadata associated
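For orientation, a minimal sketch of what this enum could look like; only FAILED = -2 and UNKNOWN = -1 are stated in the excerpt above, so every other member and value here is an assumption:

```python
from enum import Enum


class QuantumInfoStatus(Enum):
    # Only FAILED and UNKNOWN values appear in the docstring above;
    # the other members and their values are assumptions.
    WONKY = -3  # assumed: an inconsistent state a quantum never escapes
    FAILED = -2  # attempted and failed: logs present, no metadata
    UNKNOWN = -1  # no metadata associated with the quantum
    BLOCKED = 0  # assumed: never attempted because an upstream quantum failed
    SUCCESSFUL = 1  # assumed: ran to completion with metadata
```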
@@ -345,7 +346,7 @@ class UnsuccessfulQuantumSummary(pydantic.BaseModel):
"""
messages: list[str]
"""Any messages associated with the unsuccessful quantum (any clues as to
why the quantum may be in a `FAILED` or `WONKY` state).
"""

@classmethod
@@ -398,17 +399,17 @@ def n_failed(self) -> int:

failed_quanta: list[UnsuccessfulQuantumSummary] = pydantic.Field(default_factory=list)
"""A list of all `UnsuccessfulQuantumSummary` objects associated with the
FAILED quanta. This is a report containing their data IDs, the status
of each run associated with each `failed` quantum, and the error messages
associated with the failures when applicable.
"""
recovered_quanta: list[dict[str, DataIdValue]] = pydantic.Field(default_factory=list)
"""A list of the quanta which moved from an unsuccessful to `successful`
"""A list of the quanta which moved from an unsuccessful to SUCCESSFUL
state.
"""
wonky_quanta: list[UnsuccessfulQuantumSummary] = pydantic.Field(default_factory=list)
"""A list of all `UnsuccessfulQuantumSummary` objects associated with the
WONKY quanta. This is a report containing their data_ids, the status of
each run associated with each `wonky` quantum, and messages (dictated in
this module) associated with the particular issue identified.
"""
@@ -468,22 +469,22 @@ def add_quantum_info(self, info: QuantumInfo, butler: Butler, do_store_logs: bool


class CursedDatasetSummary(pydantic.BaseModel):
"""A summary of all the relevant information on a `cursed` dataset."""
"""A summary of all the relevant information on a cursed dataset."""

producer_data_id: dict[str, DataIdValue]
"""The data_id of the task which produced this dataset. This is mostly
useful for people wishing to track down the task which produced this
cursed dataset quickly.
"""
data_id: dict[str, DataIdValue]
"""The data_id of the cursed `Dataset`.
"""The data_id of the cursed dataset.
"""
runs_produced: dict[str, bool]
"""A dictionary of all the runs associated with the `cursed` dataset;
"""A dictionary of all the runs associated with the cursed dataset;
the `bool` is true if the dataset was produced in the associated run.
"""
run_visible: str | None
"""A dictionary of all `visible` runs containing the `cursed` dataset.
"""A dictionary of all `visible` runs containing the cursed dataset.
"""
messages: list[str]
"""Any diagnostic messages (dictated in this module) which might help in
@@ -556,7 +557,7 @@ def n_unsuccessful(self) -> int:

cursed_datasets: list[CursedDatasetSummary] = pydantic.Field(default_factory=list)
"""A list of all `CursedDatasetSummary` objects associated with the
cursed datasets. This is a report containing their data_ids and the
data_ids of their producer task, the status of each run associated with
each `cursed` dataset, and messages (dictated in this module) associated
with the particular issue identified.
@@ -579,7 +580,7 @@ def add_dataset_info(self, info: DatasetInfo, producer_info: QuantumInfo) -> None:
producer_info : `QuantumInfo`
The `QuantumInfo` object associated with the producer of the
dataset. This is used to report the producer task in the
summaries for `cursed` datasets, which may help identify
summaries for cursed datasets, which may help identify
specific issues.
"""
match info["status"]:
@@ -599,7 +600,7 @@ def add_dataset_info(self, info: DatasetInfo, producer_info: QuantumInfo) -> None:

class Summary(pydantic.BaseModel):
"""A summary of the contents of the QuantumProvenanceGraph, including
all information on the quanta for each task and the datasets of each
`DatasetType`.
"""

@@ -618,11 +619,14 @@ class QuantumProvenanceGraph:
Step through all the quantum graphs associated with certain tasks or
processing steps. For each graph/attempt, the status of each quantum and
dataset is recorded in `QuantumProvenanceGraph.__add_new_graph` and
outcomes of quanta over multiple runs are resolved in
`QuantumProvenanceGraph.__resolve_duplicates`. These private methods are
called in the correct order by
`QuantumProvenanceGraph.assemble_quantum_provenance_graph`. At the end of
this process, we can combine all attempts into a summary using the
`QuantumProvenanceGraph.to_summary` method. This serves to answer the
question 'What happened to this data ID?' in a holistic sense.
"""

def __init__(self) -> None:
@@ -671,15 +675,15 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpression
`DatasetRun` and `QuantumRun`). For each new quantum, annotate
the status of the `QuantumRun` by inspecting the graph. If a
DatasetType was produced, annotate this in the run by setting
`DatasetRun.produced = True`. If a quantum is given BLOCKED
or FAILED status, annotate all of its successors in the graph
as BLOCKED. For each new quantum, use the transition between
the current and last `QuantumRun.status` to determine the status
to assign to the overall `QuantumInfo`. For example, if a
previous run associated with a quantum had the status FAILED,
and the status from the new graph reads SUCCESSFUL, we can
mark the overall quantum status as SUCCESSFUL and list the data_id
as RECOVERED.
Parameters
----------
@@ -780,7 +784,6 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpression
else:
quantum_run.status = QuantumRunStatus.LOGS_MISSING
# missing metadata means that the task did not finish.

else:
# if we have logs and no metadata, the task not finishing is
# a failure in the task itself. This includes all payload
@@ -809,10 +812,10 @@ def __add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpression
last_status = quantum_info["status"]
new_status: QuantumInfoStatus
match last_status, quantum_run.status:
# A quantum can never escape a WONKY state.
case (QuantumInfoStatus.WONKY, _):
new_status = QuantumInfoStatus.WONKY
# Any transition to a success (excluding from WONKY) is
# a success; any transition from a failed state is also a
# recovery.
case (_, QuantumRunStatus.SUCCESSFUL):
@@ -871,13 +874,13 @@ def __resolve_duplicates(
to the `QuantumProvenanceGraph`, resolve any discrepancies between
them and use all attempts to finalize overall status.
In particular, use the state of each `DatasetRun` in combination with
overall quantum status to ascertain the status of each dataset.
Additionally, if there are multiple visible runs associated with a
dataset, mark the producer quantum as WONKY.
This method should be called after
`QuantumProvenanceGraph.__add_new_graph` has been called on every graph
associated with the data processing.
Parameters
@@ -894,12 +897,12 @@
A "where" string to use to constrain the collections, if passed.
curse_failed_logs : `bool`
Mark log datasets as CURSED if they are visible in the final
output collection. Note that a campaign-level collection must be
used here for `collections` if `curse_failed_logs` is `True`; if
`__resolve_duplicates` is run on a list of group-level collections
then each will only show log datasets from their own failures as
visible and datasets from others will be marked as cursed.
"""
# First thing: raise an error if resolve_duplicates has been run
# before on this qpg.
@@ -1008,6 +1011,35 @@ def assemble_quantum_provenance_graph(
where: str = "",
curse_failed_logs: bool = False,
) -> None:
"""Assemble the quantum provenance graph from a list of all graphs
corresponding to processing attempts.
This method calls the private method `__add_new_graph` on each of the
constituent graphs, verifying that the graphs have been passed in
order. After `__add_new_graph` has been called on all graphs in the
`Sequence`, the method calls `__resolve_duplicates`.
Parameters
----------
butler : `lsst.daf.butler.Butler`
The Butler used for this report. This should match the Butler used
for the run associated with the executed quantum graph.
qgraphs : `Sequence`[`QuantumGraph` | `ResourcePathExpression`]
A list of either quantum graph objects or their URIs, to be used
to assemble the `QuantumProvenanceGraph`.
collections : `Sequence[str]` | `None`
Collections to use in `lsst.daf.butler.registry.queryDatasets` if
paring down the query would be useful.
where : `str`
A "where" string to use to constrain the collections, if passed.
curse_failed_logs : `bool`
Mark log datasets as CURSED if they are visible in the final
output collection. Note that a campaign-level collection must be
used here for `collections` if `curse_failed_logs` is `True`; if
`__resolve_duplicates` is run on a list of group-level collections
then each will only show log datasets from their own failures as
visible and datasets from others will be marked as cursed.
"""
output_runs = []
for count, graph in enumerate(qgraphs):
qgraph = graph if isinstance(graph, QuantumGraph) else QuantumGraph.loadUri(graph)
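Given the `curse_failed_logs` notes in the docstrings above, a hedged example of the one configuration in which the flag is valid, reusing the hypothetical names from the earlier sketch (the campaign-level collection name is made up):

```python
qpg.assemble_quantum_provenance_graph(
    butler,
    graph_uris,
    collections=["u/owner/campaign"],  # hypothetical campaign-level collection
    curse_failed_logs=True,
)
```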
@@ -1033,6 +1065,9 @@
# If the user has not passed a `collections` variable, default to
# the output runs, newest first.
if not collections:
collections = list(reversed(output_runs))
assert (
not curse_failed_logs
), "curse_failed_logs option must be used with one campaign-level collection."
self.__resolve_duplicates(butler, collections, where, curse_failed_logs)

def to_summary(self, butler: Butler, do_store_logs: bool = True) -> Summary:
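Because `Summary` is a pydantic model, the finished report can be persisted for later inspection; a minimal sketch reusing the names from the earlier example and assuming the pydantic v2 API:

```python
summary = qpg.to_summary(butler, do_store_logs=True)
with open("provenance_summary.json", "w") as out:
    out.write(summary.model_dump_json(indent=2))
```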
