Skip to content

Commit

Permalink
Add simple test for running pipeline in QBB mode
Browse files Browse the repository at this point in the history
The test has an intentional failure caused by updateRun
being broken. The test should be fixed on DM-40392.
  • Loading branch information
timj committed Aug 15, 2023
1 parent ff09b20 commit f5de634
Showing 1 changed file with 69 additions and 1 deletion.
70 changes: 69 additions & 1 deletion tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
Registry,
)
from lsst.daf.butler.core.datasets.type import DatasetType
from lsst.daf.butler.registry import RegistryConfig
from lsst.daf.butler.registry import ConflictingDefinitionError, RegistryConfig
from lsst.pipe.base import (
Instrument,
Pipeline,
Expand All @@ -70,6 +70,7 @@
TaskDef,
)
from lsst.pipe.base.graphBuilder import DatasetQueryConstraintVariant as DQCVariant
from lsst.pipe.base.script import transfer_from_graph
from lsst.pipe.base.tests.simpleQGraph import (
AddTask,
AddTaskFactoryMock,
Expand Down Expand Up @@ -532,6 +533,73 @@ def testSimpleQGraph(self):
# test that we've disabled implicit threading
self.assertEqual(os.environ["OMP_NUM_THREADS"], "1")

def test_simple_qgraph_qbb(self):
"""Test successful execution of trivial quantum graph in QBB mode."""
args = _makeArgs(
butler_config=self.root, input="test", output="output", qgraph_datastore_records=True
)
butler = makeSimpleButler(self.root, run=args.input, inMemory=False)
populateButler(self.pipeline, butler)

fwk = CmdLineFwk()
taskFactory = AddTaskFactoryMock()

qgraph = fwk.makeGraph(self.pipeline, args)
self.assertEqual(len(qgraph.taskGraph), self.nQuanta)
self.assertEqual(len(qgraph), self.nQuanta)

# Ensure that the output run used in the graph is also used in
# the pipeline execution. It is possible for makeGraph and runPipeline
# to calculate time-stamped runs across a second boundary.
output_run = qgraph.metadata["output_run"]
args.output_run = output_run

# QBB must run from serialized graph.
with tempfile.NamedTemporaryFile(suffix=".qgraph") as temp_graph:
qgraph.saveUri(temp_graph.name)

args = _makeArgs(butler_config=self.root, qgraph=temp_graph.name, config_search_path=[])

# Check that pre-exec-init can run.
fwk.preExecInitQBB(taskFactory, args)

# Run whole thing.
fwk.runGraphQBB(taskFactory, args)

# Transfer the datasets to the butler.
n1 = transfer_from_graph(temp_graph.name, self.root, True, False, False)
self.assertEqual(n1, 31)

self.assertEqual(taskFactory.countExec, self.nQuanta)

# Update the output run and try again.
new_output_run = output_run + "_new"
qgraph.updateRun(new_output_run, metadata_key="output_run", update_graph_id=True)
self.assertEqual(qgraph.metadata["output_run"], new_output_run)

taskFactory = AddTaskFactoryMock()
with tempfile.NamedTemporaryFile(suffix=".qgraph") as temp_graph:
qgraph.saveUri(temp_graph.name)

args = _makeArgs(butler_config=self.root, qgraph=temp_graph.name, config_search_path=[])

# Check that pre-exec-init can run.
fwk.preExecInitQBB(taskFactory, args)

# Run whole thing.
fwk.runGraphQBB(taskFactory, args)

# Transfer the datasets to the butler.
# TODO: DM-40392
# This will fail because the UUIDs are not updated by updateRun
with self.assertRaises(ConflictingDefinitionError):
n2 = transfer_from_graph(temp_graph.name, self.root, True, False, False)

# For reasons that have to be investigated the number of
# outputs the second time around is only 21.
self.assertEqual(n2, 31)
self.assertEqual(n1, n2)

Check warning on line 601 in tests/test_cmdLineFwk.py

View check run for this annotation

Codecov / codecov/patch

tests/test_cmdLineFwk.py#L600-L601

Added lines #L600 - L601 were not covered by tests

def testEmptyQGraph(self):
"""Test that making an empty QG produces the right error messages."""
# We make QG generation fail by populating one input collection in the
Expand Down

0 comments on commit f5de634

Please sign in to comment.