diff --git a/docs/source/api.rst b/docs/source/api.rst index 7abac511..8453deac 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -26,6 +26,7 @@ Stream rate_limit scatter sink + sink_to_textfile slice sliding_window starmap @@ -84,6 +85,7 @@ Definitions .. autofunction:: partition .. autofunction:: rate_limit .. autofunction:: sink +.. autofunction:: sink_to_textfile .. autofunction:: sliding_window .. autofunction:: Stream .. autofunction:: timed_window diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index 9ae4e934..f521318e 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -60,7 +60,7 @@ Different kinds of add-ons go into different entry point groups: =========== ======================= ===================== Source ``streamz.Source`` ``streamz.sources`` Node ``streamz.Stream`` ``streamz.nodes`` - Sink ``streamz.Stream`` ``streamz.sinks`` + Sink ``streamz.Sink`` ``streamz.sinks`` =========== ======================= ===================== diff --git a/streamz/__init__.py b/streamz/__init__.py index 82e3cf46..18cf5d2a 100644 --- a/streamz/__init__.py +++ b/streamz/__init__.py @@ -3,6 +3,7 @@ from .core import * from .graph import * from .sources import * +from .sinks import * from .plugins import load_plugins load_plugins(Stream) diff --git a/streamz/core.py b/streamz/core.py index aa5106bc..0149b691 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -27,8 +27,6 @@ no_default = '--no-default--' -_global_sinks = set() - _html_update_streams = set() thread_state = threading.local() @@ -167,8 +165,10 @@ def __init__(self, upstream=None, upstreams=None, stream_name=None, self.downstreams = OrderedWeakrefSet() if upstreams is not None: self.upstreams = list(upstreams) - else: + elif upstream is not None: self.upstreams = [upstream] + else: + self.upstreams = [] self._set_asynchronous(asynchronous) self._set_loop(loop) @@ -238,10 +238,7 @@ def _inform_asynchronous(self, asynchronous): def _add_upstream(self, upstream): """Add upstream to current upstreams, this method is overridden for classes which handle stream specific buffers/caches""" - if self.upstreams == [None]: - self.upstreams[0] = upstream - else: - self.upstreams.append(upstream) + self.upstreams.append(upstream) def _add_downstream(self, downstream): """Add downstream to current downstreams""" @@ -254,10 +251,7 @@ def _remove_downstream(self, downstream): def _remove_upstream(self, upstream): """Remove upstream from current upstreams, this method is overridden for classes which handle stream specific buffers/caches""" - if len(self.upstreams) == 1: - self.upstreams[0] = [None] - else: - self.upstreams.remove(upstream) + self.upstreams.remove(upstream) @classmethod def register_api(cls, modifier=identity, attribute_name=None): @@ -529,8 +523,8 @@ def destroy(self, streams=None): if streams is None: streams = self.upstreams for upstream in list(streams): - upstream.downstreams.remove(self) - self.upstreams.remove(upstream) + upstream._remove_downstream(self) + self._remove_upstream(upstream) def scatter(self, **kwargs): from .dask import scatter @@ -656,48 +650,6 @@ def _release_refs(self, metadata, n=1): m['ref'].release(n) -@Stream.register_api() -class sink(Stream): - """ Apply a function on every element - - Examples - -------- - >>> source = Stream() - >>> L = list() - >>> source.sink(L.append) - >>> source.sink(print) - >>> source.sink(print) - >>> source.emit(123) - 123 - 123 - >>> L - [123] - - See Also - -------- - map - Stream.sink_to_list - """ - _graphviz_shape = 'trapezium' - - def __init__(self, upstream, func, *args, **kwargs): - self.func = func - # take the stream specific kwargs out - stream_name = kwargs.pop("stream_name", None) - self.kwargs = kwargs - self.args = args - - Stream.__init__(self, upstream, stream_name=stream_name) - _global_sinks.add(self) - - def update(self, x, who=None, metadata=None): - result = self.func(x, *self.args, **self.kwargs) - if gen.isawaitable(result): - return result - else: - return [] - - @Stream.register_api() class map(Stream): """ Apply a function to every element in the stream diff --git a/streamz/graph.py b/streamz/graph.py index 0189e8d7..8b181fc2 100644 --- a/streamz/graph.py +++ b/streamz/graph.py @@ -69,8 +69,6 @@ def create_graph(node, graph): """ # Step 1 build a set of all the nodes node_set = build_node_set(node) - if None in node_set: - node_set.remove(None) # Step 2 for each node in the set add to the graph for n in node_set: @@ -87,8 +85,7 @@ def create_graph(node, graph): # Step 3 for each node establish its edges for n in node_set: t = hash(n) - upstreams = [_ for _ in n.upstreams if _ is not None] - for nn in upstreams: + for nn in n.upstreams: tt = hash(nn) graph.add_edge(tt, t) diff --git a/streamz/sinks.py b/streamz/sinks.py new file mode 100644 index 00000000..358339ff --- /dev/null +++ b/streamz/sinks.py @@ -0,0 +1,114 @@ +import inspect +import weakref + +from tornado import gen + +from streamz import Stream + +# sinks add themselves here to avoid being garbage-collected +_global_sinks = set() + + +class Sink(Stream): + + _graphviz_shape = 'trapezium' + + def __init__(self, upstream, **kwargs): + super().__init__(upstream, **kwargs) + _global_sinks.add(self) + + +@Stream.register_api() +class sink(Sink): + """ Apply a function on every element + + Parameters + ---------- + func: callable + A function that will be applied on every element. + args: + Positional arguments that will be passed to ``func`` after the incoming element. + kwargs: + Stream-specific arguments will be passed to ``Stream.__init__``, the rest of + them will be passed to ``func``. + + Examples + -------- + >>> source = Stream() + >>> L = list() + >>> source.sink(L.append) + >>> source.sink(print) + >>> source.sink(print) + >>> source.emit(123) + 123 + 123 + >>> L + [123] + + See Also + -------- + map + Stream.sink_to_list + """ + + def __init__(self, upstream, func, *args, **kwargs): + self.func = func + # take the stream specific kwargs out + sig = set(inspect.signature(Stream).parameters) + stream_kwargs = {k: v for (k, v) in kwargs.items() if k in sig} + self.kwargs = {k: v for (k, v) in kwargs.items() if k not in sig} + self.args = args + super().__init__(upstream, **stream_kwargs) + + def update(self, x, who=None, metadata=None): + result = self.func(x, *self.args, **self.kwargs) + if gen.isawaitable(result): + return result + else: + return [] + + def destroy(self): + super().destroy() + _global_sinks.remove(self) + + +@Stream.register_api() +class sink_to_textfile(Sink): + """ Write elements to a plain text file, one element per line. + + Type of elements must be ``str``. + + Parameters + ---------- + file: str or file-like + File to write the elements to. ``str`` is treated as a file name to open. + If file-like, descriptor must be open in text mode. Note that the file + descriptor will be closed when this sink is destroyed. + end: str, optional + This value will be written to the file after each element. + Defaults to newline character. + mode: str, optional + If file is ``str``, file will be opened in this mode. Defaults to ``"a"`` + (append mode). + + Examples + -------- + >>> source = Stream() + >>> source.map(str).sink_to_textfile("test.txt") + >>> source.emit(0) + >>> source.emit(1) + >>> print(open("test.txt", "r").read()) + 0 + 1 + """ + def __init__(self, upstream, file, end="\n", mode="a", **kwargs): + self._end = end + self._fp = open(file, mode=mode) if isinstance(file, str) else file + weakref.finalize(self, self._fp.close) + super().__init__(upstream, **kwargs) + + def __del__(self): + self._fp.close() + + def update(self, x, who=None, metadata=None): + self._fp.write(x + self._end) diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py index 2eb14f5c..b288ed32 100644 --- a/streamz/tests/test_core.py +++ b/streamz/tests/test_core.py @@ -395,23 +395,6 @@ def test_sink_to_file(): assert data == 'a\nb\n' -def test_sink_with_args_and_kwargs(): - L = dict() - - def mycustomsink(elem, key, prefix=""): - key = prefix + key - if key not in L: - L[key] = list() - L[key].append(elem) - - s = Stream() - s.sink(mycustomsink, "cat", "super") - - s.emit(1) - s.emit(2) - assert L['supercat'] == [1, 2] - - @gen_test() def test_counter(): counter = itertools.count() @@ -1097,7 +1080,7 @@ def test_connect(): # connect assumes this default behaviour # of stream initialization assert not source_downstream.downstreams - assert source_downstream.upstreams == [None] + assert source_downstream.upstreams == [] # initialize the second stream to connect to source_upstream = Stream() diff --git a/streamz/tests/test_sinks.py b/streamz/tests/test_sinks.py new file mode 100644 index 00000000..6af7eb42 --- /dev/null +++ b/streamz/tests/test_sinks.py @@ -0,0 +1,70 @@ +import weakref + +import pytest +from streamz import Stream +from streamz.sinks import _global_sinks +from streamz.utils_test import tmpfile + + +def test_sink_with_args_and_kwargs(): + L = dict() + + def mycustomsink(elem, key, prefix=""): + key = prefix + key + if key not in L: + L[key] = list() + L[key].append(elem) + + s = Stream() + sink = s.sink(mycustomsink, "cat", "super", stream_name="test") + s.emit(1) + s.emit(2) + + assert L['supercat'] == [1, 2] + assert sink.name == "test" + + +def test_sink_to_textfile_fp(): + source = Stream() + with tmpfile() as filename, open(filename, "w") as fp: + source.map(str).sink_to_textfile(fp) + source.emit(0) + source.emit(1) + + fp.flush() + + assert open(filename, "r").read() == "0\n1\n" + + +def test_sink_to_textfile_named(): + source = Stream() + with tmpfile() as filename: + _sink = source.map(str).sink_to_textfile(filename) + source.emit(0) + source.emit(1) + + _sink._fp.flush() + + assert open(filename, "r").read() == "0\n1\n" + + +def test_sink_to_textfile_closes(): + source = Stream() + with tmpfile() as filename: + sink = source.sink_to_textfile(filename) + fp = sink._fp + _global_sinks.remove(sink) + del sink + + with pytest.raises(ValueError, match=r"I/O operation on closed file\."): + fp.write(".") + + +def test_sink_destroy(): + source = Stream() + sink = source.sink(lambda x: None) + ref = weakref.ref(sink) + sink.destroy() + del sink + + assert ref() is None