
Refactor sinks #386
Merged: 10 commits, Nov 24, 2020
2 changes: 2 additions & 0 deletions docs/source/api.rst
@@ -26,6 +26,7 @@ Stream
rate_limit
scatter
sink
sink_to_textfile
slice
sliding_window
starmap
@@ -84,6 +85,7 @@ Definitions
.. autofunction:: partition
.. autofunction:: rate_limit
.. autofunction:: sink
.. autofunction:: sink_to_textfile
.. autofunction:: sliding_window
.. autofunction:: Stream
.. autofunction:: timed_window
2 changes: 1 addition & 1 deletion docs/source/plugins.rst
@@ -60,7 +60,7 @@ Different kinds of add-ons go into different entry point groups:
=========== ======================= =====================
Source ``streamz.Source`` ``streamz.sources``
Node ``streamz.Stream`` ``streamz.nodes``
Sink ``streamz.Stream`` ``streamz.sinks``
Sink ``streamz.Sink`` ``streamz.sinks``
=========== ======================= =====================
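As a hedged illustration of the table above, a third-party package could register a custom sink under the ``streamz.sinks`` entry-point group roughly as follows; the package, module, and class names here are hypothetical, and the exact loading behaviour is defined by load_plugins rather than by this sketch.

    # setup.py of a hypothetical package that ships a custom sink.
    # The group name "streamz.sinks" comes from the table above; everything
    # else (names, version) is made up for illustration.
    from setuptools import setup

    setup(
        name="streamz-example-sink",
        version="0.0.1",
        py_modules=["streamz_example_sink"],
        entry_points={
            "streamz.sinks": [
                # The entry-point name is expected to become the node name
                # registered on Stream, i.e. Stream.my_sink(...) after install.
                "my_sink = streamz_example_sink:MySink",
            ]
        },
    )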


1 change: 1 addition & 0 deletions streamz/__init__.py
@@ -3,6 +3,7 @@
from .core import *
from .graph import *
from .sources import *
from .sinks import *
from .plugins import load_plugins

load_plugins(Stream)
44 changes: 0 additions & 44 deletions streamz/core.py
@@ -27,8 +27,6 @@

no_default = '--no-default--'

_global_sinks = set()

_html_update_streams = set()

thread_state = threading.local()
@@ -656,48 +654,6 @@ def _release_refs(self, metadata, n=1):
m['ref'].release(n)


@Stream.register_api()
class sink(Stream):
""" Apply a function on every element

Examples
--------
>>> source = Stream()
>>> L = list()
>>> source.sink(L.append)
>>> source.sink(print)
>>> source.sink(print)
>>> source.emit(123)
123
123
>>> L
[123]

See Also
--------
map
Stream.sink_to_list
"""
_graphviz_shape = 'trapezium'

def __init__(self, upstream, func, *args, **kwargs):
self.func = func
# take the stream specific kwargs out
stream_name = kwargs.pop("stream_name", None)
self.kwargs = kwargs
self.args = args

Stream.__init__(self, upstream, stream_name=stream_name)
_global_sinks.add(self)

def update(self, x, who=None, metadata=None):
result = self.func(x, *self.args, **self.kwargs)
if gen.isawaitable(result):
return result
else:
return []


@Stream.register_api()
class map(Stream):
""" Apply a function to every element in the stream
96 changes: 96 additions & 0 deletions streamz/sinks.py
@@ -0,0 +1,96 @@
from tornado import gen

from streamz import Stream

_global_sinks = set()


class Sink(Stream):

_graphviz_shape = 'trapezium'

def __init__(self, upstream, **kwargs):
super().__init__(upstream, **kwargs)
_global_sinks.add(self)


@Stream.register_api()
class sink(Sink):
""" Apply a function on every element

Examples
--------
>>> source = Stream()
>>> L = list()
>>> source.sink(L.append)
>>> source.sink(print)
>>> source.sink(print)
>>> source.emit(123)
123
123
>>> L
[123]

See Also
--------
map
Stream.sink_to_list
"""

def __init__(self, upstream, func, *args, **kwargs):
self.func = func
# take the stream specific kwargs out
stream_name = kwargs.pop("stream_name", None)
Member commented:
This is fine, and I know it's only moved code, but it would be good not to have to update these whenever the superclass might gain more keywords.

@roveo (contributor, author) commented on Nov 15, 2020:
I don't know how we can both allow mixing stream-specific and func arguments in **kwargs and avoid explicitly listing them.

I can see two solutions here:

  • disallow stream kwargs and just call func(*args, **kwargs)
  • make it a normal stream and have the user deal with constructing the correct single-argument function (with wrapping, closures, partials, etc.)
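As a hedged sketch of the second option (not part of this PR), a user could keep the extra arguments out of sink entirely and bind them on their side, for example with functools.partial; the write_record callback and its arguments below are made up for illustration.

    from functools import partial

    from streamz import Stream


    def write_record(record, store, key="events"):
        # Hypothetical user callback that needs extra arguments.
        store.setdefault(key, []).append(record)


    store = {}
    source = Stream()

    # Bind the extra arguments up front so the sink only ever sees a
    # single-argument callable, instead of source.sink(write_record, store, key=...).
    source.sink(partial(write_record, store=store, key="events"))

    source.emit({"id": 1})
    assert store["events"] == [{"id": 1}]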

@roveo (contributor, author) commented:
Please see here: 5c1b3a6#diff-eb662403bf433f3884021435c3f6cd010b387b4174a01367ae415b3e593cde5eR56-R60

Still looks ugly to me, but it does what you asked for :)
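The referenced commit is not reproduced on this page, so the following is only a hypothetical illustration of "not listing the stream keywords explicitly": split **kwargs by inspecting Stream.__init__'s signature. It assumes Stream.__init__ declares its keywords as named parameters rather than swallowing them in **kwargs, and the helper name is made up; it is not necessarily what the linked commit does.

    import inspect

    from streamz import Stream


    def _split_stream_kwargs(kwargs):
        # Keywords accepted by Stream.__init__ go to the superclass; the rest
        # are treated as arguments for the user's sink function.
        stream_params = set(inspect.signature(Stream.__init__).parameters) - {"self"}
        stream_kwargs = {k: v for k, v in kwargs.items() if k in stream_params}
        func_kwargs = {k: v for k, v in kwargs.items() if k not in stream_params}
        return stream_kwargs, func_kwargs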

Member commented:
What do you think, @CJ-Wright, is this better or worse?

self.kwargs = kwargs
self.args = args
super().__init__(upstream, stream_name=stream_name)

def update(self, x, who=None, metadata=None):
result = self.func(x, *self.args, **self.kwargs)
if gen.isawaitable(result):
return result
else:
return []


@Stream.register_api()
class sink_to_textfile(Sink):
""" Write elements to a plain text file, one element per line.

Type of elements must be ``str``.

Arguments
---------
file: str or file-like
File to write the elements to. ``str`` is treated as a file name to open.
If file-like, descriptor must be open in text mode. Note that the file
descriptor will be closed when this sink is destroyed.
end: str, optional
This value will be written to the file after each element.
Defaults to newline character.
mode: str, optional
If file is ``str``, file will be opened in this mode. Defaults to ``"a"``
(append mode).

Examples
--------
>>> source = Stream()
>>> source.map(str).sink_to_textfile("test.txt")
>>> source.emit(0)
>>> source.emit(1)
>>> print(open("test.txt", "r").read())
0
1
"""
def __init__(self, upstream, file, end="\n", mode="a", **kwargs):
self._fp = open(file, mode=mode, buffering=1) if isinstance(file, str) else file
Member commented:
Integrate with fsspec.open?
Why set the buffering?

@roveo (contributor, author) commented:

"Why set the buffering?"

I used line buffering in development to see what was going on without delay and forgot to remove it. Some tests also won't work as-is without it, but I guess I'll just use wait_for.

"Integrate with fsspec.open?"

This is really a question about what filesystem functionality should be provided in core. If we want to integrate this sink with fsspec, then from_textfile should be updated too (or some other nodes added, like from_fs/sink_to_fs), which is beyond sink refactoring. We should then also think about how users might install fsspec extras (s3, hdfs, etc.) and what other fs-related nodes might be useful, like:

  • listening for new files in a folder/glob and "tailing" them
  • writing to same-sized file partitions (like the split command)
  • writing to files partitioned by time (e.g. rotating every 5 minutes)

What I'm getting at is that it looks like a plugin to me. We could have streamz[fs] for nodes and fsspec built-ins and then streamz[s3,hdfs] for extras.
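As a hedged sketch of the fsspec suggestion (not part of this PR), the constructor's open() call could be routed through fsspec so that remote URLs work too; it assumes fsspec is installed and that the target filesystem supports the requested append mode.

    import fsspec


    def _open_for_sink(file, mode="a"):
        # Sketch of an fsspec-backed replacement for the builtin open() used
        # by sink_to_textfile. Local paths behave much as before, while URLs
        # such as "s3://bucket/log.txt" need the matching fsspec extra installed.
        if isinstance(file, str):
            # fsspec.open returns an OpenFile; .open() materialises the handle.
            return fsspec.open(file, mode=mode).open()
        # Already a file-like object opened in text mode.
        return file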

self._end = end
super().__init__(upstream, ensure_io_loop=True, **kwargs)

def __del__(self):
self._fp.close()

@gen.coroutine
def update(self, x, who=None, metadata=None):
yield self.loop.run_in_executor(None, self._fp.write, x)
yield self.loop.run_in_executor(None, self._fp.write, self._end)
17 changes: 0 additions & 17 deletions streamz/tests/test_core.py
@@ -395,23 +395,6 @@ def test_sink_to_file():
assert data == 'a\nb\n'


def test_sink_with_args_and_kwargs():
L = dict()

def mycustomsink(elem, key, prefix=""):
key = prefix + key
if key not in L:
L[key] = list()
L[key].append(elem)

s = Stream()
s.sink(mycustomsink, "cat", "super")

s.emit(1)
s.emit(2)
assert L['supercat'] == [1, 2]


@gen_test()
def test_counter():
counter = itertools.count()
59 changes: 59 additions & 0 deletions streamz/tests/test_sinks.py
@@ -0,0 +1,59 @@
from time import sleep

import pytest
from streamz import Stream
from streamz.sinks import _global_sinks
from streamz.utils_test import tmpfile


def test_sink_with_args_and_kwargs():
L = dict()

def mycustomsink(elem, key, prefix=""):
key = prefix + key
if key not in L:
L[key] = list()
L[key].append(elem)

s = Stream()
s.sink(mycustomsink, "cat", "super")

s.emit(1)
s.emit(2)
assert L['supercat'] == [1, 2]


def test_sink_to_textfile_fp():
source = Stream()
with tmpfile() as filename, open(filename, "w", buffering=1) as fp:
source.map(str).sink_to_textfile(fp)
source.emit(0)
source.emit(1)

sleep(0.01)

assert open(filename, "r").read() == "0\n1\n"


def test_sink_to_textfile_named():
source = Stream()
with tmpfile() as filename:
source.map(str).sink_to_textfile(filename)
source.emit(0)
source.emit(1)

sleep(0.01)

assert open(filename, "r").read() == "0\n1\n"
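Following up on the review note about buffering, the fixed sleep in the tests above could be replaced by polling. This sketch assumes streamz.utils_test provides wait_for(predicate, timeout), as used elsewhere in the streamz test suite; the test name is made up and this variant is not part of the PR.

    from streamz import Stream
    from streamz.utils_test import tmpfile, wait_for


    def test_sink_to_textfile_named_polling():
        # Variant of test_sink_to_textfile_named that polls for the expected
        # file contents instead of sleeping for a fixed interval.
        source = Stream()
        with tmpfile() as filename:
            source.map(str).sink_to_textfile(filename)
            source.emit(0)
            source.emit(1)

            wait_for(lambda: open(filename, "r").read() == "0\n1\n", timeout=1)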


def test_sink_to_textfile_closes():
source = Stream()
with tmpfile() as filename:
sink = source.sink_to_textfile(filename)
fp = sink._fp
_global_sinks.remove(sink)
del sink

with pytest.raises(ValueError): # I/O operation on closed file
fp.write(".")