Skip to content

Commit

Permalink
Merge pull request #301 from genematx/296-sr-sd-review
Browse files Browse the repository at this point in the history
Review StreamResource and StreamDatum Schemas
  • Loading branch information
genematx authored Apr 2, 2024
2 parents cb6896f + b9b0586 commit af8fac3
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 87 deletions.
8 changes: 3 additions & 5 deletions docs/user/explanations/data-model.rst
Original file line number Diff line number Diff line change
Expand Up @@ -483,11 +483,9 @@ Typical example:
# 'Stream Resource' document
{'data_key': 'detector_1',
'spec': 'AD_HDF5',
'root': '/GPFS/DATA/Andor/',
'resource_path': '2020/01/03/8ff08ff9-a2bf-48c3-8ff3-dcac0f309d7d.h5',
'resource_kwargs': {'frame_per_point': 1},
'path_semantics': 'posix',
'mimetype': 'application/x-hdf5',
'uri': 'file://localhost/GPFS/DATA/Andor/2020/01/03/8ff08ff9-a2bf-48c3-8ff3-dcac0f309d7d.h5',
'parameters': {'frame_per_point': 1},
'uid': '3b300e6f-b431-4750-a635-5630d15c81a8',
'run_start': '10bf6945-4afd-43ca-af36-6ad8f3540bcd'}
Expand Down
24 changes: 24 additions & 0 deletions docs/user/explanations/external.rst
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,30 @@ The ``spec`` gives us a hint about the format of this asset, whether it be a
file, multiple files, or something more specialized. The ``resource_kwargs``
provide any additional parameters for reading it.

.. code:: python
# 'Stream Resource' document
{'data_key': 'detector_1',
'mimetype': 'application/x-hdf5',
'uri': 'file://localhost/GPFS/DATA/Andor/2020/01/03/8ff08ff9.h5',
'parameters': {'frame_per_point': 10},
'uid': '3b300e6f-b431-4750-a635-5630d15c81a8',
'run_start': '10bf6945-4afd-43ca-af36-6ad8f3540bcd'}
The ``uri`` specifies the location of the data. It may be a path on the local
filesystem, ``file://localhost/{path}``; a path on a shared filesystem,
``file://{host}/{path}``, to be remapped at read time via local mount config;
or a non-file-based resource such as ``s3://...``. The ``{path}`` part of the
``uri`` is typically a relative path, all of which is semantic and should
usually not change during the lifecycle of this asset.

The ``mimetype`` is a recognized standard way to specify the I/O procedures to
read the asset. It gives us a hint about the format of this asset, whether it
be a file, multiple files, or something more specialized. We support standard
mimetypes, such as ``image/tiff``, as well as custom ones, e.g.
``application/x-hdf5-smwr-slice``. The ``parameters`` provide any additional
parameters for reading the asset.

Handlers
========

Expand Down
32 changes: 12 additions & 20 deletions event_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2047,12 +2047,10 @@ class ComposeStreamResource:

