Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor sinks #386

Merged
merged 10 commits into from
Nov 24, 2020
2 changes: 2 additions & 0 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ Stream
rate_limit
scatter
sink
sink_to_textfile
slice
sliding_window
starmap
@@ -84,6 +85,7 @@ Definitions
.. autofunction:: partition
.. autofunction:: rate_limit
.. autofunction:: sink
.. autofunction:: sink_to_textfile
.. autofunction:: sliding_window
.. autofunction:: Stream
.. autofunction:: timed_window
2 changes: 1 addition & 1 deletion docs/source/plugins.rst
Original file line number Diff line number Diff line change
@@ -60,7 +60,7 @@ Different kinds of add-ons go into different entry point groups:
=========== ======================= =====================
Source ``streamz.Source`` ``streamz.sources``
Node ``streamz.Stream`` ``streamz.nodes``
Sink ``streamz.Stream`` ``streamz.sinks``
Sink ``streamz.Sink`` ``streamz.sinks``
=========== ======================= =====================


1 change: 1 addition & 0 deletions streamz/__init__.py
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@
from .core import *
from .graph import *
from .sources import *
from .sinks import *
martindurant marked this conversation as resolved.
Show resolved Hide resolved
from .plugins import load_plugins

load_plugins(Stream)
62 changes: 7 additions & 55 deletions streamz/core.py
Original file line number Diff line number Diff line change
@@ -27,8 +27,6 @@

no_default = '--no-default--'

_global_sinks = set()

_html_update_streams = set()

thread_state = threading.local()
@@ -167,8 +165,10 @@ def __init__(self, upstream=None, upstreams=None, stream_name=None,
self.downstreams = OrderedWeakrefSet()
if upstreams is not None:
self.upstreams = list(upstreams)
else:
elif upstream is not None:
self.upstreams = [upstream]
else:
self.upstreams = []

self._set_asynchronous(asynchronous)
self._set_loop(loop)
@@ -238,10 +238,7 @@ def _inform_asynchronous(self, asynchronous):
def _add_upstream(self, upstream):
"""Add upstream to current upstreams, this method is overridden for
classes which handle stream specific buffers/caches"""
if self.upstreams == [None]:
self.upstreams[0] = upstream
else:
self.upstreams.append(upstream)
self.upstreams.append(upstream)

def _add_downstream(self, downstream):
"""Add downstream to current downstreams"""
@@ -254,10 +251,7 @@ def _remove_downstream(self, downstream):
def _remove_upstream(self, upstream):
"""Remove upstream from current upstreams, this method is overridden for
classes which handle stream specific buffers/caches"""
if len(self.upstreams) == 1:
self.upstreams[0] = [None]
else:
self.upstreams.remove(upstream)
self.upstreams.remove(upstream)

@classmethod
def register_api(cls, modifier=identity, attribute_name=None):
@@ -529,8 +523,8 @@ def destroy(self, streams=None):
if streams is None:
streams = self.upstreams
for upstream in list(streams):
upstream.downstreams.remove(self)
self.upstreams.remove(upstream)
upstream._remove_downstream(self)
self._remove_upstream(upstream)

def scatter(self, **kwargs):
from .dask import scatter
@@ -656,48 +650,6 @@ def _release_refs(self, metadata, n=1):
m['ref'].release(n)


@Stream.register_api()
class sink(Stream):
""" Apply a function on every element

Examples
--------
>>> source = Stream()
>>> L = list()
>>> source.sink(L.append)
>>> source.sink(print)
>>> source.sink(print)
>>> source.emit(123)
123
123
>>> L
[123]

See Also
--------
map
Stream.sink_to_list
"""
_graphviz_shape = 'trapezium'

def __init__(self, upstream, func, *args, **kwargs):
self.func = func
# take the stream specific kwargs out
stream_name = kwargs.pop("stream_name", None)
self.kwargs = kwargs
self.args = args

Stream.__init__(self, upstream, stream_name=stream_name)
_global_sinks.add(self)

def update(self, x, who=None, metadata=None):
result = self.func(x, *self.args, **self.kwargs)
if gen.isawaitable(result):
return result
else:
return []


@Stream.register_api()
class map(Stream):
""" Apply a function to every element in the stream
5 changes: 1 addition & 4 deletions streamz/graph.py
Original file line number Diff line number Diff line change
@@ -69,8 +69,6 @@ def create_graph(node, graph):
"""
# Step 1 build a set of all the nodes
node_set = build_node_set(node)
if None in node_set:
node_set.remove(None)

# Step 2 for each node in the set add to the graph
for n in node_set:
@@ -87,8 +85,7 @@ def create_graph(node, graph):
# Step 3 for each node establish its edges
for n in node_set:
t = hash(n)
upstreams = [_ for _ in n.upstreams if _ is not None]
for nn in upstreams:
for nn in n.upstreams:
tt = hash(nn)
graph.add_edge(tt, t)

114 changes: 114 additions & 0 deletions streamz/sinks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import inspect
import weakref

from tornado import gen

from streamz import Stream

# sinks add themselves here to avoid being garbage-collected
_global_sinks = set()
roveo marked this conversation as resolved.
Show resolved Hide resolved


class Sink(Stream):
    """ Base class for nodes that consume elements at the end of a pipeline

    Every instance registers itself in the module-level ``_global_sinks`` set
    when it is constructed, so the sink is not garbage-collected while no
    other strong reference to it exists. ``destroy`` drops that reference
    again.

    Parameters
    ----------
    upstream: Stream
        The stream this sink is attached to.
    kwargs:
        Passed unchanged to ``Stream.__init__``.
    """

    _graphviz_shape = 'trapezium'

    def __init__(self, upstream, **kwargs):
        super().__init__(upstream, **kwargs)
        _global_sinks.add(self)

    def destroy(self, streams=None):
        """ Disconnect this sink and stop keeping it alive

        Parameters
        ----------
        streams: iterable of Stream, optional
            Upstreams to disconnect from, forwarded to ``Stream.destroy``.
            By default the sink is disconnected from all of its upstreams.
        """
        super().destroy(streams)
        # Only release the keep-alive reference once nothing feeds the sink
        # any more. ``discard`` keeps a repeated ``destroy`` call from raising.
        if streams is None or not self.upstreams:
            _global_sinks.discard(self)


@Stream.register_api()
class sink(Sink):
    """ Apply a function on every element

    Parameters
    ----------
    func: callable
        A function that will be applied on every element.
    args:
        Positional arguments that will be passed to ``func`` after the incoming element.
    kwargs:
        Stream-specific arguments will be passed to ``Stream.__init__``, the rest of
        them will be passed to ``func``.

    Examples
    --------
    >>> source = Stream()
    >>> L = list()
    >>> source.sink(L.append)
    >>> source.sink(print)
    >>> source.sink(print)
    >>> source.emit(123)
    123
    123
    >>> L
    [123]

    See Also
    --------
    map
    Stream.sink_to_list
    """

    def __init__(self, upstream, func, *args, **kwargs):
        self.func = func
        # take the stream specific kwargs out
        sig = set(inspect.signature(Stream).parameters)
        stream_kwargs = {k: v for (k, v) in kwargs.items() if k in sig}
        self.kwargs = {k: v for (k, v) in kwargs.items() if k not in sig}
        self.args = args
        super().__init__(upstream, **stream_kwargs)

    def update(self, x, who=None, metadata=None):
        result = self.func(x, *self.args, **self.kwargs)
        # hand awaitables back to the caller; a plain call yields no results
        if gen.isawaitable(result):
            return result
        return []


@Stream.register_api()
class sink_to_textfile(Sink):
    """ Write elements to a plain text file, one element per line.

    Type of elements must be ``str``.

    Parameters
    ----------
    file: str or file-like
        File to write the elements to. ``str`` is treated as a file name to open.
        If file-like, descriptor must be open in text mode. Note that the file
        descriptor will be closed when this sink is garbage-collected.
    end: str, optional
        This value will be written to the file after each element.
        Defaults to newline character.
    mode: str, optional
        If file is ``str``, file will be opened in this mode. Defaults to ``"a"``
        (append mode).

    Examples
    --------
    >>> source = Stream()
    >>> source.map(str).sink_to_textfile("test.txt")
    >>> source.emit(0)
    >>> source.emit(1)
    >>> print(open("test.txt", "r").read())
    0
    1
    """
    def __init__(self, upstream, file, end="\n", mode="a", **kwargs):
        self._end = end
        self._fp = open(file, mode=mode) if isinstance(file, str) else file
        # The finalizer is the single place that closes the file. It is only
        # registered after ``_fp`` exists, so a failed ``open`` leaves nothing
        # behind that would later try to close a missing attribute.
        weakref.finalize(self, self._fp.close)
        super().__init__(upstream, **kwargs)

    def destroy(self, streams=None):
        """ Disconnect this sink and let its file be closed

        Parameters
        ----------
        streams: iterable of Stream, optional
            Upstreams to disconnect from, forwarded to the parent ``destroy``.
            By default the sink is disconnected from all of its upstreams.
        """
        super().destroy(streams)
        # Drop the keep-alive reference so the sink can be collected and the
        # finalizer registered in ``__init__`` can close the file.
        if streams is None or not self.upstreams:
            _global_sinks.discard(self)

    def update(self, x, who=None, metadata=None):
        self._fp.write(x + self._end)
19 changes: 1 addition & 18 deletions streamz/tests/test_core.py
Original file line number Diff line number Diff line change
@@ -395,23 +395,6 @@ def test_sink_to_file():
assert data == 'a\nb\n'


def test_sink_with_args_and_kwargs():
L = dict()

def mycustomsink(elem, key, prefix=""):
key = prefix + key
if key not in L:
L[key] = list()
L[key].append(elem)

s = Stream()
s.sink(mycustomsink, "cat", "super")

s.emit(1)
s.emit(2)
assert L['supercat'] == [1, 2]


@gen_test()
def test_counter():
counter = itertools.count()
@@ -1097,7 +1080,7 @@ def test_connect():
# connect assumes this default behaviour
# of stream initialization
assert not source_downstream.downstreams
assert source_downstream.upstreams == [None]
assert source_downstream.upstreams == []

# initialize the second stream to connect to
source_upstream = Stream()
70 changes: 70 additions & 0 deletions streamz/tests/test_sinks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import weakref

import pytest
from streamz import Stream
from streamz.sinks import _global_sinks
from streamz.utils_test import tmpfile


def test_sink_with_args_and_kwargs():
    """Extra positional args reach the sink function; ``stream_name`` names the node."""
    L = {}

    def mycustomsink(elem, key, prefix=""):
        # group every element under the prefixed key
        L.setdefault(prefix + key, []).append(elem)

    s = Stream()
    sink = s.sink(mycustomsink, "cat", "super", stream_name="test")
    for value in (1, 2):
        s.emit(value)

    assert L['supercat'] == [1, 2]
    assert sink.name == "test"


def test_sink_to_textfile_fp():
    """Elements are written to an already-open text file object."""
    source = Stream()
    with tmpfile() as filename, open(filename, "w") as fp:
        source.map(str).sink_to_textfile(fp)
        source.emit(0)
        source.emit(1)

        # push buffered writes to disk before reading the file back
        fp.flush()

        # read through a context manager so the extra handle is closed
        # before the temporary file is cleaned up
        with open(filename, "r") as written:
            assert written.read() == "0\n1\n"


def test_sink_to_textfile_named():
    """Elements are written to a file that the sink opens from a file name."""
    source = Stream()
    with tmpfile() as filename:
        _sink = source.map(str).sink_to_textfile(filename)
        source.emit(0)
        source.emit(1)

        # push buffered writes to disk before reading the file back
        _sink._fp.flush()

        # read through a context manager so the extra handle is closed
        # before the temporary file is cleaned up
        with open(filename, "r") as written:
            assert written.read() == "0\n1\n"


def test_sink_to_textfile_closes():
    """The sink's file is closed once the last reference to the sink is gone."""
    source = Stream()
    with tmpfile() as filename:
        sink = source.sink_to_textfile(filename)
        # keep the file object reachable after the sink itself is dropped
        fp = sink._fp
        # release the keep-alive reference held by the sinks module
        _global_sinks.remove(sink)
        del sink

        closed_message = r"I/O operation on closed file\."
        with pytest.raises(ValueError, match=closed_message):
            fp.write(".")


def test_sink_destroy():
    """After ``destroy`` a sink is no longer kept alive by the library."""
    source = Stream()
    sink = source.sink(lambda x: None)
    # a weak reference lets us observe collection without preventing it
    ref = weakref.ref(sink)

    sink.destroy()
    del sink

    assert ref() is None