From 6ede54134e796337a44130e6d6fb4714833d9faf Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Tue, 30 Jul 2024 14:27:33 -0400 Subject: [PATCH 1/3] Rewrite SimplePipelineExecutor tests using new test utility code. Tests for storage class conversion are now in their own file. All tests have been switched from NoDimensionTestTask to the much more flexible DynamicTestPipelineTask, which also writes MockDataset objects that let us inspect what was going on inside the task without needing to search the logs. --- ...test_execution_storage_class_conversion.py | 317 +++++++++++++++++ tests/test_simple_pipeline_executor.py | 328 ++++-------------- 2 files changed, 383 insertions(+), 262 deletions(-) create mode 100644 tests/test_execution_storage_class_conversion.py diff --git a/tests/test_execution_storage_class_conversion.py b/tests/test_execution_storage_class_conversion.py new file mode 100644 index 00000000..e90f0fb4 --- /dev/null +++ b/tests/test_execution_storage_class_conversion.py @@ -0,0 +1,317 @@ +# This file is part of ctrl_mpexec. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This software is dual licensed under the GNU General Public License and also +# under a 3-clause BSD license. Recipients may choose which of these licenses +# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, +# respectively. If you choose the GPL option then the following text applies +# (but note that there is still no warranty even if you opt for BSD instead): +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +from __future__ import annotations + +import os +import shutil +import tempfile +import unittest + +import lsst.daf.butler +import lsst.utils.tests +from lsst.ctrl.mpexec import SimplePipelineExecutor +from lsst.pipe.base import PipelineGraph +from lsst.pipe.base.pipeline_graph import IncompatibleDatasetTypeError +from lsst.pipe.base.tests.mocks import ( + DynamicConnectionConfig, + DynamicTestPipelineTask, + DynamicTestPipelineTaskConfig, + MockStorageClass, + get_mock_name, +) + +TESTDIR = os.path.abspath(os.path.dirname(__file__)) + + +class TestExecutionStorageClassConversion(lsst.utils.tests.TestCase): + """Test storage class conversions during execution. + + Task connection declarations should always define which storage class they + see, while data repository registrations should always define what is + stored. + + This test uses mock storage classes for intermediate and output datasets, + which let us load the dataset to see what storage class the task saw when + it was running. These storage class names need to be wrapped in + get_mock_name calls to get what the butler actually sees. Overall input + datasets are not declared with mock datasets, so we can `put` them directly + in test code. + """ + + def setUp(self): + self.path = tempfile.mkdtemp() + # standalone parameter forces the returned config to also include + # the information from the search paths. 
+ config = lsst.daf.butler.Butler.makeRepo( + self.path, standalone=True, searchPaths=[os.path.join(TESTDIR, "config")] + ) + self.butler = SimplePipelineExecutor.prep_butler(config, [], "fake") + self.butler.registry.registerDatasetType( + lsst.daf.butler.DatasetType( + "input", + dimensions=self.butler.dimensions.empty, + storageClass="StructuredDataDict", + ) + ) + self.butler.put({"zero": 0}, "input") + MockStorageClass.get_or_register_mock("StructuredDataDict") + MockStorageClass.get_or_register_mock("TaskMetadataLike") + + def tearDown(self): + shutil.rmtree(self.path, ignore_errors=True) + + def _make_config( + self, + input_storage_class="StructuredDataDict", + output_storage_class="StructuredDataDict", + input_name="input", + output_name="output", + ): + """Create configuration for a test task with a single input and single + output of the given storage classes and dataset type names. + """ + config = DynamicTestPipelineTaskConfig() + config.inputs["i"] = DynamicConnectionConfig( + dataset_type_name=input_name, + storage_class=input_storage_class, + # Since the overall input is special, we only use a mock storage + # class for it when there's a storage class conversion. + mock_storage_class=(input_name != "input" or (input_storage_class != "StructuredDataDict")), + ) + config.outputs["o"] = DynamicConnectionConfig( + dataset_type_name=output_name, storage_class=output_storage_class + ) + return config + + def _make_executor( + self, + a_i_storage_class="StructuredDataDict", + a_o_storage_class="StructuredDataDict", + b_i_storage_class="StructuredDataDict", + b_o_storage_class="StructuredDataDict", + ): + """Configure a SimplePipelineExecutor with tasks with the given + storage classes as inputs and outputs. + + This sets up a simple pipeline with two tasks ('a' and 'b') where the + second task's only input is the first task's only output. 
+ """ + config_a = self._make_config(a_i_storage_class, a_o_storage_class, output_name="intermediate") + config_b = self._make_config(b_i_storage_class, b_o_storage_class, input_name="intermediate") + pipeline_graph = PipelineGraph() + pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a) + pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b) + executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler) + return executor + + def _assert_datasets( + self, + a_i_storage_class="StructuredDataDict", + a_o_storage_class="StructuredDataDict", + b_i_storage_class="StructuredDataDict", + b_o_storage_class="StructuredDataDict", + stored_intermediate_storage_class="StructuredDataDict", + stored_output_storage_class="StructuredDataDict", + ): + """Check that a butler repository's contents are consistent with + running a pipeline created by _make_executor. + """ + # Read input and output datasets from butler, inspect their storage + # classes directly. + stored_intermediate = self.butler.get("intermediate") + stored_output = self.butler.get("output") + self.assertEqual( + self.butler.get_dataset_type("intermediate").storageClass_name, + get_mock_name(stored_intermediate_storage_class), + ) + self.assertEqual(stored_output.storage_class, get_mock_name(stored_output_storage_class)) + self.assertEqual( + self.butler.get_dataset_type("output").storageClass_name, + get_mock_name(stored_output_storage_class), + ) + # Since we didn't tell the butler to convert storage classes on read, + # they'll remember their last conversion (on write). 
+ if a_o_storage_class != stored_intermediate_storage_class: + self.assertEqual( + stored_intermediate.converted_from.storage_class, + get_mock_name(a_o_storage_class), + ) + else: + self.assertIsNone(stored_intermediate.converted_from) + if b_o_storage_class != stored_output_storage_class: + self.assertEqual( + stored_output.converted_from.storage_class, + get_mock_name(b_o_storage_class), + ) + else: + self.assertIsNone(stored_output.converted_from) + # Extract the inputs as seen by the tasks from those stored outputs. + quantum_a = stored_intermediate.quantum + quantum_b = stored_output.quantum + b_input = quantum_b.inputs["i"][0] + a_input = quantum_a.inputs["i"][0] + if a_i_storage_class == "StructuredDataDict": + self.assertIsNone(a_input.converted_from) + else: + self.assertEqual(a_input.original_type, "dict") + self.assertEqual(b_input.storage_class, get_mock_name(b_i_storage_class)) + + def test_no_conversions(self): + """Test execution with no storage class conversions as a baseline.""" + executor = self._make_executor() + quanta = executor.run(register_dataset_types=True, save_versions=False) + self.assertEqual(len(quanta), 2) + self._assert_datasets() + + def test_intermediate_registration_differs(self): + """Test execution where an intermediate is registered to be different + from both the producing and consuming task. + """ + self.butler.registry.registerDatasetType( + lsst.daf.butler.DatasetType( + "intermediate", + dimensions=self.butler.dimensions.empty, + storageClass=get_mock_name("TaskMetadataLike"), + ) + ) + executor = self._make_executor() + quanta = executor.run(register_dataset_types=True, save_versions=False) + self.assertEqual(len(quanta), 2) + self._assert_datasets(stored_intermediate_storage_class="TaskMetadataLike") + + def test_intermediate_producer_differs(self): + """Test execution where an intermediate is registered to be consistent + with the consumer but different from its producer. 
+ """ + self.butler.registry.registerDatasetType( + lsst.daf.butler.DatasetType( + "intermediate", + dimensions=self.butler.dimensions.empty, + storageClass=get_mock_name("TaskMetadataLike"), + ) + ) + executor = self._make_executor(b_i_storage_class="TaskMetadataLike") + quanta = executor.run(register_dataset_types=True, save_versions=False) + self.assertEqual(len(quanta), 2) + self._assert_datasets( + stored_intermediate_storage_class="TaskMetadataLike", b_i_storage_class="TaskMetadataLike" + ) + + def test_intermediate_consumer_differs(self): + """Test execution where an intermediate is registered to be consistent + with its producer but different from its consumer. + """ + executor = self._make_executor(a_o_storage_class="TaskMetadataLike") + quanta = executor.run(register_dataset_types=True, save_versions=False) + self.assertEqual(len(quanta), 2) + self._assert_datasets( + stored_intermediate_storage_class="TaskMetadataLike", a_o_storage_class="TaskMetadataLike" + ) + + def test_output_differs(self): + """Test execution where an overall output is registered to be + different from the producing task. + """ + self.butler.registry.registerDatasetType( + lsst.daf.butler.DatasetType( + "output", + dimensions=self.butler.dimensions.empty, + storageClass=get_mock_name("TaskMetadataLike"), + ) + ) + executor = self._make_executor() + quanta = executor.run(register_dataset_types=True, save_versions=False) + self.assertEqual(len(quanta), 2) + self._assert_datasets(stored_output_storage_class="TaskMetadataLike") + + def test_input_differs(self): + """Test execution where an overall input's storage class is different + from the consuming task. 
+ """ + executor = self._make_executor(a_i_storage_class="TaskMetadataLike") + quanta = executor.run(register_dataset_types=True, save_versions=False) + self.assertEqual(len(quanta), 2) + self._assert_datasets(a_i_storage_class="TaskMetadataLike") + + def test_incompatible(self): + """Test that we cannot make a QG if the registry and pipeline have + incompatible storage classes for a dataset type. + """ + # Incompatible output dataset type. + self.butler.registry.registerDatasetType( + lsst.daf.butler.DatasetType( + "output", + dimensions=self.butler.dimensions.empty, + storageClass="StructuredDataList", + ) + ) + with self.assertRaisesRegex( + IncompatibleDatasetTypeError, "Incompatible definition.*StructuredDataDict.*StructuredDataList.*" + ): + self._make_executor() + + def test_registry_changed(self): + """Run pipeline, but change registry dataset types between making the + QG and executing it. + + This only fails with full-butler execution; we don't have a way to + prevent it with QBB. + """ + executor = self._make_executor() + self.butler.registry.registerDatasetType( + lsst.daf.butler.DatasetType( + "output", + dimensions=self.butler.dimensions.empty, + storageClass="TaskMetadataLike", # even compatible is not okay + ) + ) + with self.assertRaisesRegex( + lsst.daf.butler.registry.ConflictingDefinitionError, + ".*definition in registry has changed.*StructuredDataDict.*TaskMetadataLike.*", + ): + executor.run(register_dataset_types=True, save_versions=False) + + +class MemoryTester(lsst.utils.tests.MemoryTestCase): + """Generic tests for file leaks.""" + + +def setup_module(module): + """Set up the module for pytest. + + Parameters + ---------- + module : `~types.ModuleType` + Module to set up. 
+ """ + lsst.utils.tests.init() + + +if __name__ == "__main__": + lsst.utils.tests.init() + unittest.main() diff --git a/tests/test_simple_pipeline_executor.py b/tests/test_simple_pipeline_executor.py index be4ac93b..d32059d6 100644 --- a/tests/test_simple_pipeline_executor.py +++ b/tests/test_simple_pipeline_executor.py @@ -31,94 +31,30 @@ import shutil import tempfile import unittest -from typing import Any import lsst.daf.butler import lsst.utils.tests from lsst.ctrl.mpexec import SimplePipelineExecutor -from lsst.pipe.base import PipelineGraph, Struct, TaskMetadata, connectionTypes -from lsst.pipe.base.pipeline_graph import IncompatibleDatasetTypeError -from lsst.pipe.base.tests.no_dimensions import ( - NoDimensionsTestConfig, - NoDimensionsTestConnections, - NoDimensionsTestTask, +from lsst.pipe.base import PipelineGraph +from lsst.pipe.base.tests.mocks import ( + DynamicConnectionConfig, + DynamicTestPipelineTask, + DynamicTestPipelineTaskConfig, + MockStorageClass, + get_mock_name, ) -from lsst.utils.introspection import get_full_type_name TESTDIR = os.path.abspath(os.path.dirname(__file__)) -class NoDimensionsTestConnections2(NoDimensionsTestConnections, dimensions=set()): - """A connections class used for testing.""" - - input = connectionTypes.Input( - name="input", doc="some dict-y input data for testing", storageClass="TaskMetadataLike" - ) - - -class NoDimensionsTestConfig2(NoDimensionsTestConfig, pipelineConnections=NoDimensionsTestConnections2): - """A config used for testing.""" - - -class NoDimensionsMetadataTestConnections(NoDimensionsTestConnections, dimensions=set()): - """Test connection class for metadata. +class SimplePipelineExecutorTests(lsst.utils.tests.TestCase): + """Test the SimplePipelineExecutor API. - Deliberately choose a storage class that does not match the metadata - default TaskMetadata storage class. 
+ Because SimplePipelineExecutor is the easiest way to run simple pipelines + in tests, this has also become a home for tests of execution edge cases + that don't have a clear home in other test files. """ - meta = connectionTypes.Input( - name="a_metadata", doc="Metadata from previous task", storageClass="StructuredDataDict" - ) - - -class NoDimensionsMetadataTestConfig( - NoDimensionsTestConfig, pipelineConnections=NoDimensionsMetadataTestConnections -): - """A config used for testing the metadata.""" - - -class NoDimensionsMetadataTestTask(NoDimensionsTestTask): - """A simple pipeline task that can take a metadata as input.""" - - ConfigClass = NoDimensionsMetadataTestConfig - _DefaultName = "noDimensionsMetadataTest" - - def run(self, input: dict[str, int], meta: dict[str, Any]) -> Struct: - """Run the task, adding the configured key-value pair to the input - argument and returning it as the output. - - Parameters - ---------- - input : `dict` - Dictionary to update and return. - meta : `dict` - Metadata to add. - - Returns - ------- - result : `lsst.pipe.base.Struct` - Struct with a single ``output`` attribute. - """ - self.log.info("Run metadata method given data of type: %s", get_full_type_name(input)) - output = input.copy() - output[self.config.key] = self.config.value - - self.log.info("Received task metadata (%s): %s", get_full_type_name(meta), meta) - - # Can change the return type via configuration. 
- if "TaskMetadata" in self.config.outputSC: - output = TaskMetadata.from_dict(output) - elif type(output) is TaskMetadata: - # Want the output to be a dict - output = output.to_dict() - self.log.info("Run method returns data of type: %s", get_full_type_name(output)) - return Struct(output=output) - - -class SimplePipelineExecutorTests(lsst.utils.tests.TestCase): - """Test the SimplePipelineExecutor API with a trivial task.""" - def setUp(self): self.path = tempfile.mkdtemp() # standalone parameter forces the returned config to also include @@ -135,6 +71,8 @@ def setUp(self): ) ) self.butler.put({"zero": 0}, "input") + MockStorageClass.get_or_register_mock("StructuredDataDict") + MockStorageClass.get_or_register_mock("TaskMetadataLike") def tearDown(self): shutil.rmtree(self.path, ignore_errors=True) @@ -144,194 +82,43 @@ def test_from_task_class(self): `from_task_class` factory method, and the `SimplePipelineExecutor.as_generator` method. """ - executor = SimplePipelineExecutor.from_task_class(NoDimensionsTestTask, butler=self.butler) - (quantum,) = executor.as_generator(register_dataset_types=True) - self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1}) - - def _configure_pipeline(self, config_a_cls, config_b_cls, storageClass_a=None, storageClass_b=None): - """Configure a pipeline with from_pipeline_graph.""" - config_a = config_a_cls() - config_a.connections.output = "intermediate" - if storageClass_a: - config_a.outputSC = storageClass_a - config_b = config_b_cls() - config_b.connections.input = "intermediate" - if storageClass_b: - config_b.outputSC = storageClass_b - config_b.key = "two" - config_b.value = 2 - pipeline_graph = PipelineGraph() - pipeline_graph.add_task("a", NoDimensionsTestTask, config_a) - pipeline_graph.add_task("b", NoDimensionsTestTask, config_b) - executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler) - return executor - - def _test_logs(self, log_output, input_type_a, output_type_a, 
input_type_b, output_type_b): - """Check the expected input types received by tasks A and B. - - Note that these are the types as seen from the perspective of the task, - so they must be consistent with the task's connections, but may not be - consistent with the registry dataset types. - """ - all_logs = "\n".join(log_output) - self.assertIn(f"lsst.a:Run method given data of type: {input_type_a}", all_logs) - self.assertIn(f"lsst.b:Run method given data of type: {input_type_b}", all_logs) - self.assertIn(f"lsst.a:Run method returns data of type: {output_type_a}", all_logs) - self.assertIn(f"lsst.b:Run method returns data of type: {output_type_b}", all_logs) - - def test_from_pipeline(self): - """Test executing a two quanta from different configurations of the - same task, with an executor created by the `from_pipeline` factory - method, and the `SimplePipelineExecutor.run` method. - """ - executor = self._configure_pipeline( - NoDimensionsTestTask.ConfigClass, NoDimensionsTestTask.ConfigClass + config = DynamicTestPipelineTaskConfig() + config.inputs["i"] = DynamicConnectionConfig( + dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False ) - - with self.assertLogs("lsst", level="INFO") as cm: - quanta = executor.run(register_dataset_types=True, save_versions=False) - self._test_logs(cm.output, "dict", "dict", "dict", "dict") - - self.assertEqual(len(quanta), 2) - self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) - self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2}) - - def test_from_pipeline_intermediates_differ(self): - """Run pipeline but intermediates definition in registry differs.""" - # Pre-define the "intermediate" storage class to be something that is - # like a dict but is not a dict. This will fail unless storage - # class conversion is supported in put and get. 
- self.butler.registry.registerDatasetType( - lsst.daf.butler.DatasetType( - "intermediate", - dimensions=self.butler.dimensions.empty, - storageClass="TaskMetadataLike", - ) - ) - executor = self._configure_pipeline( - NoDimensionsTestTask.ConfigClass, - NoDimensionsTestTask.ConfigClass, - storageClass_b="TaskMetadataLike", - ) - with self.assertLogs("lsst", level="INFO") as cm: - quanta = executor.run(register_dataset_types=True, save_versions=False) - # A dict is given to task a without change. - # A returns a dict because it has not been told to do anything else. - # That does not match the storage class so it will be converted - # on put. - # b is given a dict, because that's what its connection asks for. - # b returns a TaskMetadata because that's how we configured it, and - # since its output wasn't registered in advance, it will have been - # registered as TaskMetadata and will now be received as TaskMetadata. - self._test_logs(cm.output, "dict", "dict", "dict", "lsst.pipe.base.TaskMetadata") - - self.assertEqual(len(quanta), 2) - self.assertEqual(self.butler.get("intermediate"), TaskMetadata.from_dict({"zero": 0, "one": 1})) - self.assertEqual(self.butler.get("output"), TaskMetadata.from_dict({"zero": 0, "one": 1, "two": 2})) - - def test_from_pipeline_output_differ(self): - """Run pipeline but output definition in registry differs.""" - # Pre-define the "output" storage class to be something that is - # like a dict but is not a dict. This will fail unless storage - # class conversion is supported in put and get. 
- self.butler.registry.registerDatasetType( - lsst.daf.butler.DatasetType( - "output", - dimensions=self.butler.dimensions.empty, - storageClass="TaskMetadataLike", - ) + config.outputs["o"] = DynamicConnectionConfig( + dataset_type_name="output", storage_class="StructuredDataDict" ) - executor = self._configure_pipeline( - NoDimensionsTestTask.ConfigClass, - NoDimensionsTestTask.ConfigClass, - storageClass_a="TaskMetadataLike", + executor = SimplePipelineExecutor.from_task_class( + DynamicTestPipelineTask, + config=config, + butler=self.butler, + label="a", ) - with self.assertLogs("lsst", level="INFO") as cm: - quanta = executor.run(register_dataset_types=True, save_versions=False) - # a has been told to return a TaskMetadata but this will convert to - # dict on read by b. - # b returns a dict and that is converted to TaskMetadata on put. - self._test_logs(cm.output, "dict", "lsst.pipe.base.TaskMetadata", "dict", "dict") - - self.assertEqual(len(quanta), 2) - self.assertEqual(self.butler.get("intermediate").to_dict(), {"zero": 0, "one": 1}) - self.assertEqual(self.butler.get("output").to_dict(), {"zero": 0, "one": 1, "two": 2}) - - def test_from_pipeline_input_differ(self): - """Run pipeline but input definition in registry differs.""" - # This config declares that the pipeline takes a TaskMetadata - # as input but registry already thinks it has a StructureDataDict. 
- executor = self._configure_pipeline(NoDimensionsTestConfig2, NoDimensionsTestTask.ConfigClass) - - with self.assertLogs("lsst", level="INFO") as cm: - quanta = executor.run(register_dataset_types=True, save_versions=False) - self._test_logs(cm.output, "lsst.pipe.base.TaskMetadata", "dict", "dict", "dict") - - self.assertEqual(len(quanta), 2) - self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) - self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2}) + (quantum,) = executor.as_generator(register_dataset_types=True) + self.assertEqual(self.butler.get("output").storage_class, get_mock_name("StructuredDataDict")) - def test_from_pipeline_incompatible(self): - """Test that we cannot make a QG if the registry and pipeline have - incompatible storage classes for a dataset type. - """ - # Incompatible output dataset type. - self.butler.registry.registerDatasetType( - lsst.daf.butler.DatasetType( - "output", - dimensions=self.butler.dimensions.empty, - storageClass="StructuredDataList", - ) + def test_metadata_input(self): + """Test two tasks where the output uses metadata from input.""" + config_a = DynamicTestPipelineTaskConfig() + config_a.inputs["i"] = DynamicConnectionConfig( + dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False ) - with self.assertRaisesRegex( - IncompatibleDatasetTypeError, "Incompatible definition.*StructuredDataDict.*StructuredDataList.*" - ): - self._configure_pipeline(NoDimensionsTestTask.ConfigClass, NoDimensionsTestTask.ConfigClass) - - def test_from_pipeline_registry_changed(self): - """Run pipeline, but change registry dataset types between making the - QG and executing it. - - This only fails with full-butler execution; we don't have a way to - prevent it with QBB. 
- """ - executor = self._configure_pipeline( - NoDimensionsTestTask.ConfigClass, NoDimensionsTestTask.ConfigClass + config_b = DynamicTestPipelineTaskConfig() + config_b.outputs["o"] = DynamicConnectionConfig( + dataset_type_name="output", storage_class="StructuredDataDict" ) - self.butler.registry.registerDatasetType( - lsst.daf.butler.DatasetType( - "output", - dimensions=self.butler.dimensions.empty, - storageClass="TaskMetadataLike", # even compatible is not okay - ) + config_b.inputs["in_metadata"] = DynamicConnectionConfig( + dataset_type_name="a_metadata", storage_class="TaskMetadata" ) - with self.assertRaisesRegex( - lsst.daf.butler.registry.ConflictingDefinitionError, - ".*definition in registry has changed.*StructuredDataDict.*TaskMetadataLike.*", - ): - executor.run(register_dataset_types=True, save_versions=False) - - def test_from_pipeline_metadata(self): - """Test two tasks where the output uses metadata from input.""" - # Must configure a special pipeline for this test. - config_a = NoDimensionsTestTask.ConfigClass() - config_a.connections.output = "intermediate" - config_b = NoDimensionsMetadataTestTask.ConfigClass() - config_b.connections.input = "intermediate" - config_b.key = "two" - config_b.value = 2 pipeline_graph = PipelineGraph() - pipeline_graph.add_task("a", NoDimensionsTestTask, config=config_a) - pipeline_graph.add_task("b", NoDimensionsMetadataTestTask, config=config_b) + pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a) + pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b) executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler) - - with self.assertLogs("test_simple_pipeline_executor", level="INFO") as cm: - quanta = executor.run(register_dataset_types=True, save_versions=False) - self.assertIn(f"Received task metadata ({get_full_type_name(dict)})", "".join(cm.output)) - + quanta = executor.run(register_dataset_types=True, save_versions=False) self.assertEqual(len(quanta), 2) 
- self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) - self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2}) + output = self.butler.get("output") + self.assertEqual(output.quantum.inputs["in_metadata"][0].original_type, "lsst.pipe.base.TaskMetadata") def test_from_pipeline_file(self): """Test executing a two quanta from different configurations of the @@ -345,22 +132,39 @@ def test_from_pipeline_file(self): description: test tasks: a: - class: "lsst.pipe.base.tests.no_dimensions.NoDimensionsTestTask" + class: "lsst.pipe.base.tests.mocks.DynamicTestPipelineTask" config: - connections.output: "intermediate" + python: | + from lsst.pipe.base.tests.mocks import DynamicConnectionConfig + config.inputs["i"] = DynamicConnectionConfig( + dataset_type_name="input", + storage_class="StructuredDataDict", + mock_storage_class=False, + ) + config.outputs["o"] = DynamicConnectionConfig( + dataset_type_name="intermediate", + storage_class="StructuredDataDict", + ) b: - class: "lsst.pipe.base.tests.no_dimensions.NoDimensionsTestTask" + class: "lsst.pipe.base.tests.mocks.DynamicTestPipelineTask" config: - connections.input: "intermediate" - key: "two" - value: 2 + python: | + from lsst.pipe.base.tests.mocks import DynamicConnectionConfig + config.inputs["i"] = DynamicConnectionConfig( + dataset_type_name="intermediate", + storage_class="StructuredDataDict", + ) + config.outputs["o"] = DynamicConnectionConfig( + dataset_type_name="output", + storage_class="StructuredDataDict", + ) """ ) executor = SimplePipelineExecutor.from_pipeline_filename(filename, butler=self.butler) quanta = executor.run(register_dataset_types=True, save_versions=False) self.assertEqual(len(quanta), 2) - self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) - self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2}) + self.assertEqual(self.butler.get("intermediate").storage_class, get_mock_name("StructuredDataDict")) + 
self.assertEqual(self.butler.get("output").storage_class, get_mock_name("StructuredDataDict")) class MemoryTester(lsst.utils.tests.MemoryTestCase): From 99b3c7ed9483bdeb34a6ff71f4ea3ddd974a1ca1 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Tue, 30 Jul 2024 16:02:02 -0400 Subject: [PATCH 2/3] Add unit tests for optional Input connections. --- tests/test_simple_pipeline_executor.py | 58 ++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/tests/test_simple_pipeline_executor.py b/tests/test_simple_pipeline_executor.py index d32059d6..95842c1e 100644 --- a/tests/test_simple_pipeline_executor.py +++ b/tests/test_simple_pipeline_executor.py @@ -120,6 +120,64 @@ def test_metadata_input(self): output = self.butler.get("output") self.assertEqual(output.quantum.inputs["in_metadata"][0].original_type, "lsst.pipe.base.TaskMetadata") + def test_optional_intermediate(self): + """Test a pipeline task with an optional regular input that is produced + by another task. + """ + config_a = DynamicTestPipelineTaskConfig() + config_a.inputs["i"] = DynamicConnectionConfig( + dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False + ) + config_a.fail_exception = "lsst.pipe.base.NoWorkFound" + config_a.fail_condition = "1=1" # butler query expression that is true + config_a.outputs["o"] = DynamicConnectionConfig( + dataset_type_name="intermediate", storage_class="StructuredDataDict" + ) + config_b = DynamicTestPipelineTaskConfig() + config_b.inputs["i"] = DynamicConnectionConfig( + dataset_type_name="intermediate", storage_class="StructuredDataDict", minimum=0 + ) + config_b.outputs["o"] = DynamicConnectionConfig( + dataset_type_name="output", storage_class="StructuredDataDict" + ) + pipeline_graph = PipelineGraph() + pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a) + pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b) + executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler) + 
+ quanta = executor.run(register_dataset_types=True, save_versions=False) + self.assertEqual(len(quanta), 2) + # Both quanta ran successfully (NoWorkFound is a success). + self.assertTrue(self.butler.exists("a_metadata")) + self.assertTrue(self.butler.exists("b_metadata")) + # The intermediate dataset was not written, but the final output was. + self.assertFalse(self.butler.exists("intermediate")) + self.assertTrue(self.butler.exists("output")) + + def test_optional_input(self): + """Test a pipeline task with an optional regular input that is an + overall input to the pipeline. + """ + config_a = DynamicTestPipelineTaskConfig() + config_a.inputs["i1"] = DynamicConnectionConfig( + dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False + ) + config_a.inputs["i2"] = DynamicConnectionConfig( + dataset_type_name="input_2", + storage_class="StructuredDataDict", minimum=0, # will never exist + ) + config_a.outputs["o"] = DynamicConnectionConfig( + dataset_type_name="output", storage_class="StructuredDataDict" + ) + pipeline_graph = PipelineGraph() + pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a) + executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler) + quanta = executor.run(register_dataset_types=True, save_versions=False) + self.assertEqual(len(quanta), 1) + # The quantum ran successfully even though input_2 was never written. + self.assertTrue(self.butler.exists("a_metadata")) + # The final output was written. + self.assertTrue(self.butler.exists("output")) + def test_from_pipeline_file(self): """Test executing a two quanta from different configurations of the same task, with an executor created by the `from_pipeline_filename` From 14d4facb537f12a3d279fef0d58e805e25969eea Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Wed, 31 Jul 2024 09:59:01 -0400 Subject: [PATCH 3/3] Drop pytest-openfiles from pyproject.toml. It had already been removed from the CI build. 
--- pyproject.toml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index c70793e5..00368e29 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,10 +38,7 @@ dynamic = ["version"] "Homepage" = "https://github.com/lsst/ctrl_mpexec" [project.optional-dependencies] -test = [ - "pytest >= 3.2", - "pytest-openfiles >= 0.5.0" -] +test = ["pytest >= 3.2"] [tool.setuptools.packages.find] where = ["python"]