Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-35396: Pass quantum ID down to executor so it can be used in provenance #321

Merged
merged 5 commits into from
Feb 4, 2025

Conversation

timj
Copy link
Member

@timj timj commented Jan 30, 2025

Requires lsst/pipe_base#464 and lsst/daf_butler#1147

Checklist

  • ran Jenkins
  • added a release note for user-visible changes to doc/changes

Copy link

codecov bot commented Jan 30, 2025

❌ 36 Tests Failed:

Tests completed Failed Passed Skipped
157 36 121 0
View the top 3 failed tests by shortest run time
tests/test_executors.py::SingleQuantumExecutorTestCase::test_clobber_outputs_execute
Stack Traces | 0.492s run time
self = <test_executors.SingleQuantumExecutorTestCase testMethod=test_clobber_outputs_execute>

    def test_clobber_outputs_execute(self) -> None:
        """Run execute() method twice, with clobber_outputs."""
        nQuanta = 1
        butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root, instrument=self.instrument)
    
        nodes = list(qgraph)
        self.assertEqual(len(nodes), nQuanta)
        node = nodes[0]
    
        taskFactory = AddTaskFactoryMock()
        executor = SingleQuantumExecutor(butler, taskFactory)
>       executor.execute(node.task_node, node.quantum)

tests/test_executors.py:743: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../ctrl/mpexec/singleQuantumExecutor.py:181: in execute
    result = self._execute(task_node, quantum, quantum_id=quantum_id)
.../ctrl/mpexec/singleQuantumExecutor.py:276: in _execute
    caveats, outputsPut = self.runQuantum(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <lsst.ctrl.mpexec.singleQuantumExecutor.SingleQuantumExecutor object at 0x7f0914248950>
task = <lsst.pipe.base.tests.simpleQGraph.AddTask object at 0x7f0914298890>
quantum = <lsst.daf.butler._quantum.Quantum object at 0x7f0914569240>
task_node = task0 (lsst.pipe.base.tests.simpleQGraph.AddTask)
limited_butler = <lsst.daf.butler.direct_butler._direct_butler.DirectButler object at 0x7f091468f5b0>
quantum_id = None

    def runQuantum(
        self,
        task: PipelineTask,
        quantum: Quantum,
        task_node: TaskNode,
        /,
        limited_butler: LimitedButler,
        quantum_id: uuid.UUID | None = None,
    ) -> tuple[QuantumSuccessCaveats, list[uuid.UUID]]:
        """Execute task on a single quantum.
    
        Parameters
        ----------
        task : `~lsst.pipe.base.PipelineTask`
            Task object.
        quantum : `~lsst.daf.butler.Quantum`
            Single Quantum instance.
        task_node : `~lsst.pipe.base.pipeline_graph.TaskNode`
            Task definition structure.
        limited_butler : `~lsst.daf.butler.LimitedButler`
            Butler to use for dataset I/O.
        quantum_id : `uuid.UUID` or `None`, optional
            ID of the quantum being executed.
    
        Returns
        -------
        flags : `QuantumSuccessCaveats`
            Flags that describe qualified successes.
        ids_put : list[ `uuid.UUID` ]
            Record of all the dataset IDs that were written by this quantum
            being executed.
        """
        flags = QuantumSuccessCaveats.NO_CAVEATS
    
        # Create a butler that operates in the context of a quantum
>       butlerQC = QuantumContext(limited_butler, quantum, resources=self.resources, quantum_id=quantum_id)
E       TypeError: QuantumContext.__init__() got an unexpected keyword argument 'quantum_id'. Did you mean 'quantum'?

.../ctrl/mpexec/singleQuantumExecutor.py:518: TypeError
tests/test_executors.py::SingleQuantumExecutorTestCase::test_simple_execute
Stack Traces | 0.501s run time
self = <test_executors.SingleQuantumExecutorTestCase testMethod=test_simple_execute>

    def test_simple_execute(self) -> None:
        """Run execute() method in simplest setup."""
        nQuanta = 1
        butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root, instrument=self.instrument)
    
        nodes = list(qgraph)
        self.assertEqual(len(nodes), nQuanta)
        node = nodes[0]
    
        taskFactory = AddTaskFactoryMock()
        executor = SingleQuantumExecutor(butler, taskFactory)
>       executor.execute(node.task_node, node.quantum)

tests/test_executors.py:696: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../ctrl/mpexec/singleQuantumExecutor.py:181: in execute
    result = self._execute(task_node, quantum, quantum_id=quantum_id)