def __call__(
self,
spec: str,
root: str,
resource_path: str,
mimetype: str,
uri: str,
data_key: str,
resource_kwargs: Dict[str, Any],
path_semantics: Literal["posix", "windows"] = default_path_semantics,
parameters: Dict[str, Any],
uid: Optional[str] = None,
validate: bool = True,
) -> ComposeStreamResourceBundle:
Expand All @@ -2062,11 +2060,9 @@ def __call__(
doc = StreamResource(
uid=uid,
data_key=data_key,
spec=spec,
root=root,
resource_path=resource_path,
resource_kwargs=resource_kwargs,
path_semantics=path_semantics,
mimetype=mimetype,
uri=uri,
parameters=parameters,
)

if self.start:
Expand All @@ -2086,12 +2082,10 @@ def __call__(

def compose_stream_resource(
*,
spec: str,
root: str,
resource_path: str,
mimetype: str,
uri: str,
data_key: str,
resource_kwargs: Dict[str, Any],
path_semantics: Literal["posix", "windows"] = default_path_semantics,
parameters: Dict[str, Any],
start: Optional[RunStart] = None,
uid: Optional[str] = None,
validate: bool = True,
Expand All @@ -2100,12 +2094,10 @@ def compose_stream_resource(
Here for backwards compatibility, the Compose class is preferred.
"""
return ComposeStreamResource(start=start)(
spec,
root,
resource_path,
mimetype,
uri,
data_key,
resource_kwargs,
path_semantics=path_semantics,
parameters,
uid=uid,
validate=validate,
)
Expand Down
24 changes: 5 additions & 19 deletions event_model/documents/stream_resource.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Any, Dict

from typing_extensions import Annotated, Literal, NotRequired, TypedDict
from typing_extensions import Annotated, NotRequired, TypedDict

from .generate.type_wrapper import Field, add_extra_schema

Expand All @@ -14,35 +14,21 @@ class StreamResource(TypedDict):
externally-stored data streams
"""

path_semantics: NotRequired[
Annotated[
Literal["posix", "windows"],
Field(description="Rules for joining paths"),
]
]
data_key: Annotated[
str,
Field(
description="A string to show which data_key of the "
"Descriptor are being streamed"
),
]
resource_kwargs: Annotated[
parameters: Annotated[
Dict[str, Any],
Field(
description="Additional argument to pass to the Handler to read a "
description="Additional keyword arguments to pass to the Handler to read a "
"Stream Resource",
),
]
resource_path: Annotated[
str, Field(description="Filepath or URI for locating this resource")
]
root: Annotated[
str,
Field(
description="Subset of resource_path that is a local detail, not semantic."
),
]
uri: Annotated[str, Field(description="URI for locating this resource")]
run_start: NotRequired[
Annotated[
str,
Expand All @@ -52,7 +38,7 @@ class StreamResource(TypedDict):
),
]
]
spec: Annotated[
mimetype: Annotated[
str,
Field(
description="String identifying the format/type of this Stream Resource, "
Expand Down
47 changes: 16 additions & 31 deletions event_model/schemas/stream_resource.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,53 +8,38 @@
"description": "A string to show which data_key of the Descriptor are being streamed",
"type": "string"
},
"path_semantics": {
"title": "Path Semantics",
"description": "Rules for joining paths",
"type": "string",
"enum": [
"posix",
"windows"
]
},
"resource_kwargs": {
"title": "Resource Kwargs",
"description": "Additional argument to pass to the Handler to read a Stream Resource",
"type": "object"
},
"resource_path": {
"title": "Resource Path",
"description": "Filepath or URI for locating this resource",
"mimetype": {
"title": "Mimetype",
"description": "String identifying the format/type of this Stream Resource, used to identify a compatible Handler",
"type": "string"
},
"root": {
"title": "Root",
"description": "Subset of resource_path that is a local detail, not semantic.",
"type": "string"
"parameters": {
"title": "Parameters",
"description": "Additional keyword arguments to pass to the Handler to read a Stream Resource",
"type": "object"
},
"run_start": {
"title": "Run Start",
"description": "Globally unique ID to the run_start document this Stream Resource is associated with.",
"type": "string"
},
"spec": {
"title": "Spec",
"description": "String identifying the format/type of this Stream Resource, used to identify a compatible Handler",
"type": "string"
},
"uid": {
"title": "Uid",
"description": "Globally unique identifier for this Stream Resource",
"type": "string"
},
"uri": {
"title": "Uri",
"description": "URI for locating this resource",
"type": "string"
}
},
"required": [
"data_key",
"resource_kwargs",
"resource_path",
"root",
"spec",
"uid"
"mimetype",
"parameters",
"uid",
"uri"
],
"additionalProperties": false
}
14 changes: 6 additions & 8 deletions event_model/tests/test_em.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,10 @@ def test_compose_stream_resource(tmp_path):
compose_stream_resource = bundle.compose_stream_resource
assert bundle.compose_stream_resource is compose_stream_resource
bundle = compose_stream_resource(
spec="TIFF_STREAM",
root=str(tmp_path),
mimetype="image/tiff",
uri="file://localhost" + str(tmp_path) + "/test_streams",
data_key="det1",
resource_path="test_streams",
resource_kwargs={},
parameters={},
)
resource_doc, compose_stream_datum = bundle
assert bundle.stream_resource_doc is resource_doc
Expand Down Expand Up @@ -388,11 +387,10 @@ def test_document_router_streams_smoke_test(tmp_path):
start = run_bundle.start_doc
dr("start", start)
stream_resource_doc, compose_stream_datum = compose_stream_resource(
spec="TIFF_STREAM",
mimetype="image/tiff",
data_key="det1",
root=str(tmp_path),
resource_path="test_streams",
resource_kwargs={},
uri="file://localhost" + str(tmp_path) + "/test_streams",
parameters={},
)
dr("stream_resource", stream_resource_doc)
datum_doc = compose_stream_datum(
Expand Down
7 changes: 3 additions & 4 deletions event_model/tests/test_run_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,10 @@ def test_run_router_streams(tmp_path):
)
docs.append(("start", start_doc))
stream_resource_doc, compose_stream_datum = compose_stream_resource(
spec="TIFF_STREAM",
mimetype="image/tiff",
data_key="det1",
root=str(tmp_path),
resource_path="test_streams",
resource_kwargs={},
uri="file://localhost" + str(tmp_path) + "/test_streams",
parameters={},
)
docs.append(("stream_resource", stream_resource_doc))
datum_doc = compose_stream_datum(
Expand Down

0 comments on commit af8fac3

Please sign in to comment.