Skip to content

Commit

Permalink
Mark PipelineGraph serialization as experimental.
Browse files Browse the repository at this point in the history
  • Loading branch information
TallJimbo committed Jul 24, 2023
1 parent 4ee3e23 commit 60f68d0
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 22 deletions.
39 changes: 32 additions & 7 deletions python/lsst/pipe/base/pipeline_graph/_pipeline_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -992,10 +992,15 @@ def make_dataset_type_xgraph(self, init: bool = False) -> networkx.DiGraph:
#
# Serialization Interface.
#
# Serialization of PipelineGraphs is currently experimental and may not be
# retained in the future. All serialization methods are
# underscore-prefixed to ensure nobody mistakes them for a stable interface
# (let a lone a stable file format).
#
###########################################################################

@classmethod
def read_stream(
def _read_stream(
cls,
stream: BinaryIO,
import_and_configure: bool = True,
Expand Down Expand Up @@ -1031,6 +1036,12 @@ def read_stream(
EdgesChangedError
Raised if ``check_edges_unchanged=True`` and the edges of a task do
change after import and reconfiguration.
Notes
-----
`PipelineGraph` serialization is currently experimental and may be
removed or significantly changed in the future, with no deprecation
period.
"""
from .io import SerializedPipelineGraph

Expand All @@ -1044,7 +1055,7 @@ def read_stream(
)

@classmethod
def read_uri(
def _read_uri(
cls,
uri: ResourcePathExpression,
import_and_configure: bool = True,
Expand Down Expand Up @@ -1080,17 +1091,23 @@ def read_uri(
EdgesChangedError
Raised if ``check_edges_unchanged=True`` and the edges of a task do
change after import and reconfiguration.
Notes
-----
`PipelineGraph` serialization is currently experimental and may be
removed or significantly changed in the future, with no deprecation
period.
"""
uri = ResourcePath(uri)
with uri.open("rb") as stream:
return cls.read_stream(
return cls._read_stream(
cast(BinaryIO, stream),
import_and_configure=import_and_configure,
check_edges_unchanged=check_edges_unchanged,
assume_edges_unchanged=assume_edges_unchanged,
)

def write_stream(self, stream: BinaryIO) -> None:
def _write_stream(self, stream: BinaryIO) -> None:
"""Write the pipeline to a file-like object.
Parameters
Expand All @@ -1100,6 +1117,10 @@ def write_stream(self, stream: BinaryIO) -> None:
Notes
-----
`PipelineGraph` serialization is currently experimental and may be
removed or significantly changed in the future, with no deprecation
period.
The file format is gzipped JSON, and is intended to be human-readable,
but it should not be considered a stable public interface for outside
code, which should always use `PipelineGraph` methods (or at least the
Expand All @@ -1112,7 +1133,7 @@ def write_stream(self, stream: BinaryIO) -> None:
SerializedPipelineGraph.serialize(self).json(exclude_defaults=True, indent=2).encode("utf-8")
)

def write_uri(self, uri: ResourcePathExpression) -> None:
def _write_uri(self, uri: ResourcePathExpression) -> None:
"""Write the pipeline to a file given a URI.
Parameters
Expand All @@ -1123,6 +1144,10 @@ def write_uri(self, uri: ResourcePathExpression) -> None:
Notes
-----
`PipelineGraph` serialization is currently experimental and may be
removed or significantly changed in the future, with no deprecation
period.
The file format is gzipped JSON, and is intended to be human-readable,
but it should not be considered a stable public interface for outside
code, which should always use `PipelineGraph` methods (or at least the
Expand All @@ -1135,9 +1160,9 @@ def write_uri(self, uri: ResourcePathExpression) -> None:
elif extension != ".json.gz":
raise ValueError("Expanded pipeline files should always have a .json.gz extension.")
with uri.open(mode="wb") as stream:
self.write_stream(cast(BinaryIO, stream))
self._write_stream(cast(BinaryIO, stream))

def import_and_configure(
def _import_and_configure(
self, check_edges_unchanged: bool = False, assume_edges_unchanged: bool = False
) -> None:
"""Import the `PipelineTask` classes referenced by all task nodes and
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/pipe/base/pipeline_graph/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ def deserialize(
data_id=self.data_id,
)
if import_and_configure:
result.import_and_configure(
result._import_and_configure(
check_edges_unchanged=check_edges_unchanged,
assume_edges_unchanged=assume_edges_unchanged,
)
Expand Down
28 changes: 14 additions & 14 deletions tests/test_pipeline_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,33 +133,33 @@ def test_unresolved_stream_io(self) -> None:
serialization.
"""
stream = io.BytesIO()
self.graph.write_stream(stream)
self.graph._write_stream(stream)
stream.seek(0)
roundtripped = PipelineGraph.read_stream(stream)
roundtripped = PipelineGraph._read_stream(stream)
self.check_make_xgraph(roundtripped, resolved=False)

def test_unresolved_file_io(self) -> None:
"""Test round-tripping an unresolved PipelineGraph through file
serialization.
"""
with lsst.utils.tests.getTempFilePath(".json.gz") as filename:
self.graph.write_uri(filename)
roundtripped = PipelineGraph.read_uri(filename)
self.graph._write_uri(filename)
roundtripped = PipelineGraph._read_uri(filename)
self.check_make_xgraph(roundtripped, resolved=False)

def test_unresolved_deferred_import_io(self) -> None:
"""Test round-tripping an unresolved PipelineGraph through
serialization, without immediately importing tasks on read.
"""
stream = io.BytesIO()
self.graph.write_stream(stream)
self.graph._write_stream(stream)
stream.seek(0)
roundtripped = PipelineGraph.read_stream(stream, import_and_configure=False)
roundtripped = PipelineGraph._read_stream(stream, import_and_configure=False)
self.check_make_xgraph(roundtripped, resolved=False, imported_and_configured=False)
# Check that we can still resolve the graph without importing tasks.
roundtripped.resolve(MockRegistry(self.dimensions, {}))
self.check_make_xgraph(roundtripped, resolved=True, imported_and_configured=False)
roundtripped.import_and_configure(assume_edges_unchanged=True)
roundtripped._import_and_configure(assume_edges_unchanged=True)
self.check_make_xgraph(roundtripped, resolved=True, imported_and_configured=True)

def test_resolved_accessors(self) -> None:
Expand Down Expand Up @@ -198,9 +198,9 @@ def test_resolved_stream_io(self) -> None:
"""
self.graph.resolve(MockRegistry(self.dimensions, {}))
stream = io.BytesIO()
self.graph.write_stream(stream)
self.graph._write_stream(stream)
stream.seek(0)
roundtripped = PipelineGraph.read_stream(stream)
roundtripped = PipelineGraph._read_stream(stream)
self.check_make_xgraph(roundtripped, resolved=True)

def test_resolved_file_io(self) -> None:
Expand All @@ -209,8 +209,8 @@ def test_resolved_file_io(self) -> None:
"""
self.graph.resolve(MockRegistry(self.dimensions, {}))
with lsst.utils.tests.getTempFilePath(".json.gz") as filename:
self.graph.write_uri(filename)
roundtripped = PipelineGraph.read_uri(filename)
self.graph._write_uri(filename)
roundtripped = PipelineGraph._read_uri(filename)
self.check_make_xgraph(roundtripped, resolved=True)

def test_resolved_deferred_import_io(self) -> None:
Expand All @@ -219,11 +219,11 @@ def test_resolved_deferred_import_io(self) -> None:
"""
self.graph.resolve(MockRegistry(self.dimensions, {}))
stream = io.BytesIO()
self.graph.write_stream(stream)
self.graph._write_stream(stream)
stream.seek(0)
roundtripped = PipelineGraph.read_stream(stream, import_and_configure=False)
roundtripped = PipelineGraph._read_stream(stream, import_and_configure=False)
self.check_make_xgraph(roundtripped, resolved=True, imported_and_configured=False)
roundtripped.import_and_configure(check_edges_unchanged=True)
roundtripped._import_and_configure(check_edges_unchanged=True)
self.check_make_xgraph(roundtripped, resolved=True, imported_and_configured=True)

def test_unresolved_copies(self) -> None:
Expand Down

0 comments on commit 60f68d0

Please sign in to comment.