.../ctrl/mpexec/singleQuantumExecutor.py:276: in _execute
    caveats, outputsPut = self.runQuantum(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <lsst.ctrl.mpexec.singleQuantumExecutor.SingleQuantumExecutor object at 0x7f090d9b1ae0>
task = <lsst.pipe.base.tests.simpleQGraph.AddTask object at 0x7f090db42c10>
quantum = <lsst.daf.butler._quantum.Quantum object at 0x7f090dc6f460>
task_node = task0 (lsst.pipe.base.tests.simpleQGraph.AddTask)
limited_butler = <lsst.daf.butler.direct_butler._direct_butler.DirectButler object at 0x7f0914bcf690>
quantum_id = None

    def runQuantum(
        self,
        task: PipelineTask,
        quantum: Quantum,
        task_node: TaskNode,
        /,
        limited_butler: LimitedButler,
        quantum_id: uuid.UUID | None = None,
    ) -> tuple[QuantumSuccessCaveats, list[uuid.UUID]]:
        """Execute task on a single quantum.
    
        Parameters
        ----------
        task : `~lsst.pipe.base.PipelineTask`
            Task object.
        quantum : `~lsst.daf.butler.Quantum`
            Single Quantum instance.
        task_node : `~lsst.pipe.base.pipeline_graph.TaskNode`
            Task definition structure.
        limited_butler : `~lsst.daf.butler.LimitedButler`
            Butler to use for dataset I/O.
        quantum_id : `uuid.UUID` or `None`, optional
            ID of the quantum being executed.
    
        Returns
        -------
        flags : `QuantumSuccessCaveats`
            Flags that describe qualified successes.
        ids_put : list[ `uuid.UUID` ]
            Record of all the dataset IDs that were written by this quantum
            being executed.
        """
        flags = QuantumSuccessCaveats.NO_CAVEATS
    
        # Create a butler that operates in the context of a quantum
>       butlerQC = QuantumContext(limited_butler, quantum, resources=self.resources, quantum_id=quantum_id)
E       TypeError: QuantumContext.__init__() got an unexpected keyword argument 'quantum_id'. Did you mean 'quantum'?

.../ctrl/mpexec/singleQuantumExecutor.py:518: TypeError
tests/test_execution_storage_class_conversion.py::TestExecutionStorageClassConversion::test_input_differs
Stack Traces | 0.793s run time
self = <test_execution_storage_class_conversion.TestExecutionStorageClassConversion testMethod=test_input_differs>

    def test_input_differs(self):
        """Test execution where an overall input's storage class is different
        from the consuming task.
        """
        executor = self._make_executor(a_i_storage_class="TaskMetadataLike")
>       quanta = executor.run(register_dataset_types=True, save_versions=False)

tests/test_execution_storage_class_conversion.py:257: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../ctrl/mpexec/simple_pipeline_executor.py:432: in run
    return list(
.../ctrl/mpexec/simple_pipeline_executor.py:485: in <genexpr>
    single_quantum_executor.execute(qnode.task_node, qnode.quantum)[0] for qnode in self.quantum_graph
.../ctrl/mpexec/singleQuantumExecutor.py:181: in execute
    result = self._execute(task_node, quantum, quantum_id=quantum_id)
.../ctrl/mpexec/singleQuantumExecutor.py:276: in _execute
    caveats, outputsPut = self.runQuantum(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <lsst.ctrl.mpexec.singleQuantumExecutor.SingleQuantumExecutor object at 0x7f0914c43e10>
task = <lsst.pipe.base.tests.mocks._pipeline_task.DynamicTestPipelineTask object at 0x7f0914e63250>
quantum = <lsst.daf.butler._quantum.Quantum object at 0x7f091434c160>
task_node = a (lsst.pipe.base.tests.mocks.DynamicTestPipelineTask)
limited_butler = <lsst.daf.butler.direct_butler._direct_butler.DirectButler object at 0x7f090de1e900>
quantum_id = None

    def runQuantum(
        self,
        task: PipelineTask,
        quantum: Quantum,
        task_node: TaskNode,
        /,
        limited_butler: LimitedButler,
        quantum_id: uuid.UUID | None = None,
    ) -> tuple[QuantumSuccessCaveats, list[uuid.UUID]]:
        """Execute task on a single quantum.
    
        Parameters
        ----------
        task : `~lsst.pipe.base.PipelineTask`
            Task object.
        quantum : `~lsst.daf.butler.Quantum`
            Single Quantum instance.
        task_node : `~lsst.pipe.base.pipeline_graph.TaskNode`
            Task definition structure.
        limited_butler : `~lsst.daf.butler.LimitedButler`
            Butler to use for dataset I/O.
        quantum_id : `uuid.UUID` or `None`, optional
            ID of the quantum being executed.
    
        Returns
        -------
        flags : `QuantumSuccessCaveats`
            Flags that describe qualified successes.
        ids_put : list[ `uuid.UUID` ]
            Record of all the dataset IDs that were written by this quantum
            being executed.
        """
        flags = QuantumSuccessCaveats.NO_CAVEATS
    
        # Create a butler that operates in the context of a quantum
>       butlerQC = QuantumContext(limited_butler, quantum, resources=self.resources, quantum_id=quantum_id)
E       TypeError: QuantumContext.__init__() got an unexpected keyword argument 'quantum_id'. Did you mean 'quantum'?

.../ctrl/mpexec/singleQuantumExecutor.py:518: TypeError

To view more test analytics, go to the Test Analytics Dashboard
📢 Thoughts on this report? Let us know!

@timj timj force-pushed the tickets/DM-35396 branch from d514d47 to 39b6dfc Compare January 30, 2025 17:56
@timj timj force-pushed the tickets/DM-35396 branch from b11c6dd to 8e6aeb2 Compare January 30, 2025 20:19
@timj timj force-pushed the tickets/DM-35396 branch from 8e6aeb2 to aac81ed Compare February 4, 2025 19:45
@timj timj force-pushed the tickets/DM-35396 branch from aac81ed to ee1c935 Compare February 4, 2025 20:51
@timj timj merged commit 769567f into main Feb 4, 2025
14 checks passed
@timj timj deleted the tickets/DM-35396 branch February 4, 2025 20:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants