From b03bb6cc3398d98262604b7db4aa6fcba1dd0f16 Mon Sep 17 00:00:00 2001 From: Merlin Fisher-Levine Date: Wed, 3 Jul 2024 12:32:46 -0700 Subject: [PATCH 1/3] Re-enable skipping and clobbering with LimitedButler --- python/lsst/ctrl/mpexec/singleQuantumExecutor.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py index 28abef30..81b49844 100644 --- a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py +++ b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py @@ -345,10 +345,6 @@ def checkExistingOutputs( """ task_node = self._conform_task_def(task_node) - if not self.butler: - # Skip/prune logic only works for full butler. - return False - if self.skipExisting: _LOG.debug( "Checking existence of metadata from previous execution of label=%s dataId=%s.", From c734367ae759f7718048ddb57f33978ea64c09aa Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Fri, 5 Jul 2024 11:23:56 -0400 Subject: [PATCH 2/3] Restore previous pipetask run-qbb behavior. It's not clear whether previous authors were aware that passing clobberOutputs=True and skipExisting=True in CmdLineFwk was previously doing nothing because SingleQuantumExecutor shortcutted checkExistingOutputs when there was only a limited butler. But while it looks safe from a correctness standpoint to just drop that shortcut and let those options work as intended, that could mean a lot of new file existence checks in BPS jobs, so it's safer for this ticket to focus on enabling clobbering and skip-existing for other LimitedButlers while leaving run-qbb behavior strictly unchanged. 
--- python/lsst/ctrl/mpexec/cmdLineFwk.py | 3 +-- python/lsst/ctrl/mpexec/singleQuantumExecutor.py | 11 +++++++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/python/lsst/ctrl/mpexec/cmdLineFwk.py b/python/lsst/ctrl/mpexec/cmdLineFwk.py index e5b959f4..c085e8a3 100644 --- a/python/lsst/ctrl/mpexec/cmdLineFwk.py +++ b/python/lsst/ctrl/mpexec/cmdLineFwk.py @@ -994,8 +994,7 @@ def runGraphQBB(self, task_factory: TaskFactory, args: SimpleNamespace) -> None: exitOnKnownError=args.fail_fast, limited_butler_factory=_butler_factory, resources=resources, - clobberOutputs=True, - skipExisting=True, + assumeNoExistingOutputs=True, ) timeout = self.MP_TIMEOUT if args.timeout is None else args.timeout diff --git a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py index 81b49844..3987dba2 100644 --- a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py +++ b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py @@ -123,6 +123,12 @@ class SingleQuantumExecutor(QuantumExecutor): Unlike ``skipExistingIn``, this works with limited butlers as well as full butlers. Always set to `True` if ``skipExistingIn`` matches ``butler.run``. + assumeNoExistingOutputs : `bool`, optional + If `True`, assume preexisting outputs are impossible (e.g. because this + is known by higher-level code to be a new ``RUN`` collection), and do + not look for them. This causes the ``skipExisting`` and + ``clobberOutputs`` options to be ignored, but unlike just setting both + of those to `False`, it also avoids all dataset existence checks. 
""" def __init__( @@ -136,6 +142,7 @@ def __init__( limited_butler_factory: Callable[[Quantum], LimitedButler] | None = None, resources: ExecutionResources | None = None, skipExisting: bool = False, + assumeNoExistingOutputs: bool = False, ): self.butler = butler self.taskFactory = taskFactory @@ -144,6 +151,7 @@ def __init__( self.exitOnKnownError = exitOnKnownError self.limited_butler_factory = limited_butler_factory self.resources = resources + self.assumeNoExistingOutputs = assumeNoExistingOutputs if self.butler is None: assert limited_butler_factory is not None, "limited_butler_factory is needed when butler is None" @@ -345,6 +353,9 @@ def checkExistingOutputs( """ task_node = self._conform_task_def(task_node) + if self.assumeNoExistingOutputs: + return False + if self.skipExisting: _LOG.debug( "Checking existence of metadata from previous execution of label=%s dataId=%s.", From e07a1afafbc7e030d59ade42b4cd7daf02791d88 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Fri, 26 Jul 2024 10:30:35 -0400 Subject: [PATCH 3/3] Fix MyPy complaints due to DM-41326. DM-41326 broke some code here that isn't tested (I want to remove it soon) but is checked by MyPy - but only that only happens in CI when theres' a ctrl_mpexec branch, and there wasn't for that ticket. 
--- python/lsst/ctrl/mpexec/dotTools.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/lsst/ctrl/mpexec/dotTools.py b/python/lsst/ctrl/mpexec/dotTools.py index 97c11b08..a55d2ca0 100644 --- a/python/lsst/ctrl/mpexec/dotTools.py +++ b/python/lsst/ctrl/mpexec/dotTools.py @@ -113,7 +113,7 @@ def _renderQuantumNode( labels = [f"{quantumNode.nodeId}", html.escape(taskDef.label)] dataId = quantumNode.quantum.dataId assert dataId is not None, "Quantum DataId cannot be None" - labels.extend(f"{key} = {dataId[key]}" for key in sorted(dataId.keys())) + labels.extend(f"{key} = {dataId[key]}" for key in sorted(dataId.required.keys())) _renderNode(file, nodeName, "quantum", labels) @@ -128,7 +128,7 @@ def _renderDSTypeNode(name: str, dimensions: list[str], file: io.TextIOBase) -> def _renderDSNode(nodeName: str, dsRef: DatasetRef, file: io.TextIOBase) -> None: """Render GV node for a dataset""" labels = [html.escape(dsRef.datasetType.name), f"run: {dsRef.run!r}"] - labels.extend(f"{key} = {dsRef.dataId[key]}" for key in sorted(dsRef.dataId.keys())) + labels.extend(f"{key} = {dsRef.dataId[key]}" for key in sorted(dsRef.dataId.required.keys())) _renderNode(file, nodeName, "dataset", labels) @@ -144,7 +144,7 @@ def _renderEdge(fromName: str, toName: str, file: io.TextIOBase, **kwargs: Any) def _datasetRefId(dsRef: DatasetRef) -> str: """Make an identifying string for given ref""" dsId = [dsRef.datasetType.name] - dsId.extend(f"{key} = {dsRef.dataId[key]}" for key in sorted(dsRef.dataId.keys())) + dsId.extend(f"{key} = {dsRef.dataId[key]}" for key in sorted(dsRef.dataId.required.keys())) return ":".join(dsId)