diff --git a/docs/user/explanations/data-model.rst b/docs/user/explanations/data-model.rst
index dc151b04..3687d688 100644
--- a/docs/user/explanations/data-model.rst
+++ b/docs/user/explanations/data-model.rst
@@ -477,31 +477,17 @@
 See :doc:`external` for details on the role Stream Resource documents play in
 referencing external assets that are natively ragged, such as single-photon
 detectors, or assets where there are many relatively small data sets
 (e.g. scanned fluorescence data).
 
-Minimal nontrivial valid example:
-
-.. code-block:: python
-
-    # 'Stream Resource' document
-    {'path_semantics': 'posix',
-     'resource_kwargs': {},
-     'resource_path': '/local/path/subdirectory/data_file',
-     'root': '/local/path/',
-     'run_start': '10bf6945-4afd-43ca-af36-6ad8f3540bcd',
-     'spec': 'SOME_SPEC',
-     'stream_names': ['point_det'],
-     'uid': '272132cf-564f-428f-bf6b-149ee4287024'}
-
 Typical example:
 
 .. code-block:: python
 
-    # resource
-    {'spec': 'AD_HDF5',
+    # 'Stream Resource' document
+    {'data_key': 'detector_1',
+     'spec': 'AD_HDF5',
      'root': '/GPFS/DATA/Andor/',
      'resource_path': '2020/01/03/8ff08ff9-a2bf-48c3-8ff3-dcac0f309d7d.h5',
      'resource_kwargs': {'frame_per_point': 1},
      'path_semantics': 'posix',
-     'stream_names': ['point_det'],
      'uid': '3b300e6f-b431-4750-a635-5630d15c81a8',
      'run_start': '10bf6945-4afd-43ca-af36-6ad8f3540bcd'}
 
@@ -518,32 +504,17 @@
 See :doc:`external` for details on the role Stream Datum documents play in
 referencing external assets that are natively ragged, such as single-photon
 detectors, or assets where there are many relatively small data sets
 (e.g. scanned fluorescence data).
 
-Minimal nontrivial valid example:
-
-.. code-block:: python
-
-    # 'datum' document
-    {'resource': '272132cf-564f-428f-bf6b-149ee4287024',  # foreign key
-     'datum_kwargs': {},  # format-specific parameters
-     'datum_id': '272132cf-564f-428f-bf6b-149ee4287024/1',
-     'block_idx': 0,
-     'event_count': 1
-    }
-
 Typical example:
 
 .. code-block:: python
 
-    # datum
-    {'resource': '3b300e6f-b431-4750-a635-5630d15c81a8',
-     'datum_kwargs': {'index': 3},
-     'datum_id': '3b300e6f-b431-4750-a635-5630d15c81a8/3',
-     'block_idx': 0,
-     'event_count': 5,
-     'event_offset': 14}
-
-It is an implementation detail that ``datum_id`` is often formatted as
-``{resource}/{counter}`` but this should not be considered part of the schema.
+    # 'Stream Datum' document
+    {'uid': '86340942-9865-47f9-9a8d-bdaaab1bfce2',
+     'descriptor': '8c70b8c2-df32-40e3-9f50-29cda8142fa0',
+     'stream_resource': '272132cf-564f-428f-bf6b-149ee4287024',  # foreign key
+     'indices': {'start': 0, 'stop': 1},
+     'seq_nums': {'start': 1, 'stop': 2},
+    }
 
 Formal schema:
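To make the new document pair concrete, here is an illustrative consumer-side
sketch (not part of this patch). It resolves one Stream Datum against its
Stream Resource, assuming an HDF5-backed resource readable with h5py and a
dataset named after the resource's ``data_key``; all names and paths are
hypothetical.

.. code-block:: python

    import os

    import h5py  # assumption: the resource points at an HDF5 file

    def read_stream_datum(stream_resource, stream_datum):
        # 'indices' is a half-open [start, stop) slice into the stored stream.
        start = stream_datum["indices"]["start"]
        stop = stream_datum["indices"]["stop"]
        path = os.path.join(
            stream_resource["root"], stream_resource["resource_path"]
        )
        with h5py.File(path, "r") as f:
            # 'data_key' names the Descriptor data key this resource backs.
            return f[stream_resource["data_key"]][start:stop]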
diff --git a/event_model/__init__.py b/event_model/__init__.py
index 38a159a3..3fec64a5 100644
--- a/event_model/__init__.py
+++ b/event_model/__init__.py
@@ -51,7 +51,7 @@
 from .documents.resource import Resource
 from .documents.run_start import RunStart
 from .documents.run_stop import RunStop
-from .documents.stream_datum import StreamDatum
+from .documents.stream_datum import StreamDatum, StreamRange
 from .documents.stream_resource import StreamResource
 
 if sys.version_info < (3, 9):
@@ -1958,12 +1958,13 @@ def __call__(
             resource_kwargs=resource_kwargs,
             resource_path=resource_path,
         )
-        if validate:
-            schema_validators[DocumentNames.resource].validate(doc)
 
         if self.start:
             doc["run_start"] = self.start["uid"]
+
+        if validate:
+            schema_validators[DocumentNames.resource].validate(doc)
+
         counter = itertools.count()
         return ComposeResourceBundle(
             doc,
@@ -2000,32 +2001,30 @@
 @dataclass
 class ComposeStreamDatum:
     stream_resource: StreamResource
-    stream_name: str
     counter: Iterator
 
     def __call__(
         self,
-        datum_kwargs: Dict[str, Any],
-        event_count: int = 1,
-        event_offset: int = 0,
+        indices: StreamRange,
+        seq_nums: Optional[StreamRange] = None,
+        descriptor: Optional[EventDescriptor] = None,
         validate: bool = True,
     ) -> StreamDatum:
         resource_uid = self.stream_resource["uid"]
-        if self.stream_name not in self.stream_resource["stream_names"]:
-            raise EventModelKeyError(
-                "Attempt to create stream_datum with name not included"
-                "in stream_resource"
-            )
-        block_idx = next(self.counter)
+
+        # If the seq_nums aren't passed in then the bluesky
+        # bundler will keep track of them
+        if not seq_nums:
+            seq_nums = StreamRange(start=0, stop=0)
+
         doc = StreamDatum(
             stream_resource=resource_uid,
-            datum_kwargs=datum_kwargs,
-            uid=f"{resource_uid}/{self.stream_name}/{block_idx}",
-            stream_name=self.stream_name,
-            block_idx=block_idx,
-            event_count=event_count,
-            event_offset=event_offset,
+            uid=f"{resource_uid}/{next(self.counter)}",
+            seq_nums=seq_nums,
+            indices=indices,
+            descriptor=descriptor["uid"] if descriptor else "",
         )
+
         if validate:
             schema_validators[DocumentNames.stream_datum].validate(doc)
 
@@ -2035,20 +2034,21 @@
 def compose_stream_datum(
     *,
     stream_resource: StreamResource,
-    stream_name: str,
     counter: Iterator,
-    datum_kwargs: Dict[str, Any],
-    event_count: int = 1,
-    event_offset: int = 0,
+    seq_nums: StreamRange,
+    indices: StreamRange,
     validate: bool = True,
 ) -> StreamDatum:
     """
     Here for backwards compatibility, the Compose class is preferred.
""" - return ComposeStreamDatum(stream_resource, stream_name, counter)( - datum_kwargs, - event_count=event_count, - event_offset=event_offset, + warnings.warn( + "compose_stream_datum() will be removed in the minor version.", + DeprecationWarning, + ) + return ComposeStreamDatum(stream_resource, counter)( + seq_nums, + indices, validate=validate, ) @@ -2056,14 +2056,14 @@ def compose_stream_datum( @dataclass class ComposeStreamResourceBundle: stream_resource_doc: StreamResource - compose_stream_data: List[ComposeStreamDatum] + compose_stream_datum: ComposeStreamDatum # iter for backwards compatibility def __iter__(self) -> Iterator: return iter( ( self.stream_resource_doc, - self.compose_stream_data, + self.compose_stream_datum, ) ) @@ -2077,36 +2077,25 @@ def __call__( spec: str, root: str, resource_path: str, + data_key: str, resource_kwargs: Dict[str, Any], - stream_names: Union[List, str], - counters: List = [], path_semantics: Literal["posix", "windows"] = default_path_semantics, uid: Optional[str] = None, validate: bool = True, ) -> ComposeStreamResourceBundle: if uid is None: uid = str(uuid.uuid4()) - if isinstance(stream_names, str): - stream_names = [ - stream_names, - ] - if len(counters) == 0: - counters = [itertools.count() for _ in stream_names] - elif len(counters) > len(stream_names): - raise ValueError( - "Insufficient number of counters " - f"{len(counters)} for stream names: {stream_names}" - ) doc = StreamResource( uid=uid, + data_key=data_key, spec=spec, root=root, resource_path=resource_path, resource_kwargs=resource_kwargs, - stream_names=stream_names, path_semantics=path_semantics, ) + if self.start: doc["run_start"] = self.start["uid"] @@ -2115,14 +2104,10 @@ def __call__( return ComposeStreamResourceBundle( doc, - [ - ComposeStreamDatum( - stream_resource=doc, - stream_name=stream_name, - counter=counter, - ) - for stream_name, counter in zip(stream_names, counters) - ], + ComposeStreamDatum( + stream_resource=doc, + counter=itertools.count(), + ), ) @@ -2131,9 +2116,8 @@ def compose_stream_resource( spec: str, root: str, resource_path: str, + data_key: str, resource_kwargs: Dict[str, Any], - stream_names: Union[List, str], - counters: List = [], path_semantics: Literal["posix", "windows"] = default_path_semantics, start: Optional[RunStart] = None, uid: Optional[str] = None, @@ -2146,9 +2130,8 @@ def compose_stream_resource( spec, root, resource_path, + data_key, resource_kwargs, - stream_names, - counters=counters, path_semantics=path_semantics, uid=uid, validate=validate, @@ -2213,6 +2196,17 @@ def compose_stop( )(exit_status=exit_status, reason=reason, uid=uid, time=time, validate=validate) +def length_of_value(dictionary: Dict[str, List], error_msg: str) -> Optional[int]: + length = None + for k, v in dictionary.items(): + v_len = len(v) + if length is not None: + if v_len != length: + raise EventModelError(error_msg) + length = v_len + return length + + @dataclass class ComposeEventPage: descriptor: EventDescriptor @@ -2222,12 +2216,32 @@ def __call__( self, data: Dict[str, List], timestamps: Dict[str, Any], - seq_num: List[int], + seq_num: Optional[List[int]] = None, filled: Optional[Dict[str, List[Union[bool, str]]]] = None, uid: Optional[List] = None, time: Optional[List] = None, validate: bool = True, ) -> EventPage: + timestamps_length = length_of_value( + timestamps, + "Cannot compose event_page: event_page contains `timestamps` " + "list values of different lengths", + ) + data_length = length_of_value( + data, + "Cannot compose event_page: event_page 
@@ -2213,6 +2196,17 @@ def compose_stop(
     )(exit_status=exit_status, reason=reason, uid=uid, time=time, validate=validate)
 
 
+def length_of_value(dictionary: Dict[str, List], error_msg: str) -> Optional[int]:
+    length = None
+    for k, v in dictionary.items():
+        v_len = len(v)
+        if length is not None:
+            if v_len != length:
+                raise EventModelError(error_msg)
+        length = v_len
+    return length
+
+
 @dataclass
 class ComposeEventPage:
     descriptor: EventDescriptor
@@ -2222,12 +2216,32 @@
     def __call__(
         self,
         data: Dict[str, List],
         timestamps: Dict[str, Any],
-        seq_num: List[int],
+        seq_num: Optional[List[int]] = None,
         filled: Optional[Dict[str, List[Union[bool, str]]]] = None,
         uid: Optional[List] = None,
         time: Optional[List] = None,
         validate: bool = True,
     ) -> EventPage:
+        timestamps_length = length_of_value(
+            timestamps,
+            "Cannot compose event_page: event_page contains `timestamps` "
+            "list values of different lengths",
+        )
+        data_length = length_of_value(
+            data,
+            "Cannot compose event_page: event_page contains `data` "
+            "lists of different lengths",
+        )
+        assert timestamps_length == data_length, (
+            "Cannot compose event_page: the lists in `timestamps` are of a different "
+            "length to those in `data`"
+        )
+
+        if seq_num is None:
+            last_seq_num = self.event_counters[self.descriptor["name"]]
+            seq_num = list(
+                range(last_seq_num, len(next(iter(data.values()))) + last_seq_num)
+            )
         N = len(seq_num)
         if uid is None:
             uid = [str(uuid.uuid4()) for _ in range(N)]
@@ -2246,11 +2260,20 @@
         )
         if validate:
             schema_validators[DocumentNames.event_page].validate(doc)
+
         if not (
-            self.descriptor["data_keys"].keys() == data.keys() == timestamps.keys()
+            set(
+                keys_without_stream_keys(
+                    self.descriptor["data_keys"], self.descriptor["data_keys"]
+                )
+            )
+            == set(keys_without_stream_keys(data, self.descriptor["data_keys"]))
+            == set(
+                keys_without_stream_keys(timestamps, self.descriptor["data_keys"])
+            )
         ):
             raise EventModelValidationError(
-                "These sets of keys must match:\n"
+                'These sets of keys must match (other than "STREAM:" keys):\n'
                 "event['data'].keys(): {}\n"
                 "event['timestamps'].keys(): {}\n"
                 "descriptor['data_keys'].keys(): {}\n".format(
@@ -2264,7 +2287,7 @@
                 "Keys in event['filled'] {} must be a subset of those in "
                 "event['data'] {}".format(filled.keys(), data.keys())
             )
-        self.event_counters[self.descriptor["name"]] += len(data)
+        self.event_counters[self.descriptor["name"]] += len(seq_num)
         return doc
 
@@ -2284,10 +2307,27 @@ def compose_event_page(
     Here for backwards compatibility, the Compose class is preferred.
     """
     return ComposeEventPage(descriptor, event_counters)(
-        data, timestamps, seq_num, filled, uid=uid, time=time, validate=validate
+        data,
+        timestamps,
+        seq_num=seq_num,
+        filled=filled,
+        uid=uid,
+        time=time,
+        validate=validate,
     )
 
 
+def keys_without_stream_keys(dictionary, descriptor_data_keys):
+    return [
+        key
+        for key in dictionary.keys()
+        if (
+            "external" not in descriptor_data_keys[key]
+            or descriptor_data_keys[key]["external"] != "STREAM:"
+        )
+    ]
+
+
 @dataclass
 class ComposeEvent:
     descriptor: EventDescriptor
@@ -2322,11 +2362,20 @@
         )
         if validate:
             schema_validators[DocumentNames.event].validate(doc)
+
         if not (
-            self.descriptor["data_keys"].keys() == data.keys() == timestamps.keys()
+            set(
+                keys_without_stream_keys(
+                    self.descriptor["data_keys"], self.descriptor["data_keys"]
+                )
+            )
+            == set(keys_without_stream_keys(data, self.descriptor["data_keys"]))
+            == set(
+                keys_without_stream_keys(timestamps, self.descriptor["data_keys"])
+            )
         ):
             raise EventModelValidationError(
-                "These sets of keys must match:\n"
+                'These sets of keys must match (other than "STREAM:" keys):\n'
                 "event['data'].keys(): {}\n"
                 "event['timestamps'].keys(): {}\n"
                 "descriptor['data_keys'].keys(): {}\n".format(
@@ -2340,7 +2389,7 @@
                 "Keys in event['filled'] {} must be a subset of those in "
                 "event['data'] {}".format(filled.keys(), data.keys())
             )
-        self.event_counters[self.descriptor["name"]] += 1
+        self.event_counters[self.descriptor["name"]] = seq_num + 1
         return doc
 
 
diff --git a/event_model/documents/event.py b/event_model/documents/event.py
index 58fdad8c..2b6dc477 100644
--- a/event_model/documents/event.py
+++ b/event_model/documents/event.py
@@ -39,7 +39,6 @@ class Event(PartialEvent):
     descriptor: Annotated[
         str, Field(description="UID of the EventDescriptor to which this Event belongs")
     ]
-
     seq_num: Annotated[
         int,
         Field(
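The practical effect of the ``keys_without_stream_keys`` change above,
sketched with placeholder values (not part of the patch): a data key declared
with ``external='STREAM:'`` is delivered via Stream Resource/Stream Datum
documents, so Events no longer carry it in ``data``/``timestamps`` and the
key check now permits its absence.

.. code-block:: python

    import event_model

    run = event_model.compose_run()
    desc_bundle = run.compose_descriptor(
        name="primary",
        data_keys={
            "motor": {"dtype": "number", "shape": [], "source": "PV:motor"},
            # Carried by Stream Datum documents, not by Events:
            "det1": {"dtype": "array", "shape": [10], "source": "PV:det1",
                     "external": "STREAM:"},
        },
    )
    # "det1" is deliberately absent from data/timestamps; this now validates.
    event_doc = desc_bundle.compose_event(
        data={"motor": 1.0},
        timestamps={"motor": 1612345678.0},
        seq_num=1,
    )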
diff --git a/event_model/documents/event_descriptor.py b/event_model/documents/event_descriptor.py
index 6fef8484..f7f0178a 100644
--- a/event_model/documents/event_descriptor.py
+++ b/event_model/documents/event_descriptor.py
@@ -4,6 +4,8 @@
 
 from .generate.type_wrapper import Field, add_extra_schema
 
+Dtype = Literal["string", "number", "array", "boolean", "integer"]
+
 
 class DataKey(TypedDict):
     """Describes the objects in the data property of Event documents"""
@@ -18,7 +20,7 @@
         ]
     ]
     dtype: Annotated[
-        Literal["string", "number", "array", "boolean", "integer"],
+        Dtype,
         Field(description="The type of the data in the event."),
     ]
     external: NotRequired[
diff --git a/event_model/documents/generate/__main__.py b/event_model/documents/generate/__main__.py
new file mode 100644
index 00000000..6089f854
--- /dev/null
+++ b/event_model/documents/generate/__main__.py
@@ -0,0 +1,3 @@
+from typeddict_to_schema import generate_all_schema
+
+generate_all_schema()
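For context, a sketch of how this entry point would be invoked; this is an
assumption about the surrounding tooling rather than something shown in the
patch. ``typeddict_to_schema`` is taken to be the sibling module in
``event_model/documents/generate/``, and ``generate_all_schema`` to rewrite
the JSON schema files that change below.

.. code-block:: python

    # Hypothetical regeneration sketch: __main__.py uses a bare import, which
    # resolves when the generate/ directory itself is on sys.path, e.g.:
    #
    #     cd event_model/documents/generate && python __main__.py
    #
    # Equivalent call, assuming generate/ is importable as part of the package:
    from event_model.documents.generate.typeddict_to_schema import (
        generate_all_schema,
    )

    generate_all_schema()  # regenerate event_model/schemas/*.json from the TypedDicts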
diff --git a/event_model/documents/stream_datum.py b/event_model/documents/stream_datum.py
index 5d9bf832..a48de7ab 100644
--- a/event_model/documents/stream_datum.py
+++ b/event_model/documents/stream_datum.py
@@ -1,9 +1,21 @@
-from typing import Any, Dict
-
 from typing_extensions import Annotated, TypedDict
 
 from .generate.type_wrapper import Field, add_extra_schema
 
+
+class StreamRange(TypedDict):
+    """The parameters required to describe a sequence of incrementing integers"""
+
+    start: Annotated[
+        int,
+        Field(description="First number in the range"),
+    ]
+    stop: Annotated[
+        int,
+        Field(description="Last number in the range is less than this number"),
+    ]
+
+
 STREAM_DATUM_EXTRA_SCHEMA = {"additionalProperties": False}
 
 
@@ -11,36 +23,9 @@
 class StreamDatum(TypedDict):
     """Document to reference a quanta of an externally-stored stream of data."""
 
-    block_idx: Annotated[
-        int,
-        Field(
-            description="The order in the stream of this block of data. This must "
-            "be contiguous for a given stream.",
-        ),
-    ]
-    datum_kwargs: Annotated[
-        Dict[str, Any],
-        Field(
-            description="Arguments to pass to the Handler to retrieve one "
-            "quanta of data",
-        ),
-    ]
-    event_count: Annotated[
-        int, Field(description="The number of events in this datum.")
-    ]
-    event_offset: Annotated[
-        int,
-        Field(
-            description="The sequence number of the first event in this block. This "
-            "increasing value allows the presence of gaps.",
-        ),
-    ]
-    stream_name: Annotated[
+    descriptor: Annotated[
         str,
-        Field(
-            description="The name of the stream that this Datum is providing a "
-            "block of.",
-        ),
+        Field(description="UID of the EventDescriptor to " "which this Datum belongs"),
     ]
     stream_resource: Annotated[
         str,
@@ -52,6 +37,20 @@
         str,
         Field(
             description="Globally unique identifier for this Datum. A suggested "
-            "formatting being '<stream_resource>/<stream_name>/<block_idx>'",
+            "formatting being '<stream_resource>/<counter>'"
         ),
     ]
+    seq_nums: Annotated[
+        StreamRange,
+        Field(
+            description="A slice object showing the Event numbers the "
+            "resource corresponds to"
+        ),
+    ]
+    indices: Annotated[
+        StreamRange,
+        Field(
+            description="A slice object passed to the StreamResource "
+            "handler so it can hand back data and timestamps"
+        ),
+    ]
diff --git a/event_model/documents/stream_resource.py b/event_model/documents/stream_resource.py
index fe9050fc..3ab1877e 100644
--- a/event_model/documents/stream_resource.py
+++ b/event_model/documents/stream_resource.py
@@ -1,4 +1,4 @@
-from typing import Any, Dict, List
+from typing import Any, Dict
 
 from typing_extensions import Annotated, Literal, NotRequired, TypedDict
 
@@ -20,6 +20,13 @@
             Field(description="Rules for joining paths"),
         ]
     ]
+    data_key: Annotated[
+        str,
+        Field(
+            description="A string to show which data_key of the "
+            "Descriptor is being streamed"
+        ),
+    ]
     resource_kwargs: Annotated[
         Dict[str, Any],
         Field(
@@ -55,11 +62,3 @@
     uid: Annotated[
         str, Field(description="Globally unique identifier for this Stream Resource")
     ]
-    stream_names: Annotated[
-        List[str],
-        Field(
-            description="List of the stream names this resource provides",
-            min_items=1,
-            unique_items=True,
-        ),
-    ]
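Putting the two reshaped TypedDicts together, a hand-built pair might look
like this sketch (not part of the patch; every value is a placeholder):

.. code-block:: python

    from event_model.documents.stream_datum import StreamDatum, StreamRange
    from event_model.documents.stream_resource import StreamResource

    resource: StreamResource = {
        "uid": "272132cf-564f-428f-bf6b-149ee4287024",
        "data_key": "det1",  # new: exactly one data key per resource
        "spec": "AD_HDF5",
        "root": "/GPFS/DATA/Andor/",
        "resource_path": "2020/01/03/example.h5",
        "resource_kwargs": {"frame_per_point": 1},
        "path_semantics": "posix",
    }
    datum: StreamDatum = {
        "uid": "272132cf-564f-428f-bf6b-149ee4287024/0",
        "descriptor": "8c70b8c2-df32-40e3-9f50-29cda8142fa0",
        "stream_resource": resource["uid"],
        "seq_nums": StreamRange(start=1, stop=11),  # Event seq_nums covered
        "indices": StreamRange(start=0, stop=10),   # slice for the handler
    }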
diff --git a/event_model/schemas/stream_datum.json b/event_model/schemas/stream_datum.json
index e10017b2..bd024c32 100644
--- a/event_model/schemas/stream_datum.json
+++ b/event_model/schemas/stream_datum.json
@@ -2,31 +2,52 @@
     "title": "stream_datum",
     "description": "Document to reference a quanta of an externally-stored stream of data.",
     "type": "object",
+    "definitions": {
+        "StreamRange": {
+            "title": "StreamRange",
+            "description": "The parameters required to describe a sequence of incrementing integers",
+            "type": "object",
+            "properties": {
+                "start": {
+                    "title": "Start",
+                    "description": "First number in the range",
+                    "type": "integer"
+                },
+                "stop": {
+                    "title": "Stop",
+                    "description": "Last number in the range is less than this number",
+                    "type": "integer"
+                }
+            },
+            "required": [
+                "start",
+                "stop"
+            ]
+        }
+    },
     "properties": {
-        "block_idx": {
-            "title": "Block Idx",
-            "description": "The order in the stream of this block of data. This must be contiguous for a given stream.",
-            "type": "integer"
-        },
-        "datum_kwargs": {
-            "title": "Datum Kwargs",
-            "description": "Arguments to pass to the Handler to retrieve one quanta of data",
-            "type": "object"
-        },
-        "event_count": {
-            "title": "Event Count",
-            "description": "The number of events in this datum.",
-            "type": "integer"
+        "descriptor": {
+            "title": "Descriptor",
+            "description": "UID of the EventDescriptor to which this Datum belongs",
+            "type": "string"
         },
-        "event_offset": {
-            "title": "Event Offset",
-            "description": "The sequence number of the first event in this block. This increasing value allows the presence of gaps.",
-            "type": "integer"
+        "indices": {
+            "title": "Indices",
+            "description": "A slice object passed to the StreamResource handler so it can hand back data and timestamps",
+            "allOf": [
+                {
+                    "$ref": "#/definitions/StreamRange"
+                }
+            ]
         },
-        "stream_name": {
-            "title": "Stream Name",
-            "description": "The name of the stream that this Datum is providing a block of.",
-            "type": "string"
+        "seq_nums": {
+            "title": "Seq Nums",
+            "description": "A slice object showing the Event numbers the resource corresponds to",
+            "allOf": [
+                {
+                    "$ref": "#/definitions/StreamRange"
+                }
+            ]
         },
         "stream_resource": {
             "title": "Stream Resource",
@@ -40,11 +61,9 @@
         }
     },
     "required": [
-        "block_idx",
-        "datum_kwargs",
-        "event_count",
-        "event_offset",
-        "stream_name",
+        "descriptor",
+        "indices",
+        "seq_nums",
         "stream_resource",
         "uid"
     ],
diff --git a/event_model/schemas/stream_resource.json b/event_model/schemas/stream_resource.json
index e6d02554..3e9a3fd7 100644
--- a/event_model/schemas/stream_resource.json
+++ b/event_model/schemas/stream_resource.json
@@ -3,6 +3,11 @@
     "description": "Document to reference a collection (e.g. file or group of files) of externally-stored data streams",
     "type": "object",
     "properties": {
+        "data_key": {
+            "title": "Data Key",
+            "description": "A string to show which data_key of the Descriptor is being streamed",
+            "type": "string"
+        },
         "path_semantics": {
             "title": "Path Semantics",
             "description": "Rules for joining paths",
@@ -37,16 +42,6 @@
             "description": "String identifying the format/type of this Stream Resource, used to identify a compatible Handler",
             "type": "string"
         },
-        "stream_names": {
-            "title": "Stream Names",
-            "description": "List of the stream names this resource provides",
-            "type": "array",
-            "minItems": 1,
-            "uniqueItems": true,
-            "items": {
-                "type": "string"
-            }
-        },
         "uid": {
             "title": "Uid",
             "description": "Globally unique identifier for this Stream Resource",
@@ -54,11 +49,11 @@
         }
     },
     "required": [
+        "data_key",
         "resource_kwargs",
         "resource_path",
         "root",
         "spec",
-        "stream_names",
         "uid"
     ],
     "additionalProperties": false
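These regenerated schemas are what ``schema_validators`` enforces at compose
time. A quick validation sketch (not part of the patch; document values are
placeholders):

.. code-block:: python

    from event_model import DocumentNames, schema_validators

    doc = {
        "uid": "272132cf-564f-428f-bf6b-149ee4287024/0",
        "descriptor": "8c70b8c2-df32-40e3-9f50-29cda8142fa0",
        "stream_resource": "272132cf-564f-428f-bf6b-149ee4287024",
        "seq_nums": {"start": 1, "stop": 2},
        "indices": {"start": 0, "stop": 1},
    }
    # Raises jsonschema.ValidationError if the document doesn't match.
    schema_validators[DocumentNames.stream_datum].validate(doc)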
check - from itertools import count - - with pytest.raises(KeyError): - event_model.compose_stream_datum( - stream_resource=resource_doc, - stream_name="stream_NA", - counter=count(), - datum_kwargs={}, - ) + assert bundle.compose_stream_datum is compose_stream_datum + compose_stream_datum(StreamRange(start=0, stop=0), StreamRange(start=0, stop=0)) def test_round_trip_pagination(): @@ -410,21 +391,18 @@ def test_document_router_streams_smoke_test(tmp_path): compose_stream_resource = run_bundle.compose_stream_resource start = run_bundle.start_doc dr("start", start) - stream_names = ["stream_1", "stream_2"] - stream_resource_doc, compose_stream_data = compose_stream_resource( + stream_resource_doc, compose_stream_datum = compose_stream_resource( spec="TIFF_STREAM", + data_key="det1", root=str(tmp_path), resource_path="test_streams", resource_kwargs={}, - stream_names=stream_names, ) dr("stream_resource", stream_resource_doc) - datum_doc_0, datum_doc_1 = ( - compose_stream_datum(datum_kwargs={}) - for compose_stream_datum in compose_stream_data + datum_doc = compose_stream_datum( + StreamRange(start=0, stop=0), StreamRange(start=0, stop=0) ) - dr("stream_datum", datum_doc_0) - dr("stream_datum", datum_doc_1) + dr("stream_datum", datum_doc) dr("stop", run_bundle.compose_stop()) @@ -1040,8 +1018,8 @@ def test_array_like(): ) desc_bundle.compose_event_page( data={"a": dask_array.ones((5, 3))}, - timestamps={"a": [1, 2, 3]}, - seq_num=[1, 2, 3], + timestamps={"a": [1, 2, 3, 4, 5]}, + seq_num=[1, 2, 3, 4, 5], ) diff --git a/event_model/tests/test_run_router.py b/event_model/tests/test_run_router.py index 0143b74e..93149d99 100644 --- a/event_model/tests/test_run_router.py +++ b/event_model/tests/test_run_router.py @@ -4,6 +4,7 @@ import pytest import event_model +from event_model.documents.stream_datum import StreamRange def test_run_router(tmp_path): @@ -215,21 +216,18 @@ def test_run_router_streams(tmp_path): bundle.compose_stop(), ) docs.append(("start", start_doc)) - stream_names = ["stream_1", "stream_2"] - stream_resource_doc, compose_stream_data = compose_stream_resource( + stream_resource_doc, compose_stream_datum = compose_stream_resource( spec="TIFF_STREAM", + data_key="det1", root=str(tmp_path), resource_path="test_streams", resource_kwargs={}, - stream_names=stream_names, ) docs.append(("stream_resource", stream_resource_doc)) - datum_doc_0, datum_doc_1 = ( - compose_stream_datum(datum_kwargs={}) - for compose_stream_datum in compose_stream_data + datum_doc = compose_stream_datum( + StreamRange(start=0, stop=0), StreamRange(start=0, stop=0) ) - docs.append(("stream_datum", datum_doc_0)) - docs.append(("stream_datum", datum_doc_1)) + docs.append(("stream_datum", datum_doc)) docs.append(("stop", stop_doc)) # Empty list of factories. Just make sure nothing blows up. @@ -254,7 +252,7 @@ def all_factory(name, doc): for name, doc in docs: rr(name, doc) assert len(resource_list) == 1 - assert len(data_list) == 2 + assert len(data_list) == 1 def test_subfactory():