diff --git a/tests/test_cmdLineFwk.py b/tests/test_cmdLineFwk.py index 3c3f0e05..a7c6ebe5 100644 --- a/tests/test_cmdLineFwk.py +++ b/tests/test_cmdLineFwk.py @@ -60,7 +60,7 @@ Registry, ) from lsst.daf.butler.core.datasets.type import DatasetType -from lsst.daf.butler.registry import RegistryConfig +from lsst.daf.butler.registry import ConflictingDefinitionError, RegistryConfig from lsst.pipe.base import ( Instrument, Pipeline, @@ -70,6 +70,7 @@ TaskDef, ) from lsst.pipe.base.graphBuilder import DatasetQueryConstraintVariant as DQCVariant +from lsst.pipe.base.script import transfer_from_graph from lsst.pipe.base.tests.simpleQGraph import ( AddTask, AddTaskFactoryMock, @@ -532,6 +533,73 @@ def testSimpleQGraph(self): # test that we've disabled implicit threading self.assertEqual(os.environ["OMP_NUM_THREADS"], "1") + def test_simple_qgraph_qbb(self): + """Test successful execution of trivial quantum graph in QBB mode.""" + args = _makeArgs( + butler_config=self.root, input="test", output="output", qgraph_datastore_records=True + ) + butler = makeSimpleButler(self.root, run=args.input, inMemory=False) + populateButler(self.pipeline, butler) + + fwk = CmdLineFwk() + taskFactory = AddTaskFactoryMock() + + qgraph = fwk.makeGraph(self.pipeline, args) + self.assertEqual(len(qgraph.taskGraph), self.nQuanta) + self.assertEqual(len(qgraph), self.nQuanta) + + # Ensure that the output run used in the graph is also used in + # the pipeline execution. It is possible for makeGraph and runPipeline + # to calculate time-stamped runs across a second boundary. + output_run = qgraph.metadata["output_run"] + args.output_run = output_run + + # QBB must run from serialized graph. + with tempfile.NamedTemporaryFile(suffix=".qgraph") as temp_graph: + qgraph.saveUri(temp_graph.name) + + args = _makeArgs(butler_config=self.root, qgraph=temp_graph.name, config_search_path=[]) + + # Check that pre-exec-init can run. + fwk.preExecInitQBB(taskFactory, args) + + # Run whole thing. + fwk.runGraphQBB(taskFactory, args) + + # Transfer the datasets to the butler. + n1 = transfer_from_graph(temp_graph.name, self.root, True, False, False) + self.assertEqual(n1, 31) + + self.assertEqual(taskFactory.countExec, self.nQuanta) + + # Update the output run and try again. + new_output_run = output_run + "_new" + qgraph.updateRun(new_output_run, metadata_key="output_run", update_graph_id=True) + self.assertEqual(qgraph.metadata["output_run"], new_output_run) + + taskFactory = AddTaskFactoryMock() + with tempfile.NamedTemporaryFile(suffix=".qgraph") as temp_graph: + qgraph.saveUri(temp_graph.name) + + args = _makeArgs(butler_config=self.root, qgraph=temp_graph.name, config_search_path=[]) + + # Check that pre-exec-init can run. + fwk.preExecInitQBB(taskFactory, args) + + # Run whole thing. + fwk.runGraphQBB(taskFactory, args) + + # Transfer the datasets to the butler. + # TODO: DM-40392 + # This will fail because the UUIDs are not updated by updateRun + with self.assertRaises(ConflictingDefinitionError): + n2 = transfer_from_graph(temp_graph.name, self.root, True, False, False) + + # For reasons that have to be investigated the number of + # outputs the second time around is only 21. + self.assertEqual(n2, 31) + self.assertEqual(n1, n2) + def testEmptyQGraph(self): """Test that making an empty QG produces the right error messages.""" # We make QG generation fail by populating one input collection in the