From bceb79a6c9d4efad48791e6425e0eec121a5b60f Mon Sep 17 00:00:00 2001 From: Mikhail Akimov Date: Thu, 12 Nov 2020 03:48:38 +0300 Subject: [PATCH 1/7] refactor sinks, sink_to_textfile --- .travis.yml | 2 +- streamz/__init__.py | 1 + streamz/core.py | 44 ----------------- streamz/sinks.py | 95 +++++++++++++++++++++++++++++++++++++ streamz/tests/test_core.py | 17 ------- streamz/tests/test_sinks.py | 45 ++++++++++++++++++ 6 files changed, 142 insertions(+), 62 deletions(-) create mode 100644 streamz/sinks.py create mode 100644 streamz/tests/test_sinks.py diff --git a/.travis.yml b/.travis.yml index d2eadad0..3eb7a38f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,7 +21,7 @@ install: - conda update conda # Install dependencies - - conda env create --name test-streamz --file ./conda/environments/streamz_dev.yml + - travis_wait 30 conda env create --name test-streamz --file ./conda/environments/streamz_dev.yml - source activate test-streamz - python setup.py install diff --git a/streamz/__init__.py b/streamz/__init__.py index ebd013c1..92696cc6 100644 --- a/streamz/__init__.py +++ b/streamz/__init__.py @@ -3,6 +3,7 @@ from .core import * from .graph import * from .sources import * +from .sinks import * try: from .dask import DaskStream, scatter except ImportError: diff --git a/streamz/core.py b/streamz/core.py index c3f4512f..e9dfb565 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -27,8 +27,6 @@ no_default = '--no-default--' -_global_sinks = set() - _html_update_streams = set() thread_state = threading.local() @@ -622,48 +620,6 @@ def _release_refs(self, metadata, n=1): m['ref'].release(n) -@Stream.register_api() -class sink(Stream): - """ Apply a function on every element - - Examples - -------- - >>> source = Stream() - >>> L = list() - >>> source.sink(L.append) - >>> source.sink(print) - >>> source.sink(print) - >>> source.emit(123) - 123 - 123 - >>> L - [123] - - See Also - -------- - map - Stream.sink_to_list - """ - _graphviz_shape = 'trapezium' - - def 
__init__(self, upstream, func, *args, **kwargs): - self.func = func - # take the stream specific kwargs out - stream_name = kwargs.pop("stream_name", None) - self.kwargs = kwargs - self.args = args - - Stream.__init__(self, upstream, stream_name=stream_name) - _global_sinks.add(self) - - def update(self, x, who=None, metadata=None): - result = self.func(x, *self.args, **self.kwargs) - if gen.isawaitable(result): - return result - else: - return [] - - @Stream.register_api() class map(Stream): """ Apply a function to every element in the stream diff --git a/streamz/sinks.py b/streamz/sinks.py new file mode 100644 index 00000000..fe85b22e --- /dev/null +++ b/streamz/sinks.py @@ -0,0 +1,95 @@ +from tornado import gen + +from streamz import Stream + +_global_sinks = set() + + +class Sink(Stream): + + _graphviz_shape = 'trapezium' + + def __init__(self, upstream, **kwargs): + super().__init__(upstream, **kwargs) + _global_sinks.add(self) + + +@Stream.register_api() +class sink(Sink): + """ Apply a function on every element + + Examples + -------- + >>> source = Stream() + >>> L = list() + >>> source.sink(L.append) + >>> source.sink(print) + >>> source.sink(print) + >>> source.emit(123) + 123 + 123 + >>> L + [123] + + See Also + -------- + map + Stream.sink_to_list + """ + + def __init__(self, upstream, func, *args, **kwargs): + self.func = func + # take the stream specific kwargs out + stream_name = kwargs.pop("stream_name", None) + self.kwargs = kwargs + self.args = args + super().__init__(upstream, stream_name=stream_name) + + def update(self, x, who=None, metadata=None): + result = self.func(x, *self.args, **self.kwargs) + if gen.isawaitable(result): + return result + else: + return [] + + +@Stream.register_api() +class sink_to_textfile(Sink): + """ Write elements to a plain text file, one element per line. Type of elements + must be ``str``. + + Arguments + --------- + file: str or file-like + File to write the elements to. ``str`` is treated as a file name to open. 
+ If file-like, descriptor must be open in text mode. + Note that the file descriptor will be closed when sink is destroyed. + end: str, optional + This value will be written to the file after each element. + Defaults to newline character. + mode: str, optional + If file is ``str``, file will be opened in this mode. Defaults to ``"a"`` + (append mode). + + Examples + -------- + >>> source = Stream() + >>> source.map(str).sink_to_file("test.txt") + >>> source.emit(0) + >>> source.emit(1) + >>> print(open("test.txt", "r").read()) + 0 + 1 + """ + def __init__(self, upstream, file, end="\n", mode="a", **kwargs): + self._fp = open(file, mode=mode, buffering=1) if isinstance(file, str) else file + self._end = end + super().__init__(upstream, ensure_io_loop=True, **kwargs) + + def __del__(self): + self._fp.close() + + @gen.coroutine + def update(self, x, who=None, metadata=None): + yield self.loop.run_in_executor(None, self._fp.write, x) + yield self.loop.run_in_executor(None, self._fp.write, self._end) diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py index 3495832c..35eae723 100644 --- a/streamz/tests/test_core.py +++ b/streamz/tests/test_core.py @@ -336,23 +336,6 @@ def test_sink_to_file(): assert data == 'a\nb\n' -def test_sink_with_args_and_kwargs(): - L = dict() - - def mycustomsink(elem, key, prefix=""): - key = prefix + key - if key not in L: - L[key] = list() - L[key].append(elem) - - s = Stream() - s.sink(mycustomsink, "cat", "super") - - s.emit(1) - s.emit(2) - assert L['supercat'] == [1, 2] - - @gen_test() def test_counter(): counter = itertools.count() diff --git a/streamz/tests/test_sinks.py b/streamz/tests/test_sinks.py new file mode 100644 index 00000000..41412cd3 --- /dev/null +++ b/streamz/tests/test_sinks.py @@ -0,0 +1,45 @@ +from time import sleep + +from streamz import Stream +from streamz.utils_test import tmpfile + + +def test_sink_with_args_and_kwargs(): + L = dict() + + def mycustomsink(elem, key, prefix=""): + key = prefix + 
key + if key not in L: + L[key] = list() + L[key].append(elem) + + s = Stream() + s.sink(mycustomsink, "cat", "super") + + s.emit(1) + s.emit(2) + assert L['supercat'] == [1, 2] + + +def test_sink_to_textfile_fp(): + source = Stream() + with tmpfile() as filename, open(filename, "w", buffering=1) as fp: + source.map(str).sink_to_textfile(fp) + source.emit(0) + source.emit(1) + + sleep(0.01) + + assert open(filename, "r").read() == "0\n1\n" + + +def test_sink_to_textfile_named(): + source = Stream() + with tmpfile() as filename: + source.map(str).sink_to_textfile(filename) + source.emit(0) + source.emit(1) + + sleep(0.01) + + assert open(filename, "r").read() == "0\n1\n" From 7f04156e24e363bec409e7eb4f5a4ddcb86c1943 Mon Sep 17 00:00:00 2001 From: Mikhail Akimov Date: Fri, 13 Nov 2020 20:20:57 +0300 Subject: [PATCH 2/7] add test: sink_to_textfile closes file when deleted --- streamz/tests/test_sinks.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/streamz/tests/test_sinks.py b/streamz/tests/test_sinks.py index 41412cd3..bc7c0a40 100644 --- a/streamz/tests/test_sinks.py +++ b/streamz/tests/test_sinks.py @@ -1,6 +1,8 @@ from time import sleep +import pytest from streamz import Stream +from streamz.sinks import _global_sinks from streamz.utils_test import tmpfile @@ -43,3 +45,15 @@ def test_sink_to_textfile_named(): sleep(0.01) assert open(filename, "r").read() == "0\n1\n" + + +def test_sink_to_textfile_closes(): + source = Stream() + with tmpfile() as filename: + sink = source.sink_to_textfile(filename) + fp = sink._fp + _global_sinks.remove(sink) + del sink + + with pytest.raises(ValueError): # I/O operation on closed file + fp.write(".") From c001c363d8a6ac454b49464c0e849b432e3cf4b9 Mon Sep 17 00:00:00 2001 From: Mikhail Akimov Date: Fri, 13 Nov 2020 20:20:57 +0300 Subject: [PATCH 3/7] add test: sink_to_textfile closes file when deleted update docs --- docs/source/api.rst | 2 ++ docs/source/plugins.rst | 2 +- streamz/sinks.py | 11 ++++++----- 
streamz/tests/test_sinks.py | 14 ++++++++++++++ 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/docs/source/api.rst b/docs/source/api.rst index 7abac511..8453deac 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -26,6 +26,7 @@ Stream rate_limit scatter sink + sink_to_textfile slice sliding_window starmap @@ -84,6 +85,7 @@ Definitions .. autofunction:: partition .. autofunction:: rate_limit .. autofunction:: sink +.. autofunction:: sink_to_textfile .. autofunction:: sliding_window .. autofunction:: Stream .. autofunction:: timed_window diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index 9ae4e934..f521318e 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -60,7 +60,7 @@ Different kinds of add-ons go into different entry point groups: =========== ======================= ===================== Source ``streamz.Source`` ``streamz.sources`` Node ``streamz.Stream`` ``streamz.nodes`` - Sink ``streamz.Stream`` ``streamz.sinks`` + Sink ``streamz.Sink`` ``streamz.sinks`` =========== ======================= ===================== diff --git a/streamz/sinks.py b/streamz/sinks.py index fe85b22e..56a2047c 100644 --- a/streamz/sinks.py +++ b/streamz/sinks.py @@ -55,15 +55,16 @@ def update(self, x, who=None, metadata=None): @Stream.register_api() class sink_to_textfile(Sink): - """ Write elements to a plain text file, one element per line. Type of elements - must be ``str``. + """ Write elements to a plain text file, one element per line. + + Type of elements must be ``str``. Arguments --------- file: str or file-like File to write the elements to. ``str`` is treated as a file name to open. - If file-like, descriptor must be open in text mode. - Note that the file descriptor will be closed when sink is destroyed. + If file-like, descriptor must be open in text mode. Note that the file + descriptor will be closed when this sink is destroyed. end: str, optional This value will be written to the file after each element. 
Defaults to newline character. @@ -74,7 +75,7 @@ class sink_to_textfile(Sink): Examples -------- >>> source = Stream() - >>> source.map(str).sink_to_file("test.txt") + >>> source.map(str).sink_to_textfile("test.txt") >>> source.emit(0) >>> source.emit(1) >>> print(open("test.txt", "r").read()) diff --git a/streamz/tests/test_sinks.py b/streamz/tests/test_sinks.py index 41412cd3..bc7c0a40 100644 --- a/streamz/tests/test_sinks.py +++ b/streamz/tests/test_sinks.py @@ -1,6 +1,8 @@ from time import sleep +import pytest from streamz import Stream +from streamz.sinks import _global_sinks from streamz.utils_test import tmpfile @@ -43,3 +45,15 @@ def test_sink_to_textfile_named(): sleep(0.01) assert open(filename, "r").read() == "0\n1\n" + + +def test_sink_to_textfile_closes(): + source = Stream() + with tmpfile() as filename: + sink = source.sink_to_textfile(filename) + fp = sink._fp + _global_sinks.remove(sink) + del sink + + with pytest.raises(ValueError): # I/O operation on closed file + fp.write(".") From 5c1b3a6bdc556293157f542c9fad821d27545e3b Mon Sep 17 00:00:00 2001 From: Mikhail Akimov Date: Mon, 16 Nov 2020 23:55:54 +0300 Subject: [PATCH 4/7] improved docs, comments, updated tests --- streamz/sinks.py | 31 +++++++++++++++++++++++-------- streamz/tests/test_sinks.py | 17 ++++++++--------- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/streamz/sinks.py b/streamz/sinks.py index 56a2047c..bd438071 100644 --- a/streamz/sinks.py +++ b/streamz/sinks.py @@ -1,7 +1,11 @@ +import inspect +import weakref + from tornado import gen from streamz import Stream +# sinks add themselves here to avoid being garbage-collected _global_sinks = set() @@ -18,6 +22,16 @@ def __init__(self, upstream, **kwargs): class sink(Sink): """ Apply a function on every element + Parameters + ---------- + func: callable + A function that will be applied on every element. + args: + Positional arguments that will be passed to ``func`` after the incoming element.
+ kwargs: + Stream-specific arguments will be passed to ``Stream.__init__``, the rest of + them will be passed to ``func``. + Examples -------- >>> source = Stream() @@ -40,10 +54,11 @@ class sink(Sink): def __init__(self, upstream, func, *args, **kwargs): self.func = func # take the stream specific kwargs out - stream_name = kwargs.pop("stream_name", None) - self.kwargs = kwargs + sig = set(inspect.signature(Stream).parameters) + stream_kwargs = {k: v for (k, v) in kwargs.items() if k in sig} + self.kwargs = {k: v for (k, v) in kwargs.items() if k not in sig} self.args = args - super().__init__(upstream, stream_name=stream_name) + super().__init__(upstream, **stream_kwargs) def update(self, x, who=None, metadata=None): result = self.func(x, *self.args, **self.kwargs) @@ -59,8 +74,8 @@ class sink_to_textfile(Sink): Type of elements must be ``str``. - Arguments - --------- + Parameters + ---------- file: str or file-like File to write the elements to. ``str`` is treated as a file name to open. If file-like, descriptor must be open in text mode. 
Note that the file @@ -83,8 +98,9 @@ class sink_to_textfile(Sink): 1 """ def __init__(self, upstream, file, end="\n", mode="a", **kwargs): - self._fp = open(file, mode=mode, buffering=1) if isinstance(file, str) else file self._end = end + self._fp = open(file, mode=mode) if isinstance(file, str) else file + weakref.finalize(self, self._fp.close) super().__init__(upstream, ensure_io_loop=True, **kwargs) def __del__(self): @@ -92,5 +108,4 @@ def __del__(self): @gen.coroutine def update(self, x, who=None, metadata=None): - yield self.loop.run_in_executor(None, self._fp.write, x) - yield self.loop.run_in_executor(None, self._fp.write, self._end) + yield self.loop.run_in_executor(None, self._fp.write, x + self._end) diff --git a/streamz/tests/test_sinks.py b/streamz/tests/test_sinks.py index bc7c0a40..aecd8f08 100644 --- a/streamz/tests/test_sinks.py +++ b/streamz/tests/test_sinks.py @@ -1,5 +1,3 @@ -from time import sleep - import pytest from streamz import Stream from streamz.sinks import _global_sinks @@ -16,21 +14,22 @@ def mycustomsink(elem, key, prefix=""): L[key].append(elem) s = Stream() - s.sink(mycustomsink, "cat", "super") - + sink = s.sink(mycustomsink, "cat", "super", stream_name="test") s.emit(1) s.emit(2) + assert L['supercat'] == [1, 2] + assert sink.name == "test" def test_sink_to_textfile_fp(): source = Stream() - with tmpfile() as filename, open(filename, "w", buffering=1) as fp: + with tmpfile() as filename, open(filename, "w") as fp: source.map(str).sink_to_textfile(fp) source.emit(0) source.emit(1) - sleep(0.01) + fp.flush() assert open(filename, "r").read() == "0\n1\n" @@ -38,11 +37,11 @@ def test_sink_to_textfile_fp(): def test_sink_to_textfile_named(): source = Stream() with tmpfile() as filename: - source.map(str).sink_to_textfile(filename) + _sink = source.map(str).sink_to_textfile(filename) source.emit(0) source.emit(1) - sleep(0.01) + _sink._fp.flush() assert open(filename, "r").read() == "0\n1\n" @@ -55,5 +54,5 @@ def 
test_sink_to_textfile_closes(): _global_sinks.remove(sink) del sink - with pytest.raises(ValueError): # I/O operation on closed file + with pytest.raises(ValueError, match=r"I/O operation on closed file\."): fp.write(".") From b51c6aeeb45c09479d0b91dc10f120f107921ef0 Mon Sep 17 00:00:00 2001 From: Mikhail Akimov Date: Tue, 17 Nov 2020 14:50:07 +0300 Subject: [PATCH 5/7] default upstreams is empty list, override destroy for sinks --- streamz/core.py | 18 +++++++----------- streamz/sinks.py | 4 ++++ streamz/tests/test_core.py | 2 +- streamz/tests/test_sinks.py | 12 ++++++++++++ 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/streamz/core.py b/streamz/core.py index 6df6463c..0149b691 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -165,8 +165,10 @@ def __init__(self, upstream=None, upstreams=None, stream_name=None, self.downstreams = OrderedWeakrefSet() if upstreams is not None: self.upstreams = list(upstreams) - else: + elif upstream is not None: self.upstreams = [upstream] + else: + self.upstreams = [] self._set_asynchronous(asynchronous) self._set_loop(loop) @@ -236,10 +238,7 @@ def _inform_asynchronous(self, asynchronous): def _add_upstream(self, upstream): """Add upstream to current upstreams, this method is overridden for classes which handle stream specific buffers/caches""" - if self.upstreams == [None]: - self.upstreams[0] = upstream - else: - self.upstreams.append(upstream) + self.upstreams.append(upstream) def _add_downstream(self, downstream): """Add downstream to current downstreams""" @@ -252,10 +251,7 @@ def _remove_downstream(self, downstream): def _remove_upstream(self, upstream): """Remove upstream from current upstreams, this method is overridden for classes which handle stream specific buffers/caches""" - if len(self.upstreams) == 1: - self.upstreams[0] = [None] - else: - self.upstreams.remove(upstream) + self.upstreams.remove(upstream) @classmethod def register_api(cls, modifier=identity, attribute_name=None): @@ -527,8
+523,8 @@ def destroy(self, streams=None): if streams is None: streams = self.upstreams for upstream in list(streams): - upstream.downstreams.remove(self) - self.upstreams.remove(upstream) + upstream._remove_downstream(self) + self._remove_upstream(upstream) def scatter(self, **kwargs): from .dask import scatter diff --git a/streamz/sinks.py b/streamz/sinks.py index bd438071..1b721567 100644 --- a/streamz/sinks.py +++ b/streamz/sinks.py @@ -67,6 +67,10 @@ def update(self, x, who=None, metadata=None): else: return [] + def destroy(self): + super().destroy() + _global_sinks.remove(self) + @Stream.register_api() class sink_to_textfile(Sink): diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py index 324c3deb..b288ed32 100644 --- a/streamz/tests/test_core.py +++ b/streamz/tests/test_core.py @@ -1080,7 +1080,7 @@ def test_connect(): # connect assumes this default behaviour # of stream initialization assert not source_downstream.downstreams - assert source_downstream.upstreams == [None] + assert source_downstream.upstreams == [] # initialize the second stream to connect to source_upstream = Stream() diff --git a/streamz/tests/test_sinks.py b/streamz/tests/test_sinks.py index aecd8f08..6af7eb42 100644 --- a/streamz/tests/test_sinks.py +++ b/streamz/tests/test_sinks.py @@ -1,3 +1,5 @@ +import weakref + import pytest from streamz import Stream from streamz.sinks import _global_sinks @@ -56,3 +58,13 @@ def test_sink_to_textfile_closes(): with pytest.raises(ValueError, match=r"I/O operation on closed file\."): fp.write(".") + + +def test_sink_destroy(): + source = Stream() + sink = source.sink(lambda x: None) + ref = weakref.ref(sink) + sink.destroy() + del sink + + assert ref() is None From 973737776674686b1d3c5eb3598e3fb98aa7d127 Mon Sep 17 00:00:00 2001 From: Mikhail Akimov Date: Tue, 17 Nov 2020 14:52:11 +0300 Subject: [PATCH 6/7] sink_to_textfile write without threads --- streamz/sinks.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff 
--git a/streamz/sinks.py b/streamz/sinks.py index 1b721567..358339ff 100644 --- a/streamz/sinks.py +++ b/streamz/sinks.py @@ -105,11 +105,10 @@ def __init__(self, upstream, file, end="\n", mode="a", **kwargs): self._end = end self._fp = open(file, mode=mode) if isinstance(file, str) else file weakref.finalize(self, self._fp.close) - super().__init__(upstream, ensure_io_loop=True, **kwargs) + super().__init__(upstream, **kwargs) def __del__(self): self._fp.close() - @gen.coroutine def update(self, x, who=None, metadata=None): - yield self.loop.run_in_executor(None, self._fp.write, x + self._end) + self._fp.write(x + self._end) From 269d3ab397103436c1599b013a67aa83324e28c0 Mon Sep 17 00:00:00 2001 From: Mikhail Akimov Date: Tue, 17 Nov 2020 19:39:01 +0300 Subject: [PATCH 7/7] remove None upstream handling from graphs --- streamz/graph.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/streamz/graph.py b/streamz/graph.py index 0189e8d7..8b181fc2 100644 --- a/streamz/graph.py +++ b/streamz/graph.py @@ -69,8 +69,6 @@ def create_graph(node, graph): """ # Step 1 build a set of all the nodes node_set = build_node_set(node) - if None in node_set: - node_set.remove(None) # Step 2 for each node in the set add to the graph for n in node_set: @@ -87,8 +85,7 @@ def create_graph(node, graph): # Step 3 for each node establish its edges for n in node_set: t = hash(n) - upstreams = [_ for _ in n.upstreams if _ is not None] - for nn in upstreams: + for nn in n.upstreams: tt = hash(nn) graph.add_edge(tt, t)