From b7b3cc38b57a784502c687b74c6492b375891355 Mon Sep 17 00:00:00 2001
From: Nate Lust
Date: Fri, 23 Jun 2023 12:34:39 -0400
Subject: [PATCH 1/9] Work to shrink in-memory qgraph size

This cleans up a few data structures to make them smaller in memory,
and makes use of new caching infrastructure in daf_butler.
---
 python/lsst/pipe/base/graph/_loadHelpers.py          | 9 +++++++--
 python/lsst/pipe/base/graph/_versionDeserializers.py | 2 ++
 python/lsst/pipe/base/graph/quantumNode.py           | 7 ++++++-
 3 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/python/lsst/pipe/base/graph/_loadHelpers.py b/python/lsst/pipe/base/graph/_loadHelpers.py
index 5d112d2a8..4f0d3e2cc 100644
--- a/python/lsst/pipe/base/graph/_loadHelpers.py
+++ b/python/lsst/pipe/base/graph/_loadHelpers.py
@@ -31,9 +31,10 @@
 from typing import TYPE_CHECKING, BinaryIO
 from uuid import UUID
 
-from lsst.daf.butler import DimensionUniverse
+from lsst.daf.butler import DimensionUniverse, PersistenceContextVars
 from lsst.resources import ResourceHandleProtocol, ResourcePath
 
+
 if TYPE_CHECKING:
     from ._versionDeserializers import DeserializerBase
     from .graph import QuantumGraph
@@ -219,7 +220,11 @@ def load(
         _readBytes = self._readBytes
         if universe is None:
             universe = headerInfo.universe
-        return self.deserializer.constructGraph(nodeSet, _readBytes, universe)
+        # use the daf_butler context vars to aid in ensuring deduplication in
+        # object instantiation.
+        runner = PersistenceContextVars()
+        graph = runner.run(self.deserializer.constructGraph, nodeSet, _readBytes, universe)
+        return graph  # type: ignore
 
     def _readBytes(self, start: int, stop: int) -> bytes:
         """Load the specified byte range from the ResourcePath object
diff --git a/python/lsst/pipe/base/graph/_versionDeserializers.py b/python/lsst/pipe/base/graph/_versionDeserializers.py
index 8ea984637..8b98f214c 100644
--- a/python/lsst/pipe/base/graph/_versionDeserializers.py
+++ b/python/lsst/pipe/base/graph/_versionDeserializers.py
@@ -557,6 +557,8 @@ def constructGraph(
             # Turn the json back into the pydantic model
             nodeDeserialized = SerializedQuantumNode.direct(**dump)
 
+            del dump
+
             # attach the dictionary of dimension records to the pydantic model
             # these are stored separately because they are stored over and over
             # and this saves a lot of space and time.
diff --git a/python/lsst/pipe/base/graph/quantumNode.py b/python/lsst/pipe/base/graph/quantumNode.py
index 2c3c96067..a64ce6861 100644
--- a/python/lsst/pipe/base/graph/quantumNode.py
+++ b/python/lsst/pipe/base/graph/quantumNode.py
@@ -96,6 +96,8 @@ class QuantumNode:
         creation.
""" + __slots__ = ("quantum", "taskDef", "nodeId", "_precomputedHash") + def __post_init__(self) -> None: # use setattr here to preserve the frozenness of the QuantumNode self._precomputedHash: int @@ -144,6 +146,9 @@ def from_simple( ) +_fields_set = {"quantum", "taskLabel", "nodeId"} + + class SerializedQuantumNode(BaseModel): quantum: SerializedQuantum taskLabel: str @@ -156,5 +161,5 @@ def direct(cls, *, quantum: dict[str, Any], taskLabel: str, nodeId: str) -> Seri setter(node, "quantum", SerializedQuantum.direct(**quantum)) setter(node, "taskLabel", taskLabel) setter(node, "nodeId", uuid.UUID(nodeId)) - setter(node, "__fields_set__", {"quantum", "taskLabel", "nodeId"}) + setter(node, "__fields_set__", _fields_set) return node From 1b8f6865cd23f27d00193d510c7876340504fbf8 Mon Sep 17 00:00:00 2001 From: Nate Lust Date: Tue, 27 Jun 2023 13:35:07 -0400 Subject: [PATCH 2/9] Use ensure_iterable instead of isinstance --- python/lsst/pipe/base/tests/mocks/_pipeline_task.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/lsst/pipe/base/tests/mocks/_pipeline_task.py b/python/lsst/pipe/base/tests/mocks/_pipeline_task.py index 61edcd24c..514f0a046 100644 --- a/python/lsst/pipe/base/tests/mocks/_pipeline_task.py +++ b/python/lsst/pipe/base/tests/mocks/_pipeline_task.py @@ -283,9 +283,7 @@ def runQuantum( # store mock outputs for name, refs in outputRefs: - if not isinstance(refs, list): - refs = [refs] - for ref in refs: + for ref in ensure_iterable(refs): output = MockDataset( ref=ref.to_simple(), quantum=mock_dataset_quantum, output_connection_name=name ) From d5cad8607c718658abb27d922386a793c0846329 Mon Sep 17 00:00:00 2001 From: Nate Lust Date: Tue, 27 Jun 2023 13:35:42 -0400 Subject: [PATCH 3/9] Check for more container types in execution butler --- python/lsst/pipe/base/executionButlerBuilder.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/lsst/pipe/base/executionButlerBuilder.py b/python/lsst/pipe/base/executionButlerBuilder.py index 638dc4e53..84204cae8 100644 --- a/python/lsst/pipe/base/executionButlerBuilder.py +++ b/python/lsst/pipe/base/executionButlerBuilder.py @@ -162,7 +162,7 @@ def _accumulate( for type, refs in attr.items(): # This if block is because init inputs has a different # signature for its items - if not isinstance(refs, list): + if not isinstance(refs, (list, tuple)): refs = [refs] for ref in refs: if ref.isComponent(): @@ -177,7 +177,7 @@ def _accumulate( attr = getattr(quantum, attrName) for type, refs in attr.items(): - if not isinstance(refs, list): + if not isinstance(refs, (list, tuple)): refs = [refs] if type.component() is not None: type = type.makeCompositeDatasetType() From a9fc5e0694d378173891081783760813b5b21fe3 Mon Sep 17 00:00:00 2001 From: Nate Lust Date: Tue, 27 Jun 2023 15:50:42 -0400 Subject: [PATCH 4/9] Deprecate old caching mechanism --- python/lsst/pipe/base/graph/quantumNode.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/python/lsst/pipe/base/graph/quantumNode.py b/python/lsst/pipe/base/graph/quantumNode.py index a64ce6861..2979e3f18 100644 --- a/python/lsst/pipe/base/graph/quantumNode.py +++ b/python/lsst/pipe/base/graph/quantumNode.py @@ -25,6 +25,7 @@ import uuid from dataclasses import dataclass from typing import Any, NewType +import warnings from lsst.daf.butler import ( DatasetRef, @@ -137,10 +138,13 @@ def from_simple( universe: DimensionUniverse, recontitutedDimensions: dict[int, tuple[str, DimensionRecord]] | None = None, ) -> QuantumNode: + if 
recontitutedDimensions is not None: + warnings.warn( + "The recontitutedDimensions argument is now ignored and may be removed after v 27", + category=DeprecationWarning, + ) return QuantumNode( - quantum=Quantum.from_simple( - simple.quantum, universe, reconstitutedDimensions=recontitutedDimensions - ), + quantum=Quantum.from_simple(simple.quantum, universe), taskDef=taskDefMap[simple.taskLabel], nodeId=simple.nodeId, ) From 11053ded81705d5ff8046c1d4098ab076cc87dfb Mon Sep 17 00:00:00 2001 From: Nate Lust Date: Tue, 27 Jun 2023 16:32:06 -0400 Subject: [PATCH 5/9] Add release notes --- doc/changes/DM-39582.api.md | 1 + doc/changes/DM-39582.feature.md | 3 +++ 2 files changed, 4 insertions(+) create mode 100644 doc/changes/DM-39582.api.md create mode 100644 doc/changes/DM-39582.feature.md diff --git a/doc/changes/DM-39582.api.md b/doc/changes/DM-39582.api.md new file mode 100644 index 000000000..e871ba4e9 --- /dev/null +++ b/doc/changes/DM-39582.api.md @@ -0,0 +1 @@ +Deprecated reconstituteDimensions argument from QuantumNode.from_simple diff --git a/doc/changes/DM-39582.feature.md b/doc/changes/DM-39582.feature.md new file mode 100644 index 000000000..5c56c3a0b --- /dev/null +++ b/doc/changes/DM-39582.feature.md @@ -0,0 +1,3 @@ +The back-end to quantum graph loading has been optimized such that duplicate objects are not created in +memory, but create shared references. This results in a large decrease in memory usage, and decrease in load +times. From fc3e73fcba5f5c8519566287d3df0f1d3f3391f5 Mon Sep 17 00:00:00 2001 From: Nate Lust Date: Tue, 4 Jul 2023 08:57:47 -0400 Subject: [PATCH 6/9] Address review comments and mypy fixes --- doc/changes/DM-39582.api.md | 1 - doc/changes/DM-39582.removal.md | 1 + python/lsst/pipe/base/_instrument.py | 2 +- python/lsst/pipe/base/_quantumContext.py | 10 +++---- python/lsst/pipe/base/graph/_loadHelpers.py | 3 +- python/lsst/pipe/base/graph/graph.py | 33 +++++---------------- python/lsst/pipe/base/graph/quantumNode.py | 6 ++-- python/lsst/pipe/base/pipeline.py | 8 ++--- python/lsst/pipe/base/tests/simpleQGraph.py | 2 +- 9 files changed, 25 insertions(+), 41 deletions(-) delete mode 100644 doc/changes/DM-39582.api.md create mode 100644 doc/changes/DM-39582.removal.md diff --git a/doc/changes/DM-39582.api.md b/doc/changes/DM-39582.api.md deleted file mode 100644 index e871ba4e9..000000000 --- a/doc/changes/DM-39582.api.md +++ /dev/null @@ -1 +0,0 @@ -Deprecated reconstituteDimensions argument from QuantumNode.from_simple diff --git a/doc/changes/DM-39582.removal.md b/doc/changes/DM-39582.removal.md new file mode 100644 index 000000000..b489f3bbd --- /dev/null +++ b/doc/changes/DM-39582.removal.md @@ -0,0 +1 @@ +Deprecated reconstituteDimensions argument from `QuantumNode.from_simple` diff --git a/python/lsst/pipe/base/_instrument.py b/python/lsst/pipe/base/_instrument.py index 002a85759..856ce75b4 100644 --- a/python/lsst/pipe/base/_instrument.py +++ b/python/lsst/pipe/base/_instrument.py @@ -668,7 +668,7 @@ class _DummyConfig(Config): config = _DummyConfig() - return config.packer.apply(data_id, is_exposure=is_exposure) + return config.packer.apply(data_id, is_exposure=is_exposure) # type: ignore @staticmethod @final diff --git a/python/lsst/pipe/base/_quantumContext.py b/python/lsst/pipe/base/_quantumContext.py index 62c8df008..f5a1fc318 100644 --- a/python/lsst/pipe/base/_quantumContext.py +++ b/python/lsst/pipe/base/_quantumContext.py @@ -270,7 +270,7 @@ def get( n_connections = len(dataset) n_retrieved = 0 for i, (name, ref) in enumerate(dataset): - if 
isinstance(ref, list): + if isinstance(ref, (list, tuple)): val = [] n_refs = len(ref) for j, r in enumerate(ref): @@ -301,7 +301,7 @@ def get( "Completed retrieval of %d datasets from %d connections", n_retrieved, n_connections ) return retVal - elif isinstance(dataset, list): + elif isinstance(dataset, (list, tuple)): n_datasets = len(dataset) retrieved = [] for i, x in enumerate(dataset): @@ -363,14 +363,14 @@ def put( ) for name, refs in dataset: valuesAttribute = getattr(values, name) - if isinstance(refs, list): + if isinstance(refs, (list, tuple)): if len(refs) != len(valuesAttribute): raise ValueError(f"There must be a object to put for every Dataset ref in {name}") for i, ref in enumerate(refs): self._put(valuesAttribute[i], ref) else: self._put(valuesAttribute, refs) - elif isinstance(dataset, list): + elif isinstance(dataset, (list, tuple)): if not isinstance(values, Sequence): raise ValueError("Values to put must be a sequence") if len(dataset) != len(values): @@ -401,7 +401,7 @@ def _checkMembership(self, ref: list[DatasetRef] | DatasetRef, inout: set) -> No which may be important for Quanta with lots of `~lsst.daf.butler.DatasetRef`. """ - if not isinstance(ref, list): + if not isinstance(ref, (list, tuple)): ref = [ref] for r in ref: if (r.datasetType, r.dataId) not in inout: diff --git a/python/lsst/pipe/base/graph/_loadHelpers.py b/python/lsst/pipe/base/graph/_loadHelpers.py index 4f0d3e2cc..0190842f5 100644 --- a/python/lsst/pipe/base/graph/_loadHelpers.py +++ b/python/lsst/pipe/base/graph/_loadHelpers.py @@ -34,7 +34,6 @@ from lsst.daf.butler import DimensionUniverse, PersistenceContextVars from lsst.resources import ResourceHandleProtocol, ResourcePath - if TYPE_CHECKING: from ._versionDeserializers import DeserializerBase from .graph import QuantumGraph @@ -224,7 +223,7 @@ def load( # object instantiation. runner = PersistenceContextVars() graph = runner.run(self.deserializer.constructGraph, nodeSet, _readBytes, universe) - return graph # type: ignore + return graph def _readBytes(self, start: int, stop: int) -> bytes: """Load the specified byte range from the ResourcePath object diff --git a/python/lsst/pipe/base/graph/graph.py b/python/lsst/pipe/base/graph/graph.py index 9fb294a6c..8f0bbb314 100644 --- a/python/lsst/pipe/base/graph/graph.py +++ b/python/lsst/pipe/base/graph/graph.py @@ -1276,49 +1276,32 @@ def updateRun(self, run: str, *, metadata_key: str | None = None, update_graph_i update_graph_id : `bool`, optional If `True` then also update graph ID with a new unique value. """ - dataset_id_map = {} - def _update_output_refs_in_place(refs: list[DatasetRef], run: str) -> None: + def _update_refs_in_place(refs: list[DatasetRef], run: str) -> None: """Update list of `~lsst.daf.butler.DatasetRef` with new run and dataset IDs. """ - new_refs = [] for ref in refs: - new_ref = DatasetRef(ref.datasetType, ref.dataId, run=run, conform=False) - dataset_id_map[ref.id] = new_ref.id - new_refs.append(new_ref) - refs[:] = new_refs - - def _update_input_refs_in_place(refs: list[DatasetRef], run: str) -> None: - """Update list of `~lsst.daf.butler.DatasetRef` with IDs from - dataset_id_map. 
-            """
-            new_refs = []
-            for ref in refs:
-                if (new_id := dataset_id_map.get(ref.id)) is not None:
-                    new_ref = DatasetRef(ref.datasetType, ref.dataId, id=new_id, run=run, conform=False)
-                    new_refs.append(new_ref)
-                else:
-                    new_refs.append(ref)
-            refs[:] = new_refs
+            for ref in refs:
+                # Hack: replace the run on the frozen ref in place.
+                object.__setattr__(ref, "run", run)
 
         # Loop through all outputs and update their datasets.
         for node in self._connectedQuanta:
             for refs in node.quantum.outputs.values():
-                _update_output_refs_in_place(refs, run)
+                _update_refs_in_place(refs, run)
 
         for refs in self._initOutputRefs.values():
-            _update_output_refs_in_place(refs, run)
+            _update_refs_in_place(refs, run)
 
-        _update_output_refs_in_place(self._globalInitOutputRefs, run)
+        _update_refs_in_place(self._globalInitOutputRefs, run)
 
         # Update all intermediates from their matching outputs.
         for node in self._connectedQuanta:
             for refs in node.quantum.inputs.values():
-                _update_input_refs_in_place(refs, run)
+                _update_refs_in_place(refs, run)
 
         for refs in self._initInputRefs.values():
-            _update_input_refs_in_place(refs, run)
+            _update_refs_in_place(refs, run)
 
         if update_graph_id:
             self._buildId = BuildId(f"{time.time()}-{os.getpid()}")
diff --git a/python/lsst/pipe/base/graph/quantumNode.py b/python/lsst/pipe/base/graph/quantumNode.py
index 2979e3f18..b5df5d015 100644
--- a/python/lsst/pipe/base/graph/quantumNode.py
+++ b/python/lsst/pipe/base/graph/quantumNode.py
@@ -23,9 +23,9 @@
 __all__ = ("QuantumNode", "NodeId", "BuildId")
 
 import uuid
+import warnings
 from dataclasses import dataclass
 from typing import Any, NewType
-import warnings
 
 from lsst.daf.butler import (
     DatasetRef,
@@ -35,6 +35,7 @@
     Quantum,
     SerializedQuantum,
 )
+from lsst.utils.introspection import find_outside_stacklevel
 from pydantic import BaseModel
 
 from ..pipeline import TaskDef
@@ -141,7 +142,8 @@ def from_simple(
         if recontitutedDimensions is not None:
             warnings.warn(
                 "The recontitutedDimensions argument is now ignored and may be removed after v 27",
-                category=DeprecationWarning,
+                category=FutureWarning,
+                stacklevel=find_outside_stacklevel("lsst.pipe.base"),
             )
         return QuantumNode(
             quantum=Quantum.from_simple(simple.quantum, universe),
diff --git a/python/lsst/pipe/base/pipeline.py b/python/lsst/pipe/base/pipeline.py
index 781defcca..48f3ad05c 100644
--- a/python/lsst/pipe/base/pipeline.py
+++ b/python/lsst/pipe/base/pipeline.py
@@ -192,7 +192,7 @@ def logOutputDatasetName(self) -> str | None:
         """Name of a dataset type for log output from this task, `None` if
         logs are not to be saved (`str`)
         """
-        if cast(PipelineTaskConfig, self.config).saveLogOutput:
+        if self.config.saveLogOutput:
             return acc.LOG_OUTPUT_TEMPLATE.format(label=self.label)
         else:
             return None
@@ -623,7 +623,7 @@ def get_data_id(self, universe: DimensionUniverse) -> DataCoordinate:
         """
         instrument_class_name = self._pipelineIR.instrument
         if instrument_class_name is not None:
-            instrument_class = doImportType(instrument_class_name)
+            instrument_class = cast(PipeBaseInstrument, doImportType(instrument_class_name))
             if instrument_class is not None:
                 return DataCoordinate.standardize(instrument=instrument_class.getName(), universe=universe)
         return DataCoordinate.makeEmpty(universe)
@@ -654,8 +654,8 @@ def addTask(self, task: type[PipelineTask] | str, label: str) -> None:
         # be defined without label which is not acceptable, use task
         # _DefaultName in that case
         if isinstance(task, str):
-            task_class = doImportType(task)
-            label = task_class._DefaultName
+            task_class = cast(PipelineTask, doImportType(task))
+            label = task_class._DefaultName
         self._pipelineIR.tasks[label] = pipelineIR.TaskIR(label, taskName)
 
     def removeTask(self, label: str) -> None:
diff --git a/python/lsst/pipe/base/tests/simpleQGraph.py b/python/lsst/pipe/base/tests/simpleQGraph.py
index 8b50bf42a..6a5289d4e 100644
--- a/python/lsst/pipe/base/tests/simpleQGraph.py
+++ b/python/lsst/pipe/base/tests/simpleQGraph.py
@@ -307,7 +307,7 @@ def populateButler(
     instrument = pipeline.getInstrument()
     if instrument is not None:
         instrument_class = doImportType(instrument)
-        instrumentName = instrument_class.getName()
+        instrumentName = cast(Instrument, instrument_class).getName()
         instrumentClass = get_full_type_name(instrument_class)
     else:
         instrumentName = "INSTR"

From 628e539721bc8d3be785070523e566dc3a5fb79c Mon Sep 17 00:00:00 2001
From: Nate Lust
Date: Tue, 4 Jul 2023 08:58:14 -0400
Subject: [PATCH 7/9] Pin pydantic to less than 2

---
 requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index 882f21640..79d5616fa 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,5 @@
 pyyaml >= 5.1
-pydantic
+pydantic < 2
 numpy >= 1.17
 networkx
 frozendict

From 4f276c94eceec1566d9a74cc5ab217307b9d6ea8 Mon Sep 17 00:00:00 2001
From: Nate Lust
Date: Tue, 4 Jul 2023 09:18:12 -0400
Subject: [PATCH 8/9] MyPy fixes and a small bug fix

Fixing mypy annotations revealed a small bug in adjustQuantum with
dataset type names; this commit also fixes that.
---
 python/lsst/pipe/base/connections.py                | 13 +++++++------
 python/lsst/pipe/base/script/transfer_from_graph.py |  4 ++--
 python/lsst/pipe/base/tests/util.py                 | 10 +++++-----
 3 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/python/lsst/pipe/base/connections.py b/python/lsst/pipe/base/connections.py
index e02d99e2d..556cfa011 100644
--- a/python/lsst/pipe/base/connections.py
+++ b/python/lsst/pipe/base/connections.py
@@ -934,12 +934,12 @@ class AdjustQuantumHelper:
     connection-oriented mappings used inside `PipelineTaskConnections`.
     """
 
-    inputs: NamedKeyMapping[DatasetType, list[DatasetRef]]
+    inputs: NamedKeyMapping[DatasetType, tuple[DatasetRef]]
    """Mapping of regular input and prerequisite input datasets, grouped by
    `~lsst.daf.butler.DatasetType`.
    """
 
-    outputs: NamedKeyMapping[DatasetType, list[DatasetRef]]
+    outputs: NamedKeyMapping[DatasetType, tuple[DatasetRef]]
     """Mapping of output datasets, grouped by `~lsst.daf.butler.DatasetType`.
     """
 
@@ -997,7 +997,7 @@ def adjust_in_place(
         # Translate adjustments to DatasetType-keyed, Quantum-oriented form,
         # installing new mappings in self if necessary.
         if adjusted_inputs_by_connection:
-            adjusted_inputs = NamedKeyDict[DatasetType, list[DatasetRef]](self.inputs)
+            adjusted_inputs = NamedKeyDict[DatasetType, tuple[DatasetRef]](self.inputs)
             for name, (connection, updated_refs) in adjusted_inputs_by_connection.items():
                 dataset_type_name = connection.name
                 if not set(updated_refs).issubset(self.inputs[dataset_type_name]):
@@ -1006,21 +1006,22 @@ def adjust_in_place(
                         f"({dataset_type_name}) input datasets that are not a subset of those "
                         f"it was given for data ID {data_id}."
                    )
-                adjusted_inputs[dataset_type_name] = list(updated_refs)
+                adjusted_inputs[dataset_type_name] = tuple(updated_refs)
             self.inputs = adjusted_inputs.freeze()
             self.inputs_adjusted = True
         else:
             self.inputs_adjusted = False
         if adjusted_outputs_by_connection:
-            adjusted_outputs = NamedKeyDict[DatasetType, list[DatasetRef]](self.outputs)
+            adjusted_outputs = NamedKeyDict[DatasetType, tuple[DatasetRef]](self.outputs)
             for name, (connection, updated_refs) in adjusted_outputs_by_connection.items():
+                dataset_type_name = connection.name
                 if not set(updated_refs).issubset(self.outputs[dataset_type_name]):
                     raise RuntimeError(
                         f"adjustQuantum implementation for task with label {label} returned {name} "
                         f"({dataset_type_name}) output datasets that are not a subset of those "
                         f"it was given for data ID {data_id}."
                     )
-                adjusted_outputs[dataset_type_name] = list(updated_refs)
+                adjusted_outputs[dataset_type_name] = tuple(updated_refs)
             self.outputs = adjusted_outputs.freeze()
             self.outputs_adjusted = True
         else:
diff --git a/python/lsst/pipe/base/script/transfer_from_graph.py b/python/lsst/pipe/base/script/transfer_from_graph.py
index 547885ae8..ad1a25495 100644
--- a/python/lsst/pipe/base/script/transfer_from_graph.py
+++ b/python/lsst/pipe/base/script/transfer_from_graph.py
@@ -66,8 +66,8 @@ def transfer_from_graph(
         if refs := qgraph.initOutputRefs(task_def):
             original_output_refs.update(refs)
     for qnode in qgraph:
-        for refs in qnode.quantum.outputs.values():
-            original_output_refs.update(refs)
+        for otherRefs in qnode.quantum.outputs.values():
+            original_output_refs.update(otherRefs)
 
     # Get data repository definitions from the QuantumGraph; these can have
     # different storage classes than those in the quanta.
diff --git a/python/lsst/pipe/base/tests/util.py b/python/lsst/pipe/base/tests/util.py
index 832cd91c0..7d9aa2b5f 100644
--- a/python/lsst/pipe/base/tests/util.py
+++ b/python/lsst/pipe/base/tests/util.py
@@ -46,8 +46,8 @@ def check_output_run(graph: QuantumGraph, run: str) -> list[DatasetRef]:
         the specified run.
     """
     # Collect all inputs/outputs, so that we can build intermediate refs.
-    output_refs = []
-    input_refs = []
+    output_refs: list[DatasetRef] = []
+    input_refs: list[DatasetRef] = []
     for node in graph:
         for refs in node.quantum.outputs.values():
             output_refs += refs
@@ -61,10 +61,10 @@ def check_output_run(graph: QuantumGraph, run: str) -> list[DatasetRef]:
     if init_refs:
         input_refs += init_refs
     output_refs += graph.globalInitOutputRefs()
-    refs = [ref for ref in output_refs if ref.run != run]
+    newRefs = [ref for ref in output_refs if ref.run != run]
     output_ids = {ref.id for ref in output_refs}
     intermediates = [ref for ref in input_refs if ref.id in output_ids]
-    refs += [ref for ref in intermediates if ref.run != run]
+    newRefs += [ref for ref in intermediates if ref.run != run]
 
-    return refs
+    return newRefs

From 44f5f6c72abb2a641a3d2efa40bb684f75d3cf44 Mon Sep 17 00:00:00 2001
From: Nate Lust
Date: Tue, 4 Jul 2023 09:33:56 -0400
Subject: [PATCH 9/9] More typing changes

---
 python/lsst/pipe/base/connections.py        | 10 +++++-----
 python/lsst/pipe/base/graph/_implDetails.py |  6 ++++--
 python/lsst/pipe/base/graphBuilder.py       |  5 +++--
 3 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/python/lsst/pipe/base/connections.py b/python/lsst/pipe/base/connections.py
index 556cfa011..3d54150f1 100644
--- a/python/lsst/pipe/base/connections.py
+++ b/python/lsst/pipe/base/connections.py
@@ -39,7 +39,7 @@
 import itertools
 import string
 from collections import UserDict
-from collections.abc import Collection, Generator, Iterable, Mapping, Set
+from collections.abc import Collection, Generator, Iterable, Mapping, Sequence, Set
 from dataclasses import dataclass
 from types import MappingProxyType, SimpleNamespace
 from typing import TYPE_CHECKING, Any
@@ -934,12 +934,12 @@ class AdjustQuantumHelper:
     connection-oriented mappings used inside `PipelineTaskConnections`.
     """
 
-    inputs: NamedKeyMapping[DatasetType, tuple[DatasetRef]]
+    inputs: NamedKeyMapping[DatasetType, Sequence[DatasetRef]]
     """Mapping of regular input and prerequisite input datasets, grouped by
     `~lsst.daf.butler.DatasetType`.
     """
 
-    outputs: NamedKeyMapping[DatasetType, tuple[DatasetRef]]
+    outputs: NamedKeyMapping[DatasetType, Sequence[DatasetRef]]
     """Mapping of output datasets, grouped by `~lsst.daf.butler.DatasetType`.
     """
 
@@ -997,7 +997,7 @@ def adjust_in_place(
         # Translate adjustments to DatasetType-keyed, Quantum-oriented form,
         # installing new mappings in self if necessary.
         if adjusted_inputs_by_connection:
-            adjusted_inputs = NamedKeyDict[DatasetType, tuple[DatasetRef]](self.inputs)
+            adjusted_inputs = NamedKeyDict[DatasetType, tuple[DatasetRef, ...]](self.inputs)
             for name, (connection, updated_refs) in adjusted_inputs_by_connection.items():
                 dataset_type_name = connection.name
                 if not set(updated_refs).issubset(self.inputs[dataset_type_name]):
@@ -1012,7 +1012,7 @@ def adjust_in_place(
         else:
             self.inputs_adjusted = False
         if adjusted_outputs_by_connection:
-            adjusted_outputs = NamedKeyDict[DatasetType, tuple[DatasetRef]](self.outputs)
+            adjusted_outputs = NamedKeyDict[DatasetType, tuple[DatasetRef, ...]](self.outputs)
             for name, (connection, updated_refs) in adjusted_outputs_by_connection.items():
                 dataset_type_name = connection.name
                 if not set(updated_refs).issubset(self.outputs[dataset_type_name]):
diff --git a/python/lsst/pipe/base/graph/_implDetails.py b/python/lsst/pipe/base/graph/_implDetails.py
index 3f27faec9..6472ab066 100644
--- a/python/lsst/pipe/base/graph/_implDetails.py
+++ b/python/lsst/pipe/base/graph/_implDetails.py
@@ -313,13 +313,15 @@ def _pruner(
             # from the graph.
             try:
                 helper.adjust_in_place(node.taskDef.connections, node.taskDef.label, node.quantum.dataId)
+                # ignore the types because quantum really can take a sequence
+                # of inputs
                 newQuantum = Quantum(
                     taskName=node.quantum.taskName,
                     taskClass=node.quantum.taskClass,
                     dataId=node.quantum.dataId,
                     initInputs=node.quantum.initInputs,
-                    inputs=helper.inputs,
-                    outputs=helper.outputs,
+                    inputs=helper.inputs,  # type: ignore
+                    outputs=helper.outputs,  # type: ignore
                 )
                 # If the inputs or outputs were adjusted to something different
                 # than what was supplied by the graph builder, disassociate
diff --git a/python/lsst/pipe/base/graphBuilder.py b/python/lsst/pipe/base/graphBuilder.py
index c9ee29784..fb5620f0f 100644
--- a/python/lsst/pipe/base/graphBuilder.py
+++ b/python/lsst/pipe/base/graphBuilder.py
@@ -455,13 +455,14 @@ def makeQuantum(self, datastore_records: Mapping[str, DatastoreRecordData] | Non
                 matching_records = records.subset(input_ids)
                 if matching_records is not None:
                     quantum_records[datastore_name] = matching_records
+        # ignore the types because quantum really can take a sequence of inputs
        return Quantum(
            taskName=self.task.taskDef.taskName,
            taskClass=self.task.taskDef.taskClass,
            dataId=self.dataId,
            initInputs=initInputs,
-            inputs=helper.inputs,
-            outputs=helper.outputs,
+            inputs=helper.inputs,  # type: ignore
+            outputs=helper.outputs,  # type: ignore
            datastore_records=quantum_records,
        )
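
For context, the loading pattern the series introduces is easiest to see in
isolation. The following is a minimal sketch, not part of the patches: only
the runner.run(callable, *args) signature and the shared-reference behaviour
are taken from patch 1 and the feature note; the construct() callable is a
hypothetical stand-in for DeserializerBase.constructGraph.

    from lsst.daf.butler import PersistenceContextVars


    def construct(payload: dict) -> dict:
        # Stand-in for DeserializerBase.constructGraph; any callable and its
        # positional arguments can be wrapped this way.
        return payload


    runner = PersistenceContextVars()
    # run() enables daf_butler's persistence caches for the duration of the
    # wrapped call, so identical objects deserialized inside it (dimension
    # records, dataset types, quanta) come back as shared references rather
    # than duplicate instances -- the source of the memory savings.
    result = runner.run(construct, {"nodes": []})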