diff --git a/pyproject.toml b/pyproject.toml index c70793e5..00368e29 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,10 +38,7 @@ dynamic = ["version"] "Homepage" = "https://github.com/lsst/ctrl_mpexec" [project.optional-dependencies] -test = [ - "pytest >= 3.2", - "pytest-openfiles >= 0.5.0" -] +test = ["pytest >= 3.2"] [tool.setuptools.packages.find] where = ["python"] diff --git a/tests/test_execution_storage_class_conversion.py b/tests/test_execution_storage_class_conversion.py new file mode 100644 index 00000000..e90f0fb4 --- /dev/null +++ b/tests/test_execution_storage_class_conversion.py @@ -0,0 +1,317 @@ +# This file is part of ctrl_mpexec. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This software is dual licensed under the GNU General Public License and also +# under a 3-clause BSD license. Recipients may choose which of these licenses +# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, +# respectively. If you choose the GPL option then the following text applies +# (but note that there is still no warranty even if you opt for BSD instead): +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +import os +import shutil +import tempfile +import unittest + +import lsst.daf.butler +import lsst.utils.tests +from lsst.ctrl.mpexec import SimplePipelineExecutor +from lsst.pipe.base import PipelineGraph +from lsst.pipe.base.pipeline_graph import IncompatibleDatasetTypeError +from lsst.pipe.base.tests.mocks import ( + DynamicConnectionConfig, + DynamicTestPipelineTask, + DynamicTestPipelineTaskConfig, + MockStorageClass, + get_mock_name, +) + +TESTDIR = os.path.abspath(os.path.dirname(__file__)) + + +class TestExecutionStorageClassConversion(lsst.utils.tests.TestCase): + """Test storage class conversions during execution. + + Task connection declarations should always define which storage class they + see, while data repository registrations should always define what is + stored. + + This test uses mock storage classes for intermediate and output datasets, + which let us load the dataset to see what storage class the task saw when + it was running. These storage class names need to be wrapped in + get_mock_name calls to get what the butler actually sees. Overall input + datasets are not declared with mock datasets, so we can `put` them directly + in test code. + """ + + def setUp(self): + self.path = tempfile.mkdtemp() + # standalone parameter forces the returned config to also include + # the information from the search paths. + config = lsst.daf.butler.Butler.makeRepo( + self.path, standalone=True, searchPaths=[os.path.join(TESTDIR, "config")] + ) + self.butler = SimplePipelineExecutor.prep_butler(config, [], "fake") + self.butler.registry.registerDatasetType( + lsst.daf.butler.DatasetType( + "input", + dimensions=self.butler.dimensions.empty, + storageClass="StructuredDataDict", + ) + ) + self.butler.put({"zero": 0}, "input") + MockStorageClass.get_or_register_mock("StructuredDataDict") + MockStorageClass.get_or_register_mock("TaskMetadataLike") + + def tearDown(self): + shutil.rmtree(self.path, ignore_errors=True) + + def _make_config( + self, + input_storage_class="StructuredDataDict", + output_storage_class="StructuredDataDict", + input_name="input", + output_name="output", + ): + """Create configuration for a test task with a single input and single + output of the given storage classes and dataset type names. + """ + config = DynamicTestPipelineTaskConfig() + config.inputs["i"] = DynamicConnectionConfig( + dataset_type_name=input_name, + storage_class=input_storage_class, + # Since the overall input is special, we only use a mock storage + # class for it when there's a storage class conversion. + mock_storage_class=(input_name != "input" or (input_storage_class != "StructuredDataDict")), + ) + config.outputs["o"] = DynamicConnectionConfig( + dataset_type_name=output_name, storage_class=output_storage_class + ) + return config + + def _make_executor( + self, + a_i_storage_class="StructuredDataDict", + a_o_storage_class="StructuredDataDict", + b_i_storage_class="StructuredDataDict", + b_o_storage_class="StructuredDataDict", + ): + """Configure a SimplePipelineExecutor with tasks with the given + storage classes as inputs and outputs. + + This sets up a simple pipeline with two tasks ('a' and 'b') where the + second task's only input is the first task's only output. + """ + config_a = self._make_config(a_i_storage_class, a_o_storage_class, output_name="intermediate") + config_b = self._make_config(b_i_storage_class, b_o_storage_class, input_name="intermediate") + pipeline_graph = PipelineGraph() + pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a) + pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b) + executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler) + return executor + + def _assert_datasets( + self, + a_i_storage_class="StructuredDataDict", + a_o_storage_class="StructuredDataDict", + b_i_storage_class="StructuredDataDict", + b_o_storage_class="StructuredDataDict", + stored_intermediate_storage_class="StructuredDataDict", + stored_output_storage_class="StructuredDataDict", + ): + """Check that a butler repository's contents are consistent with + running a pipeline created by _make_executor. + """ + # Read input and output datasets from butler, inspect their storage + # classes directly. + stored_intermediate = self.butler.get("intermediate") + stored_output = self.butler.get("output") + self.assertEqual( + self.butler.get_dataset_type("intermediate").storageClass_name, + get_mock_name(stored_intermediate_storage_class), + ) + self.assertEqual(stored_output.storage_class, get_mock_name(stored_output_storage_class)) + self.assertEqual( + self.butler.get_dataset_type("output").storageClass_name, + get_mock_name(stored_output_storage_class), + ) + # Since we didn't tell the butler to convert storage classes on read, + # they'll remember their last conversion (on write). + if a_o_storage_class != stored_intermediate_storage_class: + self.assertEqual( + stored_intermediate.converted_from.storage_class, + get_mock_name(a_o_storage_class), + ) + else: + self.assertIsNone(stored_intermediate.converted_from) + if b_o_storage_class != stored_output_storage_class: + self.assertEqual( + stored_output.converted_from.storage_class, + get_mock_name(b_o_storage_class), + ) + else: + self.assertIsNone(stored_output.converted_from) + # Extract the inputs as seen by the tasks from those stored outputs. + quantum_a = stored_intermediate.quantum + quantum_b = stored_output.quantum + b_input = quantum_b.inputs["i"][0] + a_input = quantum_a.inputs["i"][0] + if a_i_storage_class == "StructuredDataDict": + self.assertIsNone(a_input.converted_from, None) + else: + self.assertEqual(a_input.original_type, "dict") + self.assertEqual(b_input.storage_class, get_mock_name(b_i_storage_class)) + + def test_no_conversions(self): + """Test execution with no storage class conversions as a baseline.""" + executor = self._make_executor() + quanta = executor.run(register_dataset_types=True, save_versions=False) + self.assertEqual(len(quanta), 2) + self._assert_datasets() + + def test_intermediate_registration_differs(self): + """Test execution where an intermediate is registered to be different + from both the producing and consuming task. + """ + self.butler.registry.registerDatasetType( + lsst.daf.butler.DatasetType( + "intermediate", + dimensions=self.butler.dimensions.empty, + storageClass=get_mock_name("TaskMetadataLike"), + ) + ) + executor = self._make_executor() + quanta = executor.run(register_dataset_types=True, save_versions=False) + self.assertEqual(len(quanta), 2) + self._assert_datasets(stored_intermediate_storage_class="TaskMetadataLike") + + def test_intermediate_producer_differs(self): + """Test execution where an intermediate is registered to be consistent + with the consumer but different from its producer. + """ + self.butler.registry.registerDatasetType( + lsst.daf.butler.DatasetType( + "intermediate", + dimensions=self.butler.dimensions.empty, + storageClass=get_mock_name("TaskMetadataLike"), + ) + ) + executor = self._make_executor(b_i_storage_class="TaskMetadataLike") + quanta = executor.run(register_dataset_types=True, save_versions=False) + self.assertEqual(len(quanta), 2) + self._assert_datasets( + stored_intermediate_storage_class="TaskMetadataLike", b_i_storage_class="TaskMetadataLike" + ) + + def test_intermediate_consumer_differs(self): + """Test execution where an intermediate is registered to be consistent + with its producer but different from its consumer. + """ + executor = self._make_executor(a_o_storage_class="TaskMetadataLike") + quanta = executor.run(register_dataset_types=True, save_versions=False) + self.assertEqual(len(quanta), 2) + self._assert_datasets( + stored_intermediate_storage_class="TaskMetadataLike", a_o_storage_class="TaskMetadataLike" + ) + + def test_output_differs(self): + """Test execution where an overall output is registered to be + different from the producing task. + """ + self.butler.registry.registerDatasetType( + lsst.daf.butler.DatasetType( + "output", + dimensions=self.butler.dimensions.empty, + storageClass=get_mock_name("TaskMetadataLike"), + ) + ) + executor = self._make_executor() + quanta = executor.run(register_dataset_types=True, save_versions=False) + self.assertEqual(len(quanta), 2) + self._assert_datasets(stored_output_storage_class="TaskMetadataLike") + + def test_input_differs(self): + """Test execution where an overall input's storage class is different + from the consuming task. + """ + executor = self._make_executor(a_i_storage_class="TaskMetadataLike") + quanta = executor.run(register_dataset_types=True, save_versions=False) + self.assertEqual(len(quanta), 2) + self._assert_datasets(a_i_storage_class="TaskMetadataLike") + + def test_incompatible(self): + """Test that we cannot make a QG if the registry and pipeline have + incompatible storage classes for a dataset type. + """ + # Incompatible output dataset type. + self.butler.registry.registerDatasetType( + lsst.daf.butler.DatasetType( + "output", + dimensions=self.butler.dimensions.empty, + storageClass="StructuredDataList", + ) + ) + with self.assertRaisesRegex( + IncompatibleDatasetTypeError, "Incompatible definition.*StructuredDataDict.*StructuredDataList.*" + ): + self._make_executor() + + def test_registry_changed(self): + """Run pipeline, but change registry dataset types between making the + QG and executing it. + + This only fails with full-butler execution; we don't have a way to + prevent it with QBB. + """ + executor = self._make_executor() + self.butler.registry.registerDatasetType( + lsst.daf.butler.DatasetType( + "output", + dimensions=self.butler.dimensions.empty, + storageClass="TaskMetadataLike", # even compatible is not okay + ) + ) + with self.assertRaisesRegex( + lsst.daf.butler.registry.ConflictingDefinitionError, + ".*definition in registry has changed.*StructuredDataDict.*TaskMetadataLike.*", + ): + executor.run(register_dataset_types=True, save_versions=False) + + +class MemoryTester(lsst.utils.tests.MemoryTestCase): + """Generic tests for file leaks.""" + + +def setup_module(module): + """Set up the module for pytest. + + Parameters + ---------- + module : `~types.ModuleType` + Module to set up. + """ + lsst.utils.tests.init() + + +if __name__ == "__main__": + lsst.utils.tests.init() + unittest.main() diff --git a/tests/test_simple_pipeline_executor.py b/tests/test_simple_pipeline_executor.py index be4ac93b..95842c1e 100644 --- a/tests/test_simple_pipeline_executor.py +++ b/tests/test_simple_pipeline_executor.py @@ -31,94 +31,30 @@ import shutil import tempfile import unittest -from typing import Any import lsst.daf.butler import lsst.utils.tests from lsst.ctrl.mpexec import SimplePipelineExecutor -from lsst.pipe.base import PipelineGraph, Struct, TaskMetadata, connectionTypes -from lsst.pipe.base.pipeline_graph import IncompatibleDatasetTypeError -from lsst.pipe.base.tests.no_dimensions import ( - NoDimensionsTestConfig, - NoDimensionsTestConnections, - NoDimensionsTestTask, +from lsst.pipe.base import PipelineGraph +from lsst.pipe.base.tests.mocks import ( + DynamicConnectionConfig, + DynamicTestPipelineTask, + DynamicTestPipelineTaskConfig, + MockStorageClass, + get_mock_name, ) -from lsst.utils.introspection import get_full_type_name TESTDIR = os.path.abspath(os.path.dirname(__file__)) -class NoDimensionsTestConnections2(NoDimensionsTestConnections, dimensions=set()): - """A connections class used for testing.""" - - input = connectionTypes.Input( - name="input", doc="some dict-y input data for testing", storageClass="TaskMetadataLike" - ) - - -class NoDimensionsTestConfig2(NoDimensionsTestConfig, pipelineConnections=NoDimensionsTestConnections2): - """A config used for testing.""" - - -class NoDimensionsMetadataTestConnections(NoDimensionsTestConnections, dimensions=set()): - """Test connection class for metadata. +class SimplePipelineExecutorTests(lsst.utils.tests.TestCase): + """Test the SimplePipelineExecutor API. - Deliberately choose a storage class that does not match the metadata - default TaskMetadata storage class. + Because SimplePipelineExecutor is the easiest way to run simple pipelines + in tests, this has also become a home for tests of execution edge cases + that don't have a clear home in other test files. """ - meta = connectionTypes.Input( - name="a_metadata", doc="Metadata from previous task", storageClass="StructuredDataDict" - ) - - -class NoDimensionsMetadataTestConfig( - NoDimensionsTestConfig, pipelineConnections=NoDimensionsMetadataTestConnections -): - """A config used for testing the metadata.""" - - -class NoDimensionsMetadataTestTask(NoDimensionsTestTask): - """A simple pipeline task that can take a metadata as input.""" - - ConfigClass = NoDimensionsMetadataTestConfig - _DefaultName = "noDimensionsMetadataTest" - - def run(self, input: dict[str, int], meta: dict[str, Any]) -> Struct: - """Run the task, adding the configured key-value pair to the input - argument and returning it as the output. - - Parameters - ---------- - input : `dict` - Dictionary to update and return. - meta : `dict` - Metadata to add. - - Returns - ------- - result : `lsst.pipe.base.Struct` - Struct with a single ``output`` attribute. - """ - self.log.info("Run metadata method given data of type: %s", get_full_type_name(input)) - output = input.copy() - output[self.config.key] = self.config.value - - self.log.info("Received task metadata (%s): %s", get_full_type_name(meta), meta) - - # Can change the return type via configuration. - if "TaskMetadata" in self.config.outputSC: - output = TaskMetadata.from_dict(output) - elif type(output) is TaskMetadata: - # Want the output to be a dict - output = output.to_dict() - self.log.info("Run method returns data of type: %s", get_full_type_name(output)) - return Struct(output=output) - - -class SimplePipelineExecutorTests(lsst.utils.tests.TestCase): - """Test the SimplePipelineExecutor API with a trivial task.""" - def setUp(self): self.path = tempfile.mkdtemp() # standalone parameter forces the returned config to also include @@ -135,6 +71,8 @@ def setUp(self): ) ) self.butler.put({"zero": 0}, "input") + MockStorageClass.get_or_register_mock("StructuredDataDict") + MockStorageClass.get_or_register_mock("TaskMetadataLike") def tearDown(self): shutil.rmtree(self.path, ignore_errors=True) @@ -144,194 +82,101 @@ def test_from_task_class(self): `from_task_class` factory method, and the `SimplePipelineExecutor.as_generator` method. """ - executor = SimplePipelineExecutor.from_task_class(NoDimensionsTestTask, butler=self.butler) + config = DynamicTestPipelineTaskConfig() + config.inputs["i"] = DynamicConnectionConfig( + dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False + ) + config.outputs["o"] = DynamicConnectionConfig( + dataset_type_name="output", storage_class="StructuredDataDict" + ) + executor = SimplePipelineExecutor.from_task_class( + DynamicTestPipelineTask, + config=config, + butler=self.butler, + label="a", + ) (quantum,) = executor.as_generator(register_dataset_types=True) - self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1}) + self.assertEqual(self.butler.get("output").storage_class, get_mock_name("StructuredDataDict")) - def _configure_pipeline(self, config_a_cls, config_b_cls, storageClass_a=None, storageClass_b=None): - """Configure a pipeline with from_pipeline_graph.""" - config_a = config_a_cls() - config_a.connections.output = "intermediate" - if storageClass_a: - config_a.outputSC = storageClass_a - config_b = config_b_cls() - config_b.connections.input = "intermediate" - if storageClass_b: - config_b.outputSC = storageClass_b - config_b.key = "two" - config_b.value = 2 + def test_metadata_input(self): + """Test two tasks where the output uses metadata from input.""" + config_a = DynamicTestPipelineTaskConfig() + config_a.inputs["i"] = DynamicConnectionConfig( + dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False + ) + config_b = DynamicTestPipelineTaskConfig() + config_b.outputs["o"] = DynamicConnectionConfig( + dataset_type_name="output", storage_class="StructuredDataDict" + ) + config_b.inputs["in_metadata"] = DynamicConnectionConfig( + dataset_type_name="a_metadata", storage_class="TaskMetadata" + ) pipeline_graph = PipelineGraph() - pipeline_graph.add_task("a", NoDimensionsTestTask, config_a) - pipeline_graph.add_task("b", NoDimensionsTestTask, config_b) + pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a) + pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b) executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler) - return executor - - def _test_logs(self, log_output, input_type_a, output_type_a, input_type_b, output_type_b): - """Check the expected input types received by tasks A and B. - - Note that these are the types as seen from the perspective of the task, - so they must be consistent with the task's connections, but may not be - consistent with the registry dataset types. - """ - all_logs = "\n".join(log_output) - self.assertIn(f"lsst.a:Run method given data of type: {input_type_a}", all_logs) - self.assertIn(f"lsst.b:Run method given data of type: {input_type_b}", all_logs) - self.assertIn(f"lsst.a:Run method returns data of type: {output_type_a}", all_logs) - self.assertIn(f"lsst.b:Run method returns data of type: {output_type_b}", all_logs) - - def test_from_pipeline(self): - """Test executing a two quanta from different configurations of the - same task, with an executor created by the `from_pipeline` factory - method, and the `SimplePipelineExecutor.run` method. - """ - executor = self._configure_pipeline( - NoDimensionsTestTask.ConfigClass, NoDimensionsTestTask.ConfigClass - ) - - with self.assertLogs("lsst", level="INFO") as cm: - quanta = executor.run(register_dataset_types=True, save_versions=False) - self._test_logs(cm.output, "dict", "dict", "dict", "dict") - + quanta = executor.run(register_dataset_types=True, save_versions=False) self.assertEqual(len(quanta), 2) - self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) - self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2}) + output = self.butler.get("output") + self.assertEqual(output.quantum.inputs["in_metadata"][0].original_type, "lsst.pipe.base.TaskMetadata") - def test_from_pipeline_intermediates_differ(self): - """Run pipeline but intermediates definition in registry differs.""" - # Pre-define the "intermediate" storage class to be something that is - # like a dict but is not a dict. This will fail unless storage - # class conversion is supported in put and get. - self.butler.registry.registerDatasetType( - lsst.daf.butler.DatasetType( - "intermediate", - dimensions=self.butler.dimensions.empty, - storageClass="TaskMetadataLike", - ) + def test_optional_intermediate(self): + """Test a pipeline task with an optional regular input that is produced + by another task. + """ + config_a = DynamicTestPipelineTaskConfig() + config_a.inputs["i"] = DynamicConnectionConfig( + dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False ) - executor = self._configure_pipeline( - NoDimensionsTestTask.ConfigClass, - NoDimensionsTestTask.ConfigClass, - storageClass_b="TaskMetadataLike", + config_a.fail_exception = "lsst.pipe.base.NoWorkFound" + config_a.fail_condition = "1=1" # butler query expression that is true + config_a.outputs["o"] = DynamicConnectionConfig( + dataset_type_name="intermediate", storage_class="StructuredDataDict" ) - with self.assertLogs("lsst", level="INFO") as cm: - quanta = executor.run(register_dataset_types=True, save_versions=False) - # A dict is given to task a without change. - # A returns a dict because it has not been told to do anything else. - # That does not match the storage class so it will be converted - # on put. - # b is given a dict, because that's what its connection asks for. - # b returns a TaskMetadata because that's how we configured it, and - # since its output wasn't registered in advance, it will have been - # registered as TaskMetadata and will now be received as TaskMetadata. - self._test_logs(cm.output, "dict", "dict", "dict", "lsst.pipe.base.TaskMetadata") - - self.assertEqual(len(quanta), 2) - self.assertEqual(self.butler.get("intermediate"), TaskMetadata.from_dict({"zero": 0, "one": 1})) - self.assertEqual(self.butler.get("output"), TaskMetadata.from_dict({"zero": 0, "one": 1, "two": 2})) - - def test_from_pipeline_output_differ(self): - """Run pipeline but output definition in registry differs.""" - # Pre-define the "output" storage class to be something that is - # like a dict but is not a dict. This will fail unless storage - # class conversion is supported in put and get. - self.butler.registry.registerDatasetType( - lsst.daf.butler.DatasetType( - "output", - dimensions=self.butler.dimensions.empty, - storageClass="TaskMetadataLike", - ) + config_b = DynamicTestPipelineTaskConfig() + config_b.inputs["i"] = DynamicConnectionConfig( + dataset_type_name="intermediate", storage_class="StructuredDataDict", minimum=0 ) - executor = self._configure_pipeline( - NoDimensionsTestTask.ConfigClass, - NoDimensionsTestTask.ConfigClass, - storageClass_a="TaskMetadataLike", + config_b.outputs["o"] = DynamicConnectionConfig( + dataset_type_name="output", storage_class="StructuredDataDict" ) - with self.assertLogs("lsst", level="INFO") as cm: - quanta = executor.run(register_dataset_types=True, save_versions=False) - # a has been told to return a TaskMetadata but this will convert to - # dict on read by b. - # b returns a dict and that is converted to TaskMetadata on put. - self._test_logs(cm.output, "dict", "lsst.pipe.base.TaskMetadata", "dict", "dict") - - self.assertEqual(len(quanta), 2) - self.assertEqual(self.butler.get("intermediate").to_dict(), {"zero": 0, "one": 1}) - self.assertEqual(self.butler.get("output").to_dict(), {"zero": 0, "one": 1, "two": 2}) - - def test_from_pipeline_input_differ(self): - """Run pipeline but input definition in registry differs.""" - # This config declares that the pipeline takes a TaskMetadata - # as input but registry already thinks it has a StructureDataDict. - executor = self._configure_pipeline(NoDimensionsTestConfig2, NoDimensionsTestTask.ConfigClass) - - with self.assertLogs("lsst", level="INFO") as cm: - quanta = executor.run(register_dataset_types=True, save_versions=False) - self._test_logs(cm.output, "lsst.pipe.base.TaskMetadata", "dict", "dict", "dict") - + pipeline_graph = PipelineGraph() + pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a) + pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b) + executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler) + quanta = executor.run(register_dataset_types=True, save_versions=False) self.assertEqual(len(quanta), 2) - self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) - self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2}) - - def test_from_pipeline_incompatible(self): - """Test that we cannot make a QG if the registry and pipeline have - incompatible storage classes for a dataset type. + # Both quanta ran successfully (NoWorkFound is a success). + self.assertTrue(self.butler.exists("a_metadata")) + self.assertTrue(self.butler.exists("b_metadata")) + # The intermediate dataset was not written, but the final output was. + self.assertFalse(self.butler.exists("intermediate")) + self.assertTrue(self.butler.exists("output")) + + def test_optional_input(self): + """Test a pipeline task with an optional regular input that is an + overall input to the pipeline. """ - # Incompatible output dataset type. - self.butler.registry.registerDatasetType( - lsst.daf.butler.DatasetType( - "output", - dimensions=self.butler.dimensions.empty, - storageClass="StructuredDataList", - ) + config_a = DynamicTestPipelineTaskConfig() + config_a.inputs["i1"] = DynamicConnectionConfig( + dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False ) - with self.assertRaisesRegex( - IncompatibleDatasetTypeError, "Incompatible definition.*StructuredDataDict.*StructuredDataList.*" - ): - self._configure_pipeline(NoDimensionsTestTask.ConfigClass, NoDimensionsTestTask.ConfigClass) - - def test_from_pipeline_registry_changed(self): - """Run pipeline, but change registry dataset types between making the - QG and executing it. - - This only fails with full-butler execution; we don't have a way to - prevent it with QBB. - """ - executor = self._configure_pipeline( - NoDimensionsTestTask.ConfigClass, NoDimensionsTestTask.ConfigClass + config_a.outputs["i2"] = DynamicConnectionConfig( + dataset_type_name="input_2", + storage_class="StructuredDataDict", # will never exist ) - self.butler.registry.registerDatasetType( - lsst.daf.butler.DatasetType( - "output", - dimensions=self.butler.dimensions.empty, - storageClass="TaskMetadataLike", # even compatible is not okay - ) + config_a.outputs["o"] = DynamicConnectionConfig( + dataset_type_name="output", storage_class="StructuredDataDict" ) - with self.assertRaisesRegex( - lsst.daf.butler.registry.ConflictingDefinitionError, - ".*definition in registry has changed.*StructuredDataDict.*TaskMetadataLike.*", - ): - executor.run(register_dataset_types=True, save_versions=False) - - def test_from_pipeline_metadata(self): - """Test two tasks where the output uses metadata from input.""" - # Must configure a special pipeline for this test. - config_a = NoDimensionsTestTask.ConfigClass() - config_a.connections.output = "intermediate" - config_b = NoDimensionsMetadataTestTask.ConfigClass() - config_b.connections.input = "intermediate" - config_b.key = "two" - config_b.value = 2 pipeline_graph = PipelineGraph() - pipeline_graph.add_task("a", NoDimensionsTestTask, config=config_a) - pipeline_graph.add_task("b", NoDimensionsMetadataTestTask, config=config_b) + pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a) executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler) - - with self.assertLogs("test_simple_pipeline_executor", level="INFO") as cm: - quanta = executor.run(register_dataset_types=True, save_versions=False) - self.assertIn(f"Received task metadata ({get_full_type_name(dict)})", "".join(cm.output)) - - self.assertEqual(len(quanta), 2) - self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) - self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2}) + quanta = executor.run(register_dataset_types=True, save_versions=False) + self.assertEqual(len(quanta), 1) + # The quanta ran successfully. + self.assertTrue(self.butler.exists("a_metadata")) + # The final output was written. + self.assertTrue(self.butler.exists("output")) def test_from_pipeline_file(self): """Test executing a two quanta from different configurations of the @@ -345,22 +190,39 @@ def test_from_pipeline_file(self): description: test tasks: a: - class: "lsst.pipe.base.tests.no_dimensions.NoDimensionsTestTask" + class: "lsst.pipe.base.tests.mocks.DynamicTestPipelineTask" config: - connections.output: "intermediate" + python: | + from lsst.pipe.base.tests.mocks import DynamicConnectionConfig + config.inputs["i"] = DynamicConnectionConfig( + dataset_type_name="input", + storage_class="StructuredDataDict", + mock_storage_class=False, + ) + config.outputs["o"] = DynamicConnectionConfig( + dataset_type_name="intermediate", + storage_class="StructuredDataDict", + ) b: - class: "lsst.pipe.base.tests.no_dimensions.NoDimensionsTestTask" + class: "lsst.pipe.base.tests.mocks.DynamicTestPipelineTask" config: - connections.input: "intermediate" - key: "two" - value: 2 + python: | + from lsst.pipe.base.tests.mocks import DynamicConnectionConfig + config.inputs["i"] = DynamicConnectionConfig( + dataset_type_name="intermediate", + storage_class="StructuredDataDict", + ) + config.outputs["o"] = DynamicConnectionConfig( + dataset_type_name="output", + storage_class="StructuredDataDict", + ) """ ) executor = SimplePipelineExecutor.from_pipeline_filename(filename, butler=self.butler) quanta = executor.run(register_dataset_types=True, save_versions=False) self.assertEqual(len(quanta), 2) - self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) - self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2}) + self.assertEqual(self.butler.get("intermediate").storage_class, get_mock_name("StructuredDataDict")) + self.assertEqual(self.butler.get("output").storage_class, get_mock_name("StructuredDataDict")) class MemoryTester(lsst.utils.tests.MemoryTestCase):