From ffe88a98625bb8b95c13c5288c7981dde6e80ff6 Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Tue, 10 Sep 2024 15:49:21 -0700 Subject: [PATCH] Change published to visible and unpublished to shadowed. --- .../pipe/base/quantum_provenance_graph.py | 98 ++++++++++--------- tests/test_quantum_provenance_graph.py | 6 +- 2 files changed, 53 insertions(+), 51 deletions(-) diff --git a/python/lsst/pipe/base/quantum_provenance_graph.py b/python/lsst/pipe/base/quantum_provenance_graph.py index ebdd1df1..4545f728 100644 --- a/python/lsst/pipe/base/quantum_provenance_graph.py +++ b/python/lsst/pipe/base/quantum_provenance_graph.py @@ -242,20 +242,22 @@ class DatasetRun(pydantic.BaseModel): """Whether the specific run produced the dataset. """ - published: bool = False - """Whether this dataset was published in the final output collection. + visible: bool = False + """Whether this dataset is visible in the final output collection; in other + words, whether this dataset is queryable in a find-first search. This + determines whether it will be used as an input to further processing. """ @pydantic.model_validator(mode="after") def _validate(self) -> DatasetRun: - """Validate the model for `DatasetRun` by asserting that no published + """Validate the model for `DatasetRun` by asserting that no visible `DatasetRun` is also not produced (this should be impossible). """ - assert not (self.published and not self.produced) + assert not (self.visible and not self.produced) return self -DatasetInfoStatus: TypeAlias = Literal["published", "unpublished", "predicted_only", "unsuccessful", "cursed"] +DatasetInfoStatus: TypeAlias = Literal["visible", "shadowed", "predicted_only", "unsuccessful", "cursed"] class DatasetInfo(TypedDict): @@ -277,21 +279,21 @@ class DatasetInfo(TypedDict): Possible Statuses ----------------- - `published`: The dataset is queryable in a find_first search. This means + `visible`: The dataset is queryable in a find_first search. This means that it can be used as an input by subsequent tasks and processing. - `unpublished`: The dataset exists but is not queryable in a find_first + `shadowed`: The dataset exists but is not queryable in a find_first search. This could mean that the version of this dataset which is passed as an input to further processing is not in the collections - given. An `unpublished` dataset will not be used as an input to further + given. A `shadowed` dataset will not be used as an input to further processing. - `predicted_only`: The dataset was predicted, and was not published in any + `predicted_only`: The dataset was predicted, and was not visible in any run, but was the successor of a successful quantum. These datasets are the result of pipelines `NoWorkFound` cases, in which a dataset is predicted in the graph but found to not be necessary in processing. `unsuccessful`: The dataset was not produced. These are the results of failed or blocked quanta. `cursed`: The dataset was the result of an unsuccessful quantum and was - published in the output collection anyway. These are flagged as + visible in the output collection anyway. These are flagged as `cursed` so that they may be caught before they become inputs to further processing. """ @@ -454,8 +456,8 @@ class CursedDatasetSummary(pydantic.BaseModel): """A dictionary of all the runs associated with the `cursed` dataset; the `bool` is true if the dataset was produced in the associated run. """ - run_published: str | None - """A dictionary of all `published` runs containing the `cursed` dataset. + run_visible: str | None + """A dictionary of all `visible` runs containing the `cursed` dataset. """ messages: list[str] """Any diagnostic messages (dictated in this module) which might help in @@ -475,13 +477,13 @@ def from_info(cls, info: DatasetInfo, producer_info: QuantumInfo) -> CursedDatas All relevant information on the producer task. This is used to report the data_id of the producer task. """ - runs_published = {k for k, v in info["runs"].items() if v.published} + runs_visible = {k for k, v in info["runs"].items() if v.visible} return cls( producer_data_id=dict(producer_info["data_id"].required), data_id=dict(info["data_id"].required), runs_produced={k: v.produced for k, v in info["runs"].items()}, # this has at most one element - run_published=runs_published.pop() if runs_published else None, + run_visible=runs_visible.pop() if runs_visible else None, messages=info["messages"], ) @@ -495,13 +497,13 @@ class DatasetTypeSummary(pydantic.BaseModel): """The name of the task which produced this dataset. """ - n_published: int = 0 - """A count of the datasets of this type which were published in the + n_visible: int = 0 + """A count of the datasets of this type which were visible in the finalized collection(s). """ - n_unpublished: int = 0 + n_shadowed: int = 0 """A count of the datasets of this type which were produced but not - published. This includes any datasets which do not come up in a butler + visible. This includes any datasets which do not come up in a butler query over their associated collection. """ n_predicted_only: int = 0 @@ -555,10 +557,10 @@ def add_dataset_info(self, info: DatasetInfo, producer_info: QuantumInfo) -> Non specific issues. """ match info["status"]: - case "published": - self.n_published += 1 - case "unpublished": - self.n_unpublished += 1 + case "visible": + self.n_visible += 1 + case "shadowed": + self.n_shadowed += 1 case "unsuccessful": self.unsuccessful_datasets.append(dict(info["data_id"].mapping)) case "cursed": @@ -842,7 +844,7 @@ def resolve_duplicates( Particularly, use the publish state of each `DatasetRun` in combination with overall quantum status to ascertain the status of each dataset. - Additionally, if there are multiple published runs associated with a + Additionally, if there are multiple visible runs associated with a dataset, mark the producer quantum as `wonky`. This method should be called after @@ -863,11 +865,11 @@ def resolve_duplicates( A "where" string to use to constrain the collections, if passed. curse_failed_logs : `bool` - Mark log datasets as `cursed` if they are published in the final + Mark log datasets as `cursed` if they are visible in the final output collection. Note that a campaign-level collection must be used here for `collections` if `curse_failed_logs` is `True`; if `resolve_duplicates` is run on a list of group-level collections - then each will show logs from their own failures as published + then each will show logs from their own failures as visible the datasets will show as cursed regardless of this flag. """ # First thing: raise an error if resolve_duplicates has been run @@ -894,49 +896,49 @@ def resolve_duplicates( # graphs given. except KeyError: continue - # queryable datasets are `published`. - dataset_info["runs"][ref.run].published = True + # queryable datasets are `visible`. + dataset_info["runs"][ref.run].visible = True for task_quanta in self._quanta.values(): for quantum_key in task_quanta: - # runs associated with published datasets. - published_runs: set[str] = set() + # runs associated with visible datasets. + visible_runs: set[str] = set() quantum_info = self.get_quantum_info(quantum_key) # Loop over each dataset in the outputs of a single quantum. for dataset_key in self.iter_outputs_of(quantum_key): dataset_info = self.get_dataset_info(dataset_key) - published_runs.update( - run for run, dataset_run in dataset_info["runs"].items() if dataset_run.published + visible_runs.update( + run for run, dataset_run in dataset_info["runs"].items() if dataset_run.visible ) - if any(dataset_run.published for dataset_run in dataset_info["runs"].values()): - publish_state = "published" - # set the publish state to `unpublished` if the dataset was - # produced but not published (i.e., not queryable from the + if any(dataset_run.visible for dataset_run in dataset_info["runs"].values()): + publish_state = "visible" + # set the publish state to `shadowed` if the dataset was + # produced but not visible (i.e., not queryable from the # final collection(s)). elif any(dataset_run.produced for dataset_run in dataset_info["runs"].values()): - publish_state = "unpublished" - # a dataset which was not produced and not published is + publish_state = "shadowed" + # a dataset which was not produced and not visible is # missing. else: publish_state = "missing" # use the quantum status and publish state to ascertain the # status of the dataset. match (quantum_info["status"], publish_state): - # published datasets from successful quanta are as + # visible datasets from successful quanta are as # intended. - case ("successful", "published"): - dataset_info["status"] = "published" + case ("successful", "visible"): + dataset_info["status"] = "visible" # missing datasets from successful quanta indicate a # `NoWorkFound` case. case ("successful", "missing"): dataset_info["status"] = "predicted_only" - case ("successful", "unpublished"): - dataset_info["status"] = "unpublished" + case ("successful", "shadowed"): + dataset_info["status"] = "shadowed" # If anything other than a successful quantum produces - # a published dataset, that dataset is cursed. Set the + # a visible dataset, that dataset is cursed. Set the # status for the dataset to cursed and note the reason # for labeling the dataset as cursed. - case (_, "published"): + case (_, "visible"): # Avoiding publishing failed logs is difficult # without using tagged collections, so flag them as # merely unsuccessful unless the user requests it. @@ -945,18 +947,18 @@ def resolve_duplicates( else: dataset_info["status"] = "cursed" dataset_info["messages"].append( - f"Unsuccessful dataset {dataset_type_name} published in " + f"Unsuccessful dataset {dataset_type_name} visible in " "final output collection." ) # any other produced dataset (produced but not - # published and not successful) is a regular + # visible and not successful) is a regular # failure. case _: dataset_info["status"] = "unsuccessful" - if len(published_runs) > 1: + if len(visible_runs) > 1: quantum_info["status"] = "wonky" quantum_info["messages"].append( - f"Outputs from different runs of the same quanta were published: {published_runs}." + f"Outputs from different runs of the same quanta were visible: {visible_runs}." ) for dataset_key in self.iter_outputs_of(quantum_key): dataset_info = self.get_dataset_info(dataset_key) diff --git a/tests/test_quantum_provenance_graph.py b/tests/test_quantum_provenance_graph.py index eca3fcea..eccda207 100644 --- a/tests/test_quantum_provenance_graph.py +++ b/tests/test_quantum_provenance_graph.py @@ -67,7 +67,7 @@ def test_qpg_reports(self) -> None: TaskSummary( n_successful=0, n_blocked=0, - n_not_attempted=1, + n_unknown=1, n_expected=1, failed_quanta=[], recovered_quanta=[], @@ -106,8 +106,8 @@ def test_qpg_reports(self) -> None: # Check dataset counts (can't be done all in one because # datasets have different producers), but all the counts for # each task should be the same. - self.assertEqual(dataset_type_summary.n_published, 0) - self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_visible, 0) + self.assertEqual(dataset_type_summary.n_shadowed, 0) self.assertEqual(dataset_type_summary.n_predicted_only, 0) self.assertEqual(dataset_type_summary.n_expected, 1) self.assertEqual(dataset_type_summary.n_cursed, 0)