diff --git a/doc/changes/DM-41711.feature.md b/doc/changes/DM-41711.feature.md index 91dd2309..10c3ef1f 100644 --- a/doc/changes/DM-41711.feature.md +++ b/doc/changes/DM-41711.feature.md @@ -1,3 +1,11 @@ Create a QuantumProvenanceGraph, which details the status of every quantum and dataset over multiple attempts at executing graphs, noting when quanta have been recovered. + +Step through all the quantum graphs associated with certain tasks or +processing steps. For each graph/attempt, the status of each quantum and +dataset is recorded in `QuantumProvenanceGraph.add_new_graph` and outcomes +of quanta over multiple runs are resolved in +`QuantumProvenanceGraph.resolve_duplicates`. At the end of this process, +we can combine all attempts into a summary. This serves to answer the +question "What happened to this data ID?" in a wholistic sense. diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py index 242df310..2b08416a 100644 --- a/python/lsst/pipe/base/quantum_provenance_graph.py +++ b/python/lsst/pipe/base/quantum_provenance_graph.py @@ -97,6 +97,9 @@ class DatasetKey(NamedTuple): """ is_prerequisite: ClassVar[Literal[False]] = False + """Whether this node is a prerequisite to another node (also always + `False`). + """ class PrerequisiteDatasetKey(NamedTuple): @@ -122,6 +125,8 @@ class PrerequisiteDatasetKey(NamedTuple): """ is_prerequisite: ClassVar[Literal[True]] = True + """Whether this node is a prerequisite to another node (always `True`). + """ QuantumRunStatus: TypeAlias = Literal["failed", "successful", "logs_missing", "blocked", "metadata_missing"] @@ -279,13 +284,13 @@ class TaskSummary(pydantic.BaseModel): """The number of quanta expected by the graph. """ - @pydantic.computed_field # type: ignore[misc] + @pydantic.computed_field # type: ignore[prop-decorator] @property def n_wonky(self) -> int: """Return a count of `wonky` quanta.""" return len(self.wonky_quanta) - @pydantic.computed_field # type: ignore[misc] + @pydantic.computed_field # type: ignore[prop-decorator] @property def n_failed(self) -> int: """Return a count of `failed` quanta.""" @@ -417,8 +422,8 @@ class DatasetTypeSummary(pydantic.BaseModel): """ n_published: int = 0 - """A count of the datasets of this type which were published in the final - collection. + """A count of the datasets of this type which were published in the + finalized collection(s). """ n_unpublished: int = 0 """A count of the datasets of this type which were produced but not @@ -435,13 +440,13 @@ class DatasetTypeSummary(pydantic.BaseModel): """The number of datasets of this type expected by the graph. """ - @pydantic.computed_field # type: ignore[misc] + @pydantic.computed_field # type: ignore[prop-decorator] @property def n_cursed(self) -> int: """Return a count of cursed datasets.""" return len(self.cursed_datasets) - @pydantic.computed_field # type: ignore[misc] + @pydantic.computed_field # type: ignore[prop-decorator] @property def n_unsuccessful(self) -> int: """Return a count of unsuccessful datasets.""" @@ -528,7 +533,7 @@ def __init__(self) -> None: # name. self._datasets: dict[str, set[DatasetKey]] = {} # Bool representing whether the graph has been finalized. This is set - # to True when resolve_duplicates is + # to True when resolve_duplicates completes. self._finalized: bool = False def get_quantum_info(self, key: QuantumKey) -> QuantumInfo: @@ -571,7 +576,7 @@ def add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpre to assign to the overall `QuantumInfo`. For example, if a previous run associated with a quantum had the status `failed`, and the status from the new graph reads `successful`, we can - mark the overall quantum status as `successful` and list the id + mark the overall quantum status as `successful` and list the data_id as `recovered`. Parameters @@ -750,7 +755,11 @@ def add_new_graph(self, butler: Butler, qgraph: QuantumGraph | ResourcePathExpre quantum_info["status"] = new_status def resolve_duplicates( - self, butler: Butler, collections: Sequence[str] | None = None, where: str = "" + self, + butler: Butler, + collections: Sequence[str] | None = None, + where: str = "", + curse_failed_logs: bool = False, ) -> None: """After quantum graphs associated with each run have been added to the `QuantumProvenanceGraph, resolve any discrepancies between @@ -777,6 +786,14 @@ def resolve_duplicates( where : `str` A "where" string to use to constrain the collections, if passed. + + curse_failed_logs : `bool` + Mark log datasets as `cursed` if they are published in the final + output collection. Note that a campaign-level collection must be + used here for `collections` if `curse_failed_logs` is `True`; if + `resolve_duplicates` is run on a list of group-level collections + then each will show logs from their own failures as published + the datasets will show as cursed regardless of this flag. """ # First thing: raise an error if resolve_duplicates has been run # before on this qpg. @@ -839,11 +856,18 @@ def resolve_duplicates( # a published dataset, that dataset is cursed. Set the # status for the dataset to cursed and note the reason # for labeling the dataset as cursed. - case (_, "published") if not dataset_type_name.endswith("_log"): - dataset_info["status"] = "cursed" - dataset_info["messages"].append( - "Published dataset is from an unsuccessful quantum." - ) + case (_, "published"): + # Avoiding publishing failed logs is difficult + # without using tagged collections, so flag them as + # merely unsuccessful unless the user requests it. + if dataset_type_name.endswith("_log") and not curse_failed_logs: + dataset_info["status"] = "unsuccessful" + else: + dataset_info["status"] = "cursed" + dataset_info["messages"].append( + f"Unsuccessful dataset {dataset_type_name} published in " + "final output collection." + ) # any other produced dataset (produced but not # published and not successful) is a regular # failure.