Skip to content

Commit

Permalink
Add more logging to SingleQuantumExecutor.
Browse files Browse the repository at this point in the history
  • Loading branch information
TallJimbo committed Aug 7, 2023
1 parent 7c3e8f8 commit c015741
Showing 1 changed file with 23 additions and 0 deletions.
23 changes: 23 additions & 0 deletions python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ def _execute(self, taskDef: TaskDef, quantum: Quantum) -> Quantum:
quantumMetadata = _TASK_METADATA_TYPE()
logInfo(None, "prep", metadata=quantumMetadata) # type: ignore[arg-type]

_LOG.info("Preparing execution of quantum for label=%s dataId=%s.", taskDef.label, quantum.dataId)

# check whether to skip or delete old outputs, if it returns True
# or raises an exception do not try to store logs, as they may be
# already in butler.
Expand Down Expand Up @@ -244,6 +246,12 @@ def _execute(self, taskDef: TaskDef, quantum: Quantum) -> Quantum:
taskDef.config.freeze()
logInfo(None, "init", metadata=quantumMetadata) # type: ignore[arg-type]
init_input_refs = list(quantum.initInputs.values())

_LOG.info(
"Constructing task and executing quantum for label=%s dataId=%s.",
taskDef.label,
quantum.dataId,
)
task = self.taskFactory.makeTask(taskDef, limited_butler, init_input_refs)
logInfo(None, "start", metadata=quantumMetadata) # type: ignore[arg-type]
try:
Expand Down Expand Up @@ -301,6 +309,11 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle
return False

if self.skipExisting:
_LOG.debug(
"Checking existence of metadata from previous execution of label=%s dataId=%s.",
taskDef.label,
quantum.dataId,
)
# Metadata output exists; this is sufficient to assume the previous
# run was successful and should be skipped.
[metadata_ref] = quantum.outputs[taskDef.metadataDatasetName]
Expand All @@ -309,6 +322,9 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle
return True

# Find and prune (partial) outputs if `self.clobberOutputs` is set.
_LOG.debug(
"Looking for existing outputs in the way for label=%s dataId=%s.", taskDef.label, quantum.dataId
)
ref_dict = self.butler.stored_many(chain.from_iterable(quantum.outputs.values()))
existingRefs = [ref for ref, exists in ref_dict.items() if exists]
missingRefs = [ref for ref, exists in ref_dict.items() if not exists]
Expand Down Expand Up @@ -376,6 +392,12 @@ def updatedQuantumInputs(
anyChanges = False
updatedInputs: defaultdict[DatasetType, list] = defaultdict(list)
for key, refsForDatasetType in quantum.inputs.items():
_LOG.debug(
"Checking existence of input '%s' for label=%s dataId=%s.",
key.name,
taskDef.label,
quantum.dataId,
)
newRefsForDatasetType = updatedInputs[key]
stored = limited_butler.stored_many(refsForDatasetType)
for ref in refsForDatasetType:
Expand Down Expand Up @@ -424,6 +446,7 @@ def updatedQuantumInputs(
namedUpdatedInputs = NamedKeyDict[DatasetType, list[DatasetRef]](updatedInputs.items())
helper = AdjustQuantumHelper(namedUpdatedInputs, quantum.outputs)
if anyChanges:
_LOG.debug("Running adjustQuantum for label=%s dataId=%s.", taskDef.label, quantum.dataId)
assert quantum.dataId is not None, "Quantum DataId cannot be None"
helper.adjust_in_place(taskDef.connections, label=taskDef.label, data_id=quantum.dataId)
return Quantum(
Expand Down

0 comments on commit c015741

Please sign in to comment.