From cb614fccd10eecf67c984688abde66f022b6e654 Mon Sep 17 00:00:00 2001 From: Merlin Fisher-Levine Date: Mon, 18 Mar 2024 12:04:38 -0700 Subject: [PATCH] Update checkExistingOutputs to work with a LimitedButler --- .../lsst/ctrl/mpexec/singleQuantumExecutor.py | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py index c7dbad9d..7d5e0821 100644 --- a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py +++ b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py @@ -290,6 +290,9 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle If only partial outputs exist then they are removed if ``clobberOutputs`` is True, otherwise an exception is raised. + The ``LimitedButler`` is used for everything, and should be set to + ``self.butler`` if no separate ``LimitedButler`` is available. + Parameters ---------- quantum : `~lsst.daf.butler.Quantum` @@ -312,10 +315,6 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle RuntimeError Raised if some outputs exist and some not. """ - if not self.butler: - # Skip/prune logic only works for full butler. - return False - if self.skipExisting: _LOG.debug( "Checking existence of metadata from previous execution of label=%s dataId=%s.", @@ -333,7 +332,7 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle _LOG.debug( "Looking for existing outputs in the way for label=%s dataId=%s.", taskDef.label, quantum.dataId ) - ref_dict = self.butler.stored_many(chain.from_iterable(quantum.outputs.values())) + ref_dict = limited_butler.stored_many(chain.from_iterable(quantum.outputs.values())) existingRefs = [ref for ref, exists in ref_dict.items() if exists] missingRefs = [ref for ref, exists in ref_dict.items() if not exists] if existingRefs: @@ -343,31 +342,30 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle return True elif self.clobberOutputs: _LOG.info("Removing complete outputs for quantum %s: %s", quantum, existingRefs) - self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True) + limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True) else: raise RuntimeError( f"Complete outputs exists for a quantum {quantum} " "and neither clobberOutputs nor skipExisting is set: " - f"collection={self.butler.run} existingRefs={existingRefs}" + f"existingRefs={existingRefs}" ) else: # Partial outputs from a failed quantum. _LOG.debug( - "Partial outputs exist for quantum %s collection=%s existingRefs=%s missingRefs=%s", + "Partial outputs exist for quantum %s existingRefs=%s missingRefs=%s", quantum, - self.butler.run, existingRefs, missingRefs, ) if self.clobberOutputs: # only prune _LOG.info("Removing partial outputs for task %s: %s", taskDef, existingRefs) - self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True) + limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True) return False else: raise RuntimeError( "Registry inconsistency while checking for existing quantum outputs:" - f" quantum={quantum} collection={self.butler.run} existingRefs={existingRefs}" + f" quantum={quantum} existingRefs={existingRefs}" f" missingRefs={missingRefs}" )