Skip to content

Commit

Permalink
Add flag for what to do with failed log datasets
Browse files Browse the repository at this point in the history
  • Loading branch information
eigerx committed Aug 1, 2024
1 parent 5f9c5f5 commit 7e142bb
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 14 deletions.
8 changes: 8 additions & 0 deletions doc/changes/DM-41711.feature.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
Create a QuantumProvenanceGraph, which details the status of every quantum
and dataset over multiple attempts at executing graphs, noting when quanta
have been recovered.

Step through all the quantum graphs associated with certain tasks or
processing steps. For each graph/attempt, the status of each quantum and
dataset is recorded in `QuantumProvenanceGraph.add_new_graph` and outcomes
of quanta over multiple runs are resolved in
`QuantumProvenanceGraph.resolve_duplicates`. At the end of this process,
we can combine all attempts into a summary. This serves to answer the
question "What happened to this data ID?" in a wholistic sense.
52 changes: 38 additions & 14 deletions python/lsst/pipe/base/quantum_provenance_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ class DatasetKey(NamedTuple):
"""

is_prerequisite: ClassVar[Literal[False]] = False
"""Whether this node is a prerequisite to another node (also always
`False`).
"""


class PrerequisiteDatasetKey(NamedTuple):
Expand All @@ -122,6 +125,8 @@ class PrerequisiteDatasetKey(NamedTuple):
"""

is_prerequisite: ClassVar[Literal[True]] = True
"""Whether this node is a prerequisite to another node (always `True`).
"""


QuantumRunStatus: TypeAlias = Literal["failed", "successful", "logs_missing", "blocked", "metadata_missing"]
Expand Down Expand Up @@ -279,13 +284,13 @@ class TaskSummary(pydantic.BaseModel):
"""The number of quanta expected by the graph.
"""

@pydantic.computed_field # type: ignore[misc]
@pydantic.computed_field # type: ignore[prop-decorator]
@property
def n_wonky(self) -> int:
"""Return a count of `wonky` quanta."""
return len(self.wonky_quanta)

@pydantic.computed_field # type: ignore[misc]
@pydantic.computed_field # type: ignore[prop-decorator]
@property
def n_failed(self) -> int:
"""Return a count of `failed` quanta."""
Expand Down Expand Up @@ -417,8 +422,8 @@ class DatasetTypeSummary(pydantic.BaseModel):
"""

n_published: int = 0
"""A count of the datasets of this type which were published in the final
collection.
"""A count of the datasets of this type which were published in the
finalized collection(s).
"""
n_unpublished: int = 0
"""A count of the datasets of this type which were produced but not
Expand All @@ -435,13 +440,13 @@ class DatasetTypeSummary(pydantic.BaseModel):
"""The number of datasets of this type expected by the graph.
"""

@pydantic.computed_field # type: ignore[misc]
@pydantic.computed_field # type: ignore[prop-decorator]
@property
def n_cursed(self) -> int:
"""Return a count of cursed datasets."""
return len(self.cursed_datasets)

@pydantic.computed_field # type: ignore[misc]
@pydantic.computed_field # type: ignore[prop-decorator]
@property
def n_unsuccessful(self) -> int:
"""Return a count of unsuccessful datasets."""
Expand Down Expand Up @@ -528,7 +533,7 @@ def __init__(self) -> None:
# name.
self._datasets: dict[str, set[DatasetKey]] = {}
# Bool representing whether the graph has been finalized. This is set
# to True when resolve_duplicates is
# to True when resolve_duplicates completes.
self._finalized: bool = False

def get_quantum_info(self, key: QuantumKey) -> QuantumInfo:
Expand Down Expand Up @@ -571,7 +576,7 @@ def add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpre
to assign to the overall `QuantumInfo`. For example, if a
previous run associated with a quantum had the status `failed`,
and the status from the new graph reads `successful`, we can
mark the overall quantum status as `successful` and list the id
mark the overall quantum status as `successful` and list the data_id
as `recovered`.
Parameters
Expand Down Expand Up @@ -750,7 +755,11 @@ def add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpre
quantum_info["status"] = new_status

def resolve_duplicates(
self, butler: Butler, collections: Sequence[str] | None = None, where: str = ""
self,
butler: Butler,
collections: Sequence[str] | None = None,
where: str = "",
curse_failed_logs: bool = False,
) -> None:
"""After quantum graphs associated with each run have been added
to the `QuantumProvenanceGraph, resolve any discrepancies between
Expand All @@ -777,6 +786,14 @@ def resolve_duplicates(
where : `str`
A "where" string to use to constrain the collections, if passed.
curse_failed_logs : `bool`
Mark log datasets as `cursed` if they are published in the final
output collection. Note that a campaign-level collection must be
used here for `collections` if `curse_failed_logs` is `True`; if
`resolve_duplicates` is run on a list of group-level collections
then each will show logs from their own failures as published
the datasets will show as cursed regardless of this flag.
"""
# First thing: raise an error if resolve_duplicates has been run
# before on this qpg.
Expand Down Expand Up @@ -839,11 +856,18 @@ def resolve_duplicates(
# a published dataset, that dataset is cursed. Set the
# status for the dataset to cursed and note the reason
# for labeling the dataset as cursed.
case (_, "published") if not dataset_type_name.endswith("_log"):
dataset_info["status"] = "cursed"
dataset_info["messages"].append(
"Published dataset is from an unsuccessful quantum."
)
case (_, "published"):
# Avoiding publishing failed logs is difficult
# without using tagged collections, so flag them as
# merely unsuccessful unless the user requests it.
if dataset_type_name.endswith("_log") and not curse_failed_logs:
dataset_info["status"] = "unsuccessful"

Check warning on line 864 in python/lsst/pipe/base/quantum_provenance_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/quantum_provenance_graph.py#L864

Added line #L864 was not covered by tests
else:
dataset_info["status"] = "cursed"
dataset_info["messages"].append(

Check warning on line 867 in python/lsst/pipe/base/quantum_provenance_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/quantum_provenance_graph.py#L866-L867

Added lines #L866 - L867 were not covered by tests
f"Unsuccessful dataset {dataset_type_name} published in "
"final output collection."
)
# any other produced dataset (produced but not
# published and not successful) is a regular
# failure.
Expand Down

0 comments on commit 7e142bb

Please sign in to comment.