Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor sinks #386

Merged
merged 10 commits into from
Nov 24, 2020
2 changes: 2 additions & 0 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Stream
rate_limit
scatter
sink
sink_to_textfile
slice
sliding_window
starmap
Expand Down Expand Up @@ -84,6 +85,7 @@ Definitions
.. autofunction:: partition
.. autofunction:: rate_limit
.. autofunction:: sink
.. autofunction:: sink_to_textfile
.. autofunction:: sliding_window
.. autofunction:: Stream
.. autofunction:: timed_window
Expand Down
2 changes: 1 addition & 1 deletion docs/source/plugins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ Different kinds of add-ons go into different entry point groups:
=========== ======================= =====================
Source ``streamz.Source`` ``streamz.sources``
Node ``streamz.Stream`` ``streamz.nodes``
Sink ``streamz.Stream`` ``streamz.sinks``
Sink ``streamz.Sink`` ``streamz.sinks``
=========== ======================= =====================


Expand Down
1 change: 1 addition & 0 deletions streamz/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from .core import *
from .graph import *
from .sources import *
from .sinks import *
martindurant marked this conversation as resolved.
Show resolved Hide resolved
from .plugins import load_plugins

load_plugins(Stream)
Expand Down
44 changes: 0 additions & 44 deletions streamz/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@

no_default = '--no-default--'

_global_sinks = set()

_html_update_streams = set()

thread_state = threading.local()
Expand Down Expand Up @@ -656,48 +654,6 @@ def _release_refs(self, metadata, n=1):
m['ref'].release(n)


@Stream.register_api()
class sink(Stream):
    """ Apply a function on every element

    The function is called as ``func(x, *args, **kwargs)`` for each
    element ``x``.  The ``stream_name`` keyword, if given, is removed from
    ``kwargs`` and used as the name of this node instead of being passed
    to ``func``.

    Examples
    --------
    >>> source = Stream()
    >>> L = list()
    >>> source.sink(L.append)
    >>> source.sink(print)
    >>> source.sink(print)
    >>> source.emit(123)
    123
    123
    >>> L
    [123]

    See Also
    --------
    map
    Stream.sink_to_list
    """
    _graphviz_shape = 'trapezium'

    def __init__(self, upstream, func, *args, **kwargs):
        # ``stream_name`` is the only keyword consumed by the stream itself;
        # everything left in ``kwargs`` is forwarded to ``func`` on update.
        name = kwargs.pop("stream_name", None)
        self.func = func
        self.args = args
        self.kwargs = kwargs

        Stream.__init__(self, upstream, stream_name=name)
        # Register in the module-level set of sinks.
        _global_sinks.add(self)

    def update(self, x, who=None, metadata=None):
        outcome = self.func(x, *self.args, **self.kwargs)
        # An awaitable result is handed back to the caller; any other
        # return value of ``func`` is discarded.
        return outcome if gen.isawaitable(outcome) else []


