Simplify existence checks for input datasets.
Logic branches for unresolved dataset refs in quanta and the old
version of pipeline mocking are no longer relevant.
TallJimbo committed Aug 7, 2023
1 parent c015741 commit 4f97fe6
Showing 1 changed file with 6 additions and 30 deletions.
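
For orientation, here is a minimal sketch of the simplified existence check that the diff below lands. The standalone helper and its name (filter_existing_inputs) are hypothetical, not code from the repository; it assumes, as the changed method does, that LimitedButler.stored_many returns a mapping from each ref to a boolean existence flag.

def filter_existing_inputs(limited_butler, refs, log):
    """Hypothetical helper: keep only the input refs whose artifacts exist."""
    # One datastore query covers every ref of this dataset type.
    stored = limited_butler.stored_many(refs)
    existing = []
    for ref in refs:
        if stored[ref]:
            existing.append(ref)
        else:
            # Expected only when a predicted intermediate was never produced
            # upstream; a datastore misconfiguration can also land us here.
            log.info("No dataset artifact found for %s", ref)
    return existing

Compared with the removed branches, there is no re-resolution through self.butler.registry.findDataset and no per-ref follow-up stored() call: the single stored_many() result decides whether a ref is kept.
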
36 changes: 6 additions & 30 deletions python/lsst/ctrl/mpexec/singleQuantumExecutor.py
@@ -401,39 +401,15 @@ def updatedQuantumInputs(
             newRefsForDatasetType = updatedInputs[key]
             stored = limited_butler.stored_many(refsForDatasetType)
             for ref in refsForDatasetType:
-                # Inputs may already be resolved even if they do not exist, but
-                # we have to re-resolve them because IDs are ignored on output.
-                # Check datastore for existence first to cover calibration
-                # dataset types, as they would need a timespan for findDataset.
-                resolvedRef: DatasetRef | None
                 if stored[ref]:
-                    resolvedRef = ref
-                elif self.butler is not None:
-                    # This branch is for mock execution only which does not
-                    # generate actual outputs, only adds datasets to registry.
-                    resolvedRef = self.butler.registry.findDataset(ref.datasetType, ref.dataId)
-                    if resolvedRef is None:
-                        _LOG.info("No dataset found for %s", ref)
-                        continue
-                    else:
-                        _LOG.debug("Updated dataset ID for %s", ref)
+                    newRefsForDatasetType.append(ref)
                 else:
-                    # QBB with missing intermediate
-                    _LOG.info("No dataset found for %s", ref)
+                    # This should only happen if a predicted intermediate was
+                    # not actually produced upstream, but
+                    # datastore misconfigurations can unfortunately also land
+                    # us here.
+                    _LOG.info("No dataset artifact found for %s", ref)
                     continue
-
-                if (ref_stored := stored.get(resolvedRef)) or (
-                    ref_stored is None and limited_butler.stored(resolvedRef)
-                ):
-                    # We need to ask datastore if the dataset actually exists
-                    # because the Registry of a local "execution butler"
-                    # cannot know this (because we prepopulate it with all of
-                    # the datasets that might be created). Either we have
-                    # already checked and know the answer, or the resolved
-                    # ref differed from the original and we have to ask
-                    # explicitly for that.
-                    newRefsForDatasetType.append(resolvedRef)
-
             if len(newRefsForDatasetType) != len(refsForDatasetType):
                 anyChanges = True
         # If we removed any input datasets, let the task check if it has enough
