Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-43060: Let checkExistingOutputs work without a full butler #284

Merged
merged 1 commit into from
Mar 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle
If only partial outputs exist then they are removed if
``clobberOutputs`` is True, otherwise an exception is raised.

The ``LimitedButler`` is used for everything, and should be set to
``self.butler`` if no separate ``LimitedButler`` is available.

Parameters
----------
quantum : `~lsst.daf.butler.Quantum`
Expand All @@ -312,10 +315,6 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle
RuntimeError
Raised if some outputs exist and some not.
"""
if not self.butler:
# Skip/prune logic only works for full butler.
return False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andy-slac, do you recall why we made this shortcut? At first I thought it must be because LimitedButler didn't have pruneDatasets...but it does. So this is either something we should have done a while ago (but maybe thought we didn't need) or there's something deeper.

This is all coming up because @mfisherlevine now has a LimitedButler that wraps a another LimitedButler (which is actually a full one in this use case) and does in-memory caching, and he's got a use case for that to be able to clobber.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is all passing, but I'll wait to hear back from @andy-slac before merging anything.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was because back then we needed access to butler.datastore for implementation of this method and limited butler did not provide that property. We had extended limited butler interface since then, so it should be OK now to allow limited butler here.


if self.skipExisting:
_LOG.debug(
"Checking existence of metadata from previous execution of label=%s dataId=%s.",
Expand All @@ -333,7 +332,7 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle
_LOG.debug(
"Looking for existing outputs in the way for label=%s dataId=%s.", taskDef.label, quantum.dataId
)
ref_dict = self.butler.stored_many(chain.from_iterable(quantum.outputs.values()))
ref_dict = limited_butler.stored_many(chain.from_iterable(quantum.outputs.values()))
existingRefs = [ref for ref, exists in ref_dict.items() if exists]
missingRefs = [ref for ref, exists in ref_dict.items() if not exists]
if existingRefs:
Expand All @@ -343,31 +342,30 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle
return True
elif self.clobberOutputs:
_LOG.info("Removing complete outputs for quantum %s: %s", quantum, existingRefs)
self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
else:
raise RuntimeError(
f"Complete outputs exists for a quantum {quantum} "
"and neither clobberOutputs nor skipExisting is set: "
f"collection={self.butler.run} existingRefs={existingRefs}"
f"existingRefs={existingRefs}"
)
else:
# Partial outputs from a failed quantum.
_LOG.debug(
"Partial outputs exist for quantum %s collection=%s existingRefs=%s missingRefs=%s",
"Partial outputs exist for quantum %s existingRefs=%s missingRefs=%s",
quantum,
self.butler.run,
existingRefs,
missingRefs,
)
if self.clobberOutputs:
# only prune
_LOG.info("Removing partial outputs for task %s: %s", taskDef, existingRefs)
self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
return False
else:
raise RuntimeError(
"Registry inconsistency while checking for existing quantum outputs:"
f" quantum={quantum} collection={self.butler.run} existingRefs={existingRefs}"
f" quantum={quantum} existingRefs={existingRefs}"
f" missingRefs={missingRefs}"
)

Expand Down
Loading