A simple plugin system #380

Merged 10 commits on Nov 13, 2020
2 changes: 1 addition & 1 deletion .travis.yml
@@ -21,7 +21,7 @@ install:
   - conda update conda

   # Install dependencies
-  - conda env create --name test-streamz --file ./conda/environments/streamz_dev.yml
+  - travis_wait 30 conda env create --name test-streamz --file ./conda/environments/streamz_dev.yml
   - source activate test-streamz

   - python setup.py install
1 change: 1 addition & 0 deletions docs/source/index.rst
@@ -122,3 +122,4 @@ data streaming systems like `Apache Flink <https://flink.apache.org/>`_,
    collections-api.rst
    async.rst
    plotting.rst
+   plugins.rst
102 changes: 102 additions & 0 deletions docs/source/plugins.rst
@@ -0,0 +1,102 @@
Plugins
=======

In addition to using the ``@Stream.register_api()`` decorator, custom stream nodes can
be added to Streamz by installing third-party Python packages.


Known plugins
-------------

Extras
++++++

These plugins are supported by the Streamz community and can be installed as extras,
e.g. ``pip install streamz[kafka]``.

There are no plugins here yet, but hopefully soon there will be.

.. only:: comment

    ================= ======================================================
    Extra name        Description
    ================= ======================================================
    ``files``         Advanced filesystem operations: listening for new
                      files in a directory, writing to multiple files etc.
    ``kafka``         Reading from and writing to Kafka topics.
    ================= ======================================================


Entry points
------------

Plugins register themselves with Streamz through the ``entry_points`` argument
in ``setup.py``:

.. code-block:: Python

    # setup.py

    from setuptools import setup

    setup(
        name="streamz_example_plugin",
        version="0.0.1",
        entry_points={
            "streamz.nodes": [
                "repeat = streamz_example_plugin:RepeatNode"
            ]
        }
    )

In this example, the ``RepeatNode`` class will be imported from the
``streamz_example_plugin`` package and will be available as ``Stream.repeat``.
In practice, the class name and the entry point name (the part before ``=`` in the
entry point definition) are usually the same, but they *can* be different.
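
To make the anatomy of an entry point definition concrete, here is a small sketch that builds one in memory with the standard library's ``importlib.metadata.EntryPoint``. It is an illustration under assumptions, not part of this PR: the PR discovers plugins through ``pkg_resources``, whose entry point objects expose ``module_name`` rather than ``module``, and the ``module`` and ``attr`` properties used here need Python 3.9 or newer. Nothing is installed or imported.

```python
from importlib.metadata import EntryPoint

# The same definition as in the setup.py example above, built by hand.
entry_point = EntryPoint(
    name="repeat",
    value="streamz_example_plugin:RepeatNode",
    group="streamz.nodes",
)

method_name = entry_point.name    # part before "=": becomes Stream.<name>
module_path = entry_point.module  # part before ":": module to import
class_name = entry_point.attr     # part after ":": object inside that module

print(f"Stream.{method_name} -> {module_path}.{class_name}")
```

The entry point name and the class name are independent strings, which is why the method can be called ``repeat`` while the class is called ``RepeatNode``.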

Different kinds of add-ons go into different entry point groups:

=========== ======================= =====================
Node type   Required parent class   Entry point group
=========== ======================= =====================
Source      ``streamz.Source``      ``streamz.sources``
Node        ``streamz.Stream``      ``streamz.nodes``
Sink        ``streamz.Stream``      ``streamz.sinks``
=========== ======================= =====================


Lazy loading
++++++++++++

Streamz attaches a method for each installed plugin to the ``Stream`` class when
``streamz`` is imported, but the plugin class itself is loaded only when the
corresponding ``Stream`` method is first called. Streamz also validates the loaded
class before attaching it and raises ``TypeError`` if the class is not a subclass
of ``Stream``.
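
The lazy-loading idea can be sketched without Streamz at all. In the block below, ``Node``, ``FakeEntryPoint``, ``register_lazily`` and ``Repeat`` are hypothetical stand-ins invented for this illustration; the real logic lives in ``Stream.register_plugin_entry_point`` and differs in detail (for example, it goes through ``register_api``).

```python
class Node:
    """Hypothetical stand-in for streamz.Stream."""


class FakeEntryPoint:
    """Hypothetical stand-in for an installed entry point."""

    def __init__(self, name, target):
        self.name = name
        self._target = target
        self.load_count = 0

    def load(self):
        # A real entry point would import the plugin module here.
        self.load_count += 1
        return self._target


def register_lazily(cls, entry_point):
    """Attach a cheap stub now; load and validate the class on first call."""

    def stub(*args, **kwargs):
        loaded = entry_point.load()
        if not (isinstance(loaded, type) and issubclass(loaded, cls)):
            raise TypeError(
                f"{entry_point.name} must be a subclass of {cls.__name__}"
            )

        def method(*call_args, **call_kwargs):
            return loaded(*call_args, **call_kwargs)

        method.__name__ = loaded.__name__
        # Replace the stub so later calls skip loading and validation.
        setattr(cls, entry_point.name, method)
        return loaded(*args, **kwargs)

    setattr(cls, entry_point.name, stub)


class Repeat(Node):
    def __init__(self, upstream, n):
        self.upstream = upstream
        self.n = n


entry_point = FakeEntryPoint("repeat", Repeat)
register_lazily(Node, entry_point)  # nothing is loaded yet
node = Node().repeat(3)             # first call loads and validates Repeat
```

Wrapping the loaded class in a plain function matters: a function stored on a class binds ``self`` when accessed through an instance, so the calling node is passed on as ``upstream``, whereas a bare class attribute would not bind.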


Reference implementation
------------------------

Let's look at how stream nodes can be implemented.

.. code-block:: Python

    # __init__.py

    from tornado import gen
    from streamz import Stream


    class RepeatNode(Stream):

        def __init__(self, upstream, n, **kwargs):
            super().__init__(upstream, ensure_io_loop=True, **kwargs)
            self._n = n

        @gen.coroutine
        def update(self, x, who=None, metadata=None):
            for _ in range(self._n):
                yield self._emit(x, metadata=metadata)

As you can see, the implementation is the same as usual, but there's no
``@Stream.register_api()`` decorator: Streamz takes care of registration when loading
the plugin. The node will still work if you add the decorator, but you don't have to.
1 change: 1 addition & 0 deletions requirements.txt
@@ -2,3 +2,4 @@ tornado
 toolz
 zict
 six
+setuptools
4 changes: 4 additions & 0 deletions streamz/__init__.py
@@ -3,6 +3,10 @@
 from .core import *
 from .graph import *
 from .sources import *
+from .plugins import load_plugins
+
+load_plugins(Stream)
+
 try:
     from .dask import DaskStream, scatter
 except ImportError:
40 changes: 37 additions & 3 deletions streamz/core.py
@@ -260,7 +260,7 @@ def _remove_upstream(self, upstream):
         self.upstreams.remove(upstream)

     @classmethod
-    def register_api(cls, modifier=identity):
+    def register_api(cls, modifier=identity, attribute_name=None):
         """ Add callable to Stream API

         This allows you to register a new method onto this class. You can use
@@ -273,7 +273,7 @@ def register_api(cls, modifier=identity):
         >>> Stream().foo(...) # this works now

         It attaches the callable as a normal attribute to the class object. In
-        doing so it respsects inheritance (all subclasses of Stream will also
+        doing so it respects inheritance (all subclasses of Stream will also
         get the foo attribute).

         By default callables are assumed to be instance methods. If you like
@@ -285,15 +285,49 @@ def register_api(cls, modifier=identity):
         ...     ...

         >>> Stream.foo(...) # Foo operates as a static method
+
+        You can also provide an optional ``attribute_name`` argument to control
+        the name of the attribute your callable will be attached as.
+
+        >>> @Stream.register_api(attribute_name="bar")
+        ... class foo(Stream):
+        ...     ...
+
+        >>> Stream().bar(...) # foo was actually attached as bar
         """
         def _(func):
             @functools.wraps(func)
             def wrapped(*args, **kwargs):
                 return func(*args, **kwargs)
-            setattr(cls, func.__name__, modifier(wrapped))
+            name = attribute_name if attribute_name else func.__name__
+            setattr(cls, name, modifier(wrapped))
             return func
         return _

+    @classmethod
+    def register_plugin_entry_point(cls, entry_point, modifier=identity):
+        if hasattr(cls, entry_point.name):
+            raise ValueError(
+                f"Can't add {entry_point.name} from {entry_point.module_name} "
+                f"to {cls.__name__}: duplicate method name."
+            )
+
+        def stub(*args, **kwargs):
+            """ Entrypoints-based streamz plugin. Will be loaded on first call. """
+            node = entry_point.load()
+            if not issubclass(node, Stream):
+                raise TypeError(
+                    f"Error loading {entry_point.name} "
+                    f"from module {entry_point.module_name}: "
+                    f"{node.__name__} must be a subclass of Stream"
+                )
+            if getattr(cls, entry_point.name).__name__ == "stub":
Review comment (Member): I don't think this can be False, but it doesn't hurt to check.

Reply (roveo, Contributor Author, Nov 13, 2020): It is False when the plugin class returned from entry_point.load() is decorated with @Stream.register_api. In that case the class is registered as soon as it is loaded, so by the time of this check stub has already been overwritten with the actual class.

+                cls.register_api(
+                    modifier=modifier, attribute_name=entry_point.name
+                )(node)
+            return node(*args, **kwargs)
+        cls.register_api(modifier=modifier, attribute_name=entry_point.name)(stub)
+
     def start(self):
         """ Start any upstream sources """
         for upstream in self.upstreams:
4 changes: 2 additions & 2 deletions streamz/dataframe/tests/test_dataframes.py
@@ -1008,9 +1008,9 @@ def test_windowed_groupby_aggs_with_start_state(stream):
out_df1 = pd.DataFrame({'name':['Alice', 'Bob', 'Linda', 'Tom'], 'amount':[50.0, 550.0, 100.0, 150.0]})
assert_eq(output1[-1][1].reset_index(), out_df1)


def test_dir(stream):
example = pd.DataFrame({'name': [], 'amount': []})
sdf = DataFrame(stream, example=example)
assert 'name' in dir(sdf)
-    assert 'amount' in dir(sdf)
+    assert 'amount' in dir(sdf)
22 changes: 22 additions & 0 deletions streamz/plugins.py
@@ -0,0 +1,22 @@
import warnings

import pkg_resources


def try_register(cls, entry_point, *modifier):
    try:
        cls.register_plugin_entry_point(entry_point, *modifier)
    except ValueError:
        warnings.warn(
            f"Can't add {entry_point.name} from {entry_point.module_name}: "
            "name collision with existing stream node."
        )


def load_plugins(cls):
    for entry_point in pkg_resources.iter_entry_points("streamz.sources"):
        try_register(cls, entry_point, staticmethod)
    for entry_point in pkg_resources.iter_entry_points("streamz.nodes"):
        try_register(cls, entry_point)
    for entry_point in pkg_resources.iter_entry_points("streamz.sinks"):
        try_register(cls, entry_point)
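
For comparison, the same three-group discovery can be sketched with the standard library's ``importlib.metadata`` in place of ``pkg_resources``. This is an alternative shown under assumptions, not what the PR implements: ``GROUPS``, ``iter_group`` and ``discover`` are hypothetical names, and the entry point objects returned here expose ``value`` rather than ``module_name``.

```python
from importlib import metadata

# Groups and modifiers mirror load_plugins above: sources are attached as
# static methods, nodes and sinks as ordinary methods.
GROUPS = {
    "streamz.sources": (staticmethod,),
    "streamz.nodes": (),
    "streamz.sinks": (),
}


def iter_group(group):
    """Return the installed entry points in ``group`` as a list."""
    entry_points = metadata.entry_points()
    if hasattr(entry_points, "select"):
        # Python 3.10+: selectable interface.
        return list(entry_points.select(group=group))
    # Python 3.8/3.9: plain dict keyed by group name.
    return list(entry_points.get(group, []))


def discover():
    """Yield (entry_point, modifier_args) pairs for every plugin group."""
    for group, modifier in GROUPS.items():
        for entry_point in iter_group(group):
            yield entry_point, modifier


for entry_point, modifier in discover():
    print(entry_point.name, entry_point.value, modifier)
```

Keeping the modifier as a tuple lets a caller unpack it the same way ``try_register`` unpacks ``*modifier``.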
60 changes: 60 additions & 0 deletions streamz/tests/test_plugins.py
@@ -0,0 +1,60 @@
import inspect

import pytest
from streamz import Source, Stream


class MockEntryPoint:

    def __init__(self, name, cls, module_name=None):
        self.name = name
        self.cls = cls
        self.module_name = module_name

    def load(self):
        return self.cls


def test_register_plugin_entry_point():
    class test_stream(Stream):
        pass

    entry_point = MockEntryPoint("test_node", test_stream)
    Stream.register_plugin_entry_point(entry_point)

    assert Stream.test_node.__name__ == "stub"

    Stream().test_node()

    assert Stream.test_node.__name__ == "test_stream"


def test_register_plugin_entry_point_modifier():
    class test_source(Source):
        pass

    entry_point = MockEntryPoint("from_test", test_source)
    Stream.register_plugin_entry_point(entry_point, staticmethod)

    Stream.from_test()

    assert inspect.isfunction(Stream().from_test)


def test_register_plugin_entry_point_raises_type():
    class invalid_node:
        pass

    entry_point = MockEntryPoint("test", invalid_node, "test_module.test")

    Stream.register_plugin_entry_point(entry_point)

    with pytest.raises(TypeError):
        Stream.test()


def test_register_plugin_entry_point_raises_duplicate_name():
    entry_point = MockEntryPoint("map", None)

    with pytest.raises(ValueError):
        Stream.register_plugin_entry_point(entry_point)
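
The tests above exercise ``register_plugin_entry_point`` directly; the warning path in ``try_register`` (a name collision downgraded from ``ValueError`` to a warning) could be checked along similar lines. The sketch below is self-contained and does not import Streamz: ``FakeStream``, ``FakeEntryPoint`` and ``collect_warnings`` are hypothetical stand-ins, and ``try_register`` is copied from ``streamz/plugins.py`` in this PR.

```python
import warnings


class FakeEntryPoint:
    """Hypothetical stand-in carrying only the attributes try_register reads."""

    def __init__(self, name, module_name):
        self.name = name
        self.module_name = module_name


class FakeStream:
    """Hypothetical stand-in for Stream with one existing method, ``map``."""

    def map(self):
        pass

    @classmethod
    def register_plugin_entry_point(cls, entry_point, modifier=None):
        if hasattr(cls, entry_point.name):
            raise ValueError(f"duplicate method name: {entry_point.name}")
        setattr(cls, entry_point.name, lambda *args, **kwargs: None)


def try_register(cls, entry_point, *modifier):
    # Same body as in streamz/plugins.py: collisions become warnings.
    try:
        cls.register_plugin_entry_point(entry_point, *modifier)
    except ValueError:
        warnings.warn(
            f"Can't add {entry_point.name} from {entry_point.module_name}: "
            "name collision with existing stream node."
        )


def collect_warnings(entry_point):
    """Register ``entry_point`` and return the warning messages it produced."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        try_register(FakeStream, entry_point)
    return [str(w.message) for w in caught]


collision = collect_warnings(FakeEntryPoint("map", "some_plugin"))
clean = collect_warnings(FakeEntryPoint("fresh_name", "some_plugin"))
```

Here ``collision`` holds one message and ``clean`` is empty, so a colliding plugin is skipped with a warning while importing the package still succeeds.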