@Stream.register_api()
class map(Stream):
""" Apply a function to every element in the stream
Expand Down
111 changes: 111 additions & 0 deletions streamz/sinks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import inspect
import weakref

from tornado import gen

from streamz import Stream

# Strong references to sinks. ``Sink.__init__`` adds every new instance to
# this set so that sinks add themselves here to avoid being garbage-collected.
_global_sinks = set()
roveo marked this conversation as resolved.
Show resolved Hide resolved


class Sink(Stream):
    """ Common base class for the sinks defined in this module

    On construction every instance adds itself to the module-level
    ``_global_sinks`` set, which keeps it from being garbage-collected.

    Parameters
    ----------
    upstream:
        Forwarded unchanged to the parent class constructor.
    kwargs:
        Forwarded unchanged to the parent class constructor.
    """

    _graphviz_shape = 'trapezium'

    def __init__(self, upstream, **kwargs):
        super().__init__(upstream, **kwargs)
        # Registered only after the parent constructor has succeeded.
        _global_sinks.add(self)


@Stream.register_api()
class sink(Sink):
    """ Apply a function on every element

    Parameters
    ----------
    func: callable
        A function that will be applied on every element.
    args:
        Positional arguments that will be passed to ``func`` after the
        incoming element.
    kwargs:
        Keyword arguments whose names match a parameter of the ``Stream``
        constructor are forwarded to the stream; all remaining ones are
        passed to ``func``.

    Examples
    --------
    >>> source = Stream()
    >>> L = list()
    >>> source.sink(L.append)
    >>> source.sink(print)
    >>> source.sink(print)
    >>> source.emit(123)
    123
    123
    >>> L
    [123]

    See Also
    --------
    map
    Stream.sink_to_list
    """

    def __init__(self, upstream, func, *args, **kwargs):
        # Names accepted by the ``Stream`` constructor decide which keyword
        # arguments belong to the stream and which belong to ``func``.
        stream_params = set(inspect.signature(Stream).parameters)
        stream_kwargs = {}
        func_kwargs = {}
        for key, value in kwargs.items():
            target = stream_kwargs if key in stream_params else func_kwargs
            target[key] = value

        self.func = func
        self.args = args
        self.kwargs = func_kwargs
        super().__init__(upstream, **stream_kwargs)

    def update(self, x, who=None, metadata=None):
        outcome = self.func(x, *self.args, **self.kwargs)
        # An awaitable result is handed back to the caller; any other
        # return value of ``func`` is discarded.
        return outcome if gen.isawaitable(outcome) else []


@Stream.register_api()
class sink_to_textfile(Sink):
    """ Write elements to a plain text file, one element per line.

    Type of elements must be ``str``.

    Parameters
    ----------
    file: str or file-like
        File to write the elements to. ``str`` is treated as a file name to open.
        If file-like, descriptor must be open in text mode. Note that the file
        descriptor will be closed when this sink is destroyed.
    end: str, optional
        This value will be written to the file after each element.
        Defaults to newline character.
    mode: str, optional
        If file is ``str``, file will be opened in this mode. Defaults to ``"a"``
        (append mode).
    kwargs:
        Forwarded to the parent class constructor.

    Examples
    --------
    >>> source = Stream()
    >>> source.map(str).sink_to_textfile("test.txt")
    >>> source.emit(0)
    >>> source.emit(1)
    >>> print(open("test.txt", "r").read())
    0
    1
    """
    def __init__(self, upstream, file, end="\n", mode="a", **kwargs):
        self._end = end
        self._fp = open(file, mode=mode) if isinstance(file, str) else file
        # The finalizer is the only cleanup path: it closes the file once
        # the sink is garbage-collected.  It is used instead of ``__del__``
        # because it is registered only after ``_fp`` exists, so a failing
        # ``open`` above cannot lead to an ``AttributeError`` while the
        # half-built object is being destroyed.
        weakref.finalize(self, self._fp.close)
        super().__init__(upstream, ensure_io_loop=True, **kwargs)

    @gen.coroutine
    def update(self, x, who=None, metadata=None):
        # The write is submitted to the loop's default executor and awaited.
        yield self.loop.run_in_executor(None, self._fp.write, x + self._end)
17 changes: 0 additions & 17 deletions streamz/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,23 +395,6 @@ def test_sink_to_file():
assert data == 'a\nb\n'


def test_sink_with_args_and_kwargs():
    """Extra positional arguments given to ``sink`` reach the function."""
    collected = {}

    def mycustomsink(elem, key, prefix=""):
        # Group the received elements under the prefixed key.
        collected.setdefault(prefix + key, []).append(elem)

    source = Stream()
    source.sink(mycustomsink, "cat", "super")

    for value in (1, 2):
        source.emit(value)
    assert collected['supercat'] == [1, 2]


@gen_test()
def test_counter():
counter = itertools.count()
Expand Down
58 changes: 58 additions & 0 deletions streamz/tests/test_sinks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import pytest
from streamz import Stream
from streamz.sinks import _global_sinks
from streamz.utils_test import tmpfile


def test_sink_with_args_and_kwargs():
    """Extra arguments reach the function; ``stream_name`` names the sink."""
    collected = {}

    def mycustomsink(elem, key, prefix=""):
        # Group the received elements under the prefixed key.
        collected.setdefault(prefix + key, []).append(elem)

    source = Stream()
    named_sink = source.sink(mycustomsink, "cat", "super", stream_name="test")
    for value in (1, 2):
        source.emit(value)

    assert collected['supercat'] == [1, 2]
    assert named_sink.name == "test"


def test_sink_to_textfile_fp():
    """Elements are written line by line to an already opened file object."""
    source = Stream()
    with tmpfile() as filename, open(filename, "w") as fp:
        source.map(str).sink_to_textfile(fp)
        source.emit(0)
        source.emit(1)

        # Push buffered output to disk before reading the file back.
        fp.flush()

        # Read through a context manager so the handle is closed
        # deterministically instead of being left to the garbage collector.
        with open(filename, "r") as written:
            assert written.read() == "0\n1\n"


def test_sink_to_textfile_named():
    """Elements are written line by line to a file given by name."""
    source = Stream()
    with tmpfile() as filename:
        _sink = source.map(str).sink_to_textfile(filename)
        source.emit(0)
        source.emit(1)

        # Push buffered output to disk before reading the file back.
        _sink._fp.flush()

        # Read through a context manager so the handle is closed
        # deterministically instead of being left to the garbage collector.
        with open(filename, "r") as written:
            assert written.read() == "0\n1\n"


def test_sink_to_textfile_closes():
    """The underlying file is closed once the sink has been destroyed."""
    source = Stream()
    with tmpfile() as filename:
        textfile_sink = source.sink_to_textfile(filename)
        handle = textfile_sink._fp
        # Remove the sink from the module-level set and drop the local name
        # so that the sink can be destroyed.
        _global_sinks.remove(textfile_sink)
        del textfile_sink

        with pytest.raises(ValueError, match=r"I/O operation on closed file\."):
            handle.write(".")