Skip to content

Commit

Permalink
Merge pull request #259 from lsst/tickets/DM-40390
Browse files Browse the repository at this point in the history
DM-40390: Remove TYPE_CHECKING block to fix DatastoreRecordData import
  • Loading branch information
timj authored Aug 15, 2023
2 parents 9254431 + f5de634 commit 8a7d0b7
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 15 deletions.
23 changes: 9 additions & 14 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,23 @@
import shutil
from collections.abc import Iterable, Mapping, Sequence
from types import SimpleNamespace
from typing import TYPE_CHECKING

import astropy.units as u
from astropy.table import Table
from lsst.daf.butler import (
Butler,
CollectionType,
Config,
DatasetId,
DatasetRef,
DatasetType,
DatastoreCacheManager,
DatastoreRecordData,
DimensionUniverse,
LimitedButler,
Quantum,
QuantumBackedButler,
Registry,
)
from lsst.daf.butler.registry import MissingCollectionError, RegistryDefaults
from lsst.daf.butler.registry.wildcards import CollectionWildcard
Expand All @@ -56,6 +62,8 @@
Pipeline,
PipelineDatasetTypes,
QuantumGraph,
TaskDef,
TaskFactory,
buildExecutionButler,
)
from lsst.utils import doImportType
Expand All @@ -68,19 +76,6 @@
from .preExecInit import PreExecInit, PreExecInitLimited
from .singleQuantumExecutor import SingleQuantumExecutor

if TYPE_CHECKING:
from lsst.daf.butler import (
Config,
DatasetType,
DatastoreRecordData,
DimensionUniverse,
LimitedButler,
Quantum,
Registry,
)
from lsst.pipe.base import TaskDef, TaskFactory


# ----------------------------------
# Local non-exported definitions --
# ----------------------------------
Expand Down
70 changes: 69 additions & 1 deletion tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
Registry,
)
from lsst.daf.butler.core.datasets.type import DatasetType
from lsst.daf.butler.registry import RegistryConfig
from lsst.daf.butler.registry import ConflictingDefinitionError, RegistryConfig
from lsst.pipe.base import (
Instrument,
Pipeline,
Expand All @@ -70,6 +70,7 @@
TaskDef,
)
from lsst.pipe.base.graphBuilder import DatasetQueryConstraintVariant as DQCVariant
from lsst.pipe.base.script import transfer_from_graph
from lsst.pipe.base.tests.simpleQGraph import (
AddTask,
AddTaskFactoryMock,
Expand Down Expand Up @@ -532,6 +533,73 @@ def testSimpleQGraph(self):
# test that we've disabled implicit threading
self.assertEqual(os.environ["OMP_NUM_THREADS"], "1")

def test_simple_qgraph_qbb(self):
"""Test successful execution of trivial quantum graph in QBB mode."""
args = _makeArgs(
butler_config=self.root, input="test", output="output", qgraph_datastore_records=True
)
butler = makeSimpleButler(self.root, run=args.input, inMemory=False)
populateButler(self.pipeline, butler)

fwk = CmdLineFwk()
taskFactory = AddTaskFactoryMock()

qgraph = fwk.makeGraph(self.pipeline, args)
self.assertEqual(len(qgraph.taskGraph), self.nQuanta)
self.assertEqual(len(qgraph), self.nQuanta)

# Ensure that the output run used in the graph is also used in
# the pipeline execution. It is possible for makeGraph and runPipeline
# to calculate time-stamped runs across a second boundary.
output_run = qgraph.metadata["output_run"]
args.output_run = output_run

# QBB must run from serialized graph.
with tempfile.NamedTemporaryFile(suffix=".qgraph") as temp_graph:
qgraph.saveUri(temp_graph.name)

args = _makeArgs(butler_config=self.root, qgraph=temp_graph.name, config_search_path=[])

# Check that pre-exec-init can run.
fwk.preExecInitQBB(taskFactory, args)

# Run whole thing.
fwk.runGraphQBB(taskFactory, args)

# Transfer the datasets to the butler.
n1 = transfer_from_graph(temp_graph.name, self.root, True, False, False)
self.assertEqual(n1, 31)

self.assertEqual(taskFactory.countExec, self.nQuanta)

# Update the output run and try again.
new_output_run = output_run + "_new"
qgraph.updateRun(new_output_run, metadata_key="output_run", update_graph_id=True)
self.assertEqual(qgraph.metadata["output_run"], new_output_run)

taskFactory = AddTaskFactoryMock()
with tempfile.NamedTemporaryFile(suffix=".qgraph") as temp_graph:
qgraph.saveUri(temp_graph.name)

args = _makeArgs(butler_config=self.root, qgraph=temp_graph.name, config_search_path=[])

# Check that pre-exec-init can run.
fwk.preExecInitQBB(taskFactory, args)

# Run whole thing.
fwk.runGraphQBB(taskFactory, args)

# Transfer the datasets to the butler.
# TODO: DM-40392
# This will fail because the UUIDs are not updated by updateRun
with self.assertRaises(ConflictingDefinitionError):
n2 = transfer_from_graph(temp_graph.name, self.root, True, False, False)

# For reasons that have to be investigated the number of
# outputs the second time around is only 21.
self.assertEqual(n2, 31)
self.assertEqual(n1, n2)

def testEmptyQGraph(self):
"""Test that making an empty QG produces the right error messages."""
# We make QG generation fail by populating one input collection in the
Expand Down

0 comments on commit 8a7d0b7

Please sign in to comment.