From 60f68d00b34b2a1689a01d1f1db2236079c12d11 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Thu, 20 Jul 2023 21:20:41 -0400 Subject: [PATCH] Mark PipelineGraph serialization as experimental. --- .../base/pipeline_graph/_pipeline_graph.py | 39 +++++++++++++++---- python/lsst/pipe/base/pipeline_graph/io.py | 2 +- tests/test_pipeline_graph.py | 28 ++++++------- 3 files changed, 47 insertions(+), 22 deletions(-) diff --git a/python/lsst/pipe/base/pipeline_graph/_pipeline_graph.py b/python/lsst/pipe/base/pipeline_graph/_pipeline_graph.py index 6e29363d..3b1e1671 100644 --- a/python/lsst/pipe/base/pipeline_graph/_pipeline_graph.py +++ b/python/lsst/pipe/base/pipeline_graph/_pipeline_graph.py @@ -992,10 +992,15 @@ def make_dataset_type_xgraph(self, init: bool = False) -> networkx.DiGraph: # # Serialization Interface. # + # Serialization of PipelineGraphs is currently experimental and may not be + # retained in the future. All serialization methods are + # underscore-prefixed to ensure nobody mistakes them for a stable interface + # (let alone a stable file format). + # ########################################################################### @classmethod - def read_stream( + def _read_stream( cls, stream: BinaryIO, import_and_configure: bool = True, @@ -1031,6 +1036,12 @@ def read_stream( EdgesChangedError Raised if ``check_edges_unchanged=True`` and the edges of a task do change after import and reconfiguration. + + Notes + ----- + `PipelineGraph` serialization is currently experimental and may be + removed or significantly changed in the future, with no deprecation + period. """ from .io import SerializedPipelineGraph @@ -1044,7 +1055,7 @@ def read_stream( ) @classmethod - def read_uri( + def _read_uri( cls, uri: ResourcePathExpression, import_and_configure: bool = True, @@ -1080,17 +1091,23 @@ def read_uri( EdgesChangedError Raised if ``check_edges_unchanged=True`` and the edges of a task do change after import and reconfiguration. 
+ + Notes + ----- + `PipelineGraph` serialization is currently experimental and may be + removed or significantly changed in the future, with no deprecation + period. """ uri = ResourcePath(uri) with uri.open("rb") as stream: - return cls.read_stream( + return cls._read_stream( cast(BinaryIO, stream), import_and_configure=import_and_configure, check_edges_unchanged=check_edges_unchanged, assume_edges_unchanged=assume_edges_unchanged, ) - def write_stream(self, stream: BinaryIO) -> None: + def _write_stream(self, stream: BinaryIO) -> None: """Write the pipeline to a file-like object. Parameters @@ -1100,6 +1117,10 @@ def write_stream(self, stream: BinaryIO) -> None: Notes ----- + `PipelineGraph` serialization is currently experimental and may be + removed or significantly changed in the future, with no deprecation + period. + The file format is gzipped JSON, and is intended to be human-readable, but it should not be considered a stable public interface for outside code, which should always use `PipelineGraph` methods (or at least the @@ -1112,7 +1133,7 @@ def write_stream(self, stream: BinaryIO) -> None: SerializedPipelineGraph.serialize(self).json(exclude_defaults=True, indent=2).encode("utf-8") ) - def write_uri(self, uri: ResourcePathExpression) -> None: + def _write_uri(self, uri: ResourcePathExpression) -> None: """Write the pipeline to a file given a URI. Parameters @@ -1123,6 +1144,10 @@ def write_uri(self, uri: ResourcePathExpression) -> None: Notes ----- + `PipelineGraph` serialization is currently experimental and may be + removed or significantly changed in the future, with no deprecation + period. 
+ The file format is gzipped JSON, and is intended to be human-readable, but it should not be considered a stable public interface for outside code, which should always use `PipelineGraph` methods (or at least the @@ -1135,9 +1160,9 @@ def write_uri(self, uri: ResourcePathExpression) -> None: elif extension != ".json.gz": raise ValueError("Expanded pipeline files should always have a .json.gz extension.") with uri.open(mode="wb") as stream: - self.write_stream(cast(BinaryIO, stream)) + self._write_stream(cast(BinaryIO, stream)) - def import_and_configure( + def _import_and_configure( self, check_edges_unchanged: bool = False, assume_edges_unchanged: bool = False ) -> None: """Import the `PipelineTask` classes referenced by all task nodes and diff --git a/python/lsst/pipe/base/pipeline_graph/io.py b/python/lsst/pipe/base/pipeline_graph/io.py index 5b62c715..33c9ef85 100644 --- a/python/lsst/pipe/base/pipeline_graph/io.py +++ b/python/lsst/pipe/base/pipeline_graph/io.py @@ -615,7 +615,7 @@ def deserialize( data_id=self.data_id, ) if import_and_configure: - result.import_and_configure( + result._import_and_configure( check_edges_unchanged=check_edges_unchanged, assume_edges_unchanged=assume_edges_unchanged, ) diff --git a/tests/test_pipeline_graph.py b/tests/test_pipeline_graph.py index dd8718e1..479e9202 100644 --- a/tests/test_pipeline_graph.py +++ b/tests/test_pipeline_graph.py @@ -133,9 +133,9 @@ def test_unresolved_stream_io(self) -> None: serialization. """ stream = io.BytesIO() - self.graph.write_stream(stream) + self.graph._write_stream(stream) stream.seek(0) - roundtripped = PipelineGraph.read_stream(stream) + roundtripped = PipelineGraph._read_stream(stream) self.check_make_xgraph(roundtripped, resolved=False) def test_unresolved_file_io(self) -> None: @@ -143,8 +143,8 @@ def test_unresolved_file_io(self) -> None: serialization. 
""" with lsst.utils.tests.getTempFilePath(".json.gz") as filename: - self.graph.write_uri(filename) - roundtripped = PipelineGraph.read_uri(filename) + self.graph._write_uri(filename) + roundtripped = PipelineGraph._read_uri(filename) self.check_make_xgraph(roundtripped, resolved=False) def test_unresolved_deferred_import_io(self) -> None: @@ -152,14 +152,14 @@ def test_unresolved_deferred_import_io(self) -> None: serialization, without immediately importing tasks on read. """ stream = io.BytesIO() - self.graph.write_stream(stream) + self.graph._write_stream(stream) stream.seek(0) - roundtripped = PipelineGraph.read_stream(stream, import_and_configure=False) + roundtripped = PipelineGraph._read_stream(stream, import_and_configure=False) self.check_make_xgraph(roundtripped, resolved=False, imported_and_configured=False) # Check that we can still resolve the graph without importing tasks. roundtripped.resolve(MockRegistry(self.dimensions, {})) self.check_make_xgraph(roundtripped, resolved=True, imported_and_configured=False) - roundtripped.import_and_configure(assume_edges_unchanged=True) + roundtripped._import_and_configure(assume_edges_unchanged=True) self.check_make_xgraph(roundtripped, resolved=True, imported_and_configured=True) def test_resolved_accessors(self) -> None: @@ -198,9 +198,9 @@ def test_resolved_stream_io(self) -> None: """ self.graph.resolve(MockRegistry(self.dimensions, {})) stream = io.BytesIO() - self.graph.write_stream(stream) + self.graph._write_stream(stream) stream.seek(0) - roundtripped = PipelineGraph.read_stream(stream) + roundtripped = PipelineGraph._read_stream(stream) self.check_make_xgraph(roundtripped, resolved=True) def test_resolved_file_io(self) -> None: @@ -209,8 +209,8 @@ def test_resolved_file_io(self) -> None: """ self.graph.resolve(MockRegistry(self.dimensions, {})) with lsst.utils.tests.getTempFilePath(".json.gz") as filename: - self.graph.write_uri(filename) - roundtripped = PipelineGraph.read_uri(filename) + 
self.graph._write_uri(filename) + roundtripped = PipelineGraph._read_uri(filename) self.check_make_xgraph(roundtripped, resolved=True) def test_resolved_deferred_import_io(self) -> None: @@ -219,11 +219,11 @@ def test_resolved_deferred_import_io(self) -> None: """ self.graph.resolve(MockRegistry(self.dimensions, {})) stream = io.BytesIO() - self.graph.write_stream(stream) + self.graph._write_stream(stream) stream.seek(0) - roundtripped = PipelineGraph.read_stream(stream, import_and_configure=False) + roundtripped = PipelineGraph._read_stream(stream, import_and_configure=False) self.check_make_xgraph(roundtripped, resolved=True, imported_and_configured=False) - roundtripped.import_and_configure(check_edges_unchanged=True) + roundtripped._import_and_configure(check_edges_unchanged=True) self.check_make_xgraph(roundtripped, resolved=True, imported_and_configured=True) def test_unresolved_copies(self) -> None: