Skip to content

Commit

Permalink
Update checkExistingOutputs to work with a LimitedButler
Browse files Browse the repository at this point in the history
  • Loading branch information
mfisherlevine committed Mar 15, 2024
1 parent a2f6597 commit dc4445a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
20 changes: 9 additions & 11 deletions python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle
If only partial outputs exist then they are removed if
``clobberOutputs`` is True, otherwise an exception is raised.
The ``LimitedButler`` is used for everything, and should be set to
``self.butler`` if no separate ``LimitedButler`` is available.
Parameters
----------
quantum : `~lsst.daf.butler.Quantum`
Expand All @@ -312,10 +315,6 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle
RuntimeError
Raised if some outputs exist and some not.
"""
if not self.butler:
# Skip/prune logic only works for full butler.
return False

if self.skipExisting:
_LOG.debug(
"Checking existence of metadata from previous execution of label=%s dataId=%s.",
Expand All @@ -333,7 +332,7 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle
_LOG.debug(
"Looking for existing outputs in the way for label=%s dataId=%s.", taskDef.label, quantum.dataId
)
ref_dict = self.butler.stored_many(chain.from_iterable(quantum.outputs.values()))
ref_dict = limited_butler.stored_many(chain.from_iterable(quantum.outputs.values()))
existingRefs = [ref for ref, exists in ref_dict.items() if exists]
missingRefs = [ref for ref, exists in ref_dict.items() if not exists]
if existingRefs:
Expand All @@ -343,31 +342,30 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle
return True
elif self.clobberOutputs:
_LOG.info("Removing complete outputs for quantum %s: %s", quantum, existingRefs)
self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
else:
raise RuntimeError(
f"Complete outputs exists for a quantum {quantum} "
"and neither clobberOutputs nor skipExisting is set: "
f"collection={self.butler.run} existingRefs={existingRefs}"
f"existingRefs={existingRefs}"
)
else:
# Partial outputs from a failed quantum.
_LOG.debug(
"Partial outputs exist for quantum %s collection=%s existingRefs=%s missingRefs=%s",
"Partial outputs exist for quantum %s existingRefs=%s missingRefs=%s",
quantum,
self.butler.run,
existingRefs,
missingRefs,
)
if self.clobberOutputs:
# only prune
_LOG.info("Removing partial outputs for task %s: %s", taskDef, existingRefs)
self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
return False
else:
raise RuntimeError(
"Registry inconsistency while checking for existing quantum outputs:"
f" quantum={quantum} collection={self.butler.run} existingRefs={existingRefs}"
f" quantum={quantum} existingRefs={existingRefs}"
f" missingRefs={missingRefs}"
)

Expand Down
4 changes: 4 additions & 0 deletions run_err.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- task0:
data_id:
detector: 0
instrument: INSTR

0 comments on commit dc4445a

Please sign in to comment.