Skip to content

Commit

Permalink
Persist dimension configs with ResolvedPipelineGraph.
Browse files Browse the repository at this point in the history
  • Loading branch information
TallJimbo committed Mar 7, 2023
1 parent 9e65e64 commit cda0f39
Showing 1 changed file with 22 additions and 8 deletions.
30 changes: 22 additions & 8 deletions python/lsst/pipe/base/pipeline_graph/_pipeline_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,20 +384,13 @@ def write_to_stream(self, stream: BinaryIO, basename: str = "pipeline", compress
-----
See `write_to_uri` for a description of the file format.
"""
graph_dict = {
"version": ".".join(str(item) for item in _IO_VERSION_INFO),
"description": self.description,
"tasks": {label: node._serialize() for label, node in self.tasks.items()},
"dataset_types": {name: node._serialize() for name, node in self.dataset_types.items()},
"subsets": {label: subset._serialize() for label, subset in self.task_subsets.items()},
}
# Can't get umask without also setting it, so call twice to reset.
mask = os.umask(0o755)
os.umask(mask)
uid = os.getuid()
gid = os.getgid()
timestamp = int(datetime.now().timestamp())
graph_bytes = json.dumps(graph_dict, ensure_ascii=False, indent=2).encode("utf-8")
graph_bytes = json.dumps(self._serialize(), ensure_ascii=False, indent=2).encode("utf-8")
with tarfile.open(fileobj=stream, mode=f"w|{compression}") as archive:
dir_tar_info = tarfile.TarInfo(basename)
dir_tar_info.type = tarfile.DIRTYPE
Expand Down Expand Up @@ -466,6 +459,21 @@ def write_to_uri(self, uri: ResourcePathExpression, basename: str | None = None)
with uri.open(mode="wb") as stream:
self.write_to_stream(cast(BinaryIO, stream), basename)

def _serialize(self) -> dict[str, Any]:
    """Convert this graph's content into a dictionary of built-in objects
    suitable for JSON serialization.

    Config files are intentionally excluded here; they are always
    serialized separately.
    """
    serialized: dict[str, Any] = {
        "version": ".".join(str(part) for part in _IO_VERSION_INFO),
        "description": self.description,
    }
    # Delegate per-node serialization to each node/subset object.
    serialized["tasks"] = {label: task._serialize() for label, task in self.tasks.items()}
    serialized["dataset_types"] = {
        name: dataset_type._serialize() for name, dataset_type in self.dataset_types.items()
    }
    serialized["subsets"] = {label: subset._serialize() for label, subset in self.task_subsets.items()}
    return serialized


@final
class MutablePipelineGraph(PipelineGraph[TaskNode, DatasetTypeNode, TaskSubsetGraph]):
Expand Down Expand Up @@ -746,3 +754,9 @@ def group_by_dimensions(
next_new_value = ([], []) # make new lists for next time
group[1].append(dataset_type_node)
return result

def _serialize(self) -> dict[str, Any]:
    # Docstring inherited.
    # Extend the base-class serialization with this graph's dimension
    # configuration so it is persisted alongside the resolved graph.
    return super()._serialize() | {"dimensions": self.universe.dimensionConfig.toDict()}

0 comments on commit cda0f39

Please sign in to comment.