Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A simple plugin system #380

Merged
merged 10 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ install:
- conda update conda

# Install dependencies
- conda env create --name test-streamz --file ./conda/environments/streamz_dev.yml
- travis_wait 30 conda env create --name test-streamz --file ./conda/environments/streamz_dev.yml
- source activate test-streamz

- python setup.py install
Expand Down
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,4 @@ data streaming systems like `Apache Flink <https://flink.apache.org/>`_,
collections-api.rst
async.rst
plotting.rst
plugins.rst
102 changes: 102 additions & 0 deletions docs/source/plugins.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
Plugins
=======

In addition to using ``@Stream.register_api()`` decorator, custom stream nodes can
be added to Streamz by installing 3rd-party Python packages.


Known plugins
-------------

Extras
++++++

These plugins are supported by the Streamz community and can be installed as extras,
e.g. ``pip install streamz[kafka]``.

There are no plugins here yet, but hopefully soon there will be.

.. only:: comment
================= ======================================================
Extra name Description
================= ======================================================
``files`` Advanced filesystem operations: listening for new
files in a directory, writing to multiple files etc.
``kafka`` Reading from and writing to Kafka topics.
================= ======================================================


Entry points
------------

Plugins register themselves with Streamz by using ``entry_points`` argument
in ``setup.py``:

.. code-block:: Python

# setup.py

from setuptools import setup

setup(
name="streamz_example_plugin",
version="0.0.1",
entry_points={
"streamz.nodes": [
"repeat = streamz_example_plugin:RepeatNode"
]
}
)

In this example, ``RepeatNode`` class will be imported from
``streamz_example_plugin`` package and will be available as ``Stream.repeat``.
In practice, class name and entry point name (the part before ``=`` in entry point
definition) are usually the same, but they `can` be different.

Different kinds of add-ons go into different entry point groups:

=========== ======================= =====================
Node type Required parent class Entry point group
=========== ======================= =====================
Source ``streamz.Source`` ``streamz.sources``
Node ``streamz.Stream`` ``streamz.nodes``
Sink ``streamz.Stream`` ``streamz.sinks``
=========== ======================= =====================


Lazy loading
++++++++++++

Streamz will attach methods from existing plugins to the ``Stream`` class when it's
imported, but actual classes will be loaded only when the corresponding ``Stream``
method is first called. Streamz will also validate the loaded class before attaching it
and will raise an appropriate exception if validation fails.


Reference implementation
------------------------

Let's look at how stream nodes can be implemented.

.. code-block:: Python

# __init__.py

from tornado import gen
from streamz import Stream


class RepeatNode(Stream):

def __init__(self, upstream, n, **kwargs):
super().__init__(upstream, ensure_io_loop=True, **kwargs)
self._n = n

@gen.coroutine
def update(self, x, who=None, metadata=None):
for _ in range(self._n):
yield self._emit(x, metadata=metadata)

As you can see, implementation is the same as usual, but there's no
``@Stream.register_api()`` — Streamz will take care of that when loading the plugin.
It will still work if you add the decorator, but you don't have to.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ tornado
toolz
zict
six
setuptools
4 changes: 4 additions & 0 deletions streamz/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
from .core import *
from .graph import *
from .sources import *
from .plugins import load_plugins

load_plugins(Stream)

try:
from .dask import DaskStream, scatter
except ImportError:
Expand Down
32 changes: 29 additions & 3 deletions streamz/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ def _remove_upstream(self, upstream):
self.upstreams.remove(upstream)

@classmethod
def register_api(cls, modifier=identity):
def register_api(cls, modifier=identity, attribute_name=None):
""" Add callable to Stream API

This allows you to register a new method onto this class. You can use
Expand All @@ -273,7 +273,7 @@ def register_api(cls, modifier=identity):
>>> Stream().foo(...) # this works now

It attaches the callable as a normal attribute to the class object. In
doing so it respsects inheritance (all subclasses of Stream will also
doing so it respects inheritance (all subclasses of Stream will also
get the foo attribute).

By default callables are assumed to be instance methods. If you like
Expand All @@ -285,15 +285,41 @@ def register_api(cls, modifier=identity):
... ...

>>> Stream.foo(...) # Foo operates as a static method

You can also provide an optional ``attribute_name`` argument to control
the name of the attribute your callable will be attached as.

>>> @Stream.register_api(attribute_name="bar")
... class foo(Stream):
... ...

>> Stream().bar(...) # foo was actually attached as bar
"""
def _(func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
return func(*args, **kwargs)
setattr(cls, func.__name__, modifier(wrapped))
name = attribute_name if attribute_name else func.__name__
setattr(cls, name, modifier(wrapped))
return func
return _

@classmethod
def register_plugin_entry_point(cls, entry_point, modifier=identity):
def stub(*args, **kwargs):
node = entry_point.load()
if not issubclass(node, Stream):
raise TypeError(
f"Error loading {entry_point.name} "
f"from module {entry_point.module_name}: "
f"{node.__class__.__name__} must be a subclass of Stream"
)
cls.register_api(
modifier=modifier, attribute_name=entry_point.name
)(node)
return node(*args, **kwargs)
cls.register_api(modifier=modifier, attribute_name=entry_point.name)(stub)

def start(self):
""" Start any upstream sources """
for upstream in self.upstreams:
Expand Down
4 changes: 2 additions & 2 deletions streamz/dataframe/tests/test_dataframes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1008,9 +1008,9 @@ def test_windowed_groupby_aggs_with_start_state(stream):
out_df1 = pd.DataFrame({'name':['Alice', 'Bob', 'Linda', 'Tom'], 'amount':[50.0, 550.0, 100.0, 150.0]})
assert_eq(output1[-1][1].reset_index(), out_df1)


def test_dir(stream):
example = pd.DataFrame({'name': [], 'amount': []})
sdf = DataFrame(stream, example=example)
assert 'name' in dir(sdf)
assert 'amount' in dir(sdf)
assert 'amount' in dir(sdf)
10 changes: 10 additions & 0 deletions streamz/plugins.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import pkg_resources


def load_plugins(cls):
for entry_point in pkg_resources.iter_entry_points("streamz.sources"):
cls.register_plugin_entry_point(entry_point, staticmethod)
for entry_point in pkg_resources.iter_entry_points("streamz.nodes"):
cls.register_plugin_entry_point(entry_point)
for entry_point in pkg_resources.iter_entry_points("streamz.sinks"):
cls.register_plugin_entry_point(entry_point)
69 changes: 69 additions & 0 deletions streamz/tests/test_plugins.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import pytest
from streamz import Source, Stream


class MockEntryPoint:

def __init__(self, name, cls, module_name=None):
self.name = name
self.cls = cls
self.module_name = module_name

def load(self):
return self.cls


def test_register_plugin_entry_point():
class test_stream(Stream):
pass

entry_point = MockEntryPoint("test_node", test_stream)
Stream.register_plugin_entry_point(entry_point)

assert Stream.test_node.__name__ == "stub"

Stream().test_node()

assert Stream.test_node.__name__ == "test_stream"


def test_register_plugin_entry_point_modifier():
class test_source(Source):
pass

def modifier(fn):
fn.__name__ = 'modified_name'
return staticmethod(fn)

entry_point = MockEntryPoint("from_test", test_source)
Stream.register_plugin_entry_point(entry_point, modifier)

Stream.from_test()

assert Stream.from_test.__name__ == "modified_name"


def test_register_plugin_entry_point_raises():
class invalid_node:
pass

entry_point = MockEntryPoint("test", invalid_node, "test_module.test")

Stream.register_plugin_entry_point(entry_point)

with pytest.raises(TypeError):
Stream.test()


def test_register_plugin_entry_point_already_registered():
@Stream.register_api()
class test(Stream):
pass

entry_point = MockEntryPoint("test_double", test, "test_module")

Stream.register_plugin_entry_point(entry_point)

assert Stream.test_double.__name__ == "stub"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I'm not sure whether the method should have been overwritten - presumably Stream.test_double already existed before the call to register_plugin_entry_point.

In normal usage, the stubs always come first, since they happen at import time, and streamz should always be imported before derived classes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I guess this test actually doesn't test anything. I wanted to test that running Stream.register_api twice on the same class doesn't break anything, in case the implementation of register_api changes. We can just check for this in register_plugin_entry_point.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test is OK, but not comprehensive. The cases are:

  • register_api, then register entrypoint
  • register entrypoint then register_api
  • either of the to methods called twice

In every case, calling the method should result in the same test class.

We could check for whether the name is being overwritten, but I don't think that's essential.

Copy link
Contributor Author

@roveo roveo Nov 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In every case, calling the method should result in the same test class.

The class is stored in Stream.method.__wrapped__, but I don't see how anything else but the test class can end up there in any of these cases.

Btw, what should we do if a plugin wants to override a built-in method? On the one hand, this would allow people to create useful extensions of built-in functionality (my version of partition could be provided as a plugin long before it's merged into core), on the other — it could break things and lead to confusion.

Copy link
Member

@CJ-Wright CJ-Wright Nov 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I would have the system balk at a built-in override. My hope would be that names are cheap, so adding more text onto a would be partition describing how it is different would add less burden than issues around which partition was used. This could get particularly thorny as envs get updated, code that previously worked and relied on the built-in are now producing errors in subtle ways that could be difficult to find.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean we should have a clobber=False argument for the register methods?
I wonder in that case, if there is a sane way to tell when an entrypoint and register_api refer to the same thing, which would not be a problem even without clobber.

Copy link
Contributor Author

@roveo roveo Nov 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder in that case, if there is a sane way to tell when an entrypoint and register_api refer to the same thing, which would not be a problem even without clobber.

We can just check that hasattr(Stream, entry_point.name). But it would have to happen during plugin load (so that we're not overwriting built-in methods with stubs) and so importing streamz with a bad plugin would result in an error. Or it could be a warning that we skipped an entrypoint because of name collision.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but this will fail when someone adds the entrypoint to a class that already has register_api, no?

Note @CJ-Wright , that the current implementation does allow you to use register_api to overwrite methods at will. Of course this is python, so we can't ever disable someone who wants to do that anyway.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair. To some extent this boils down to locality for me. If the author of the code is the user of the env then if they override things then they are in a better place to clean up. When installing 3rd party things that state may not hold as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the author of the code is the user of the env then if they override things then they are in a better place to clean up. When installing 3rd party things that state may not hold as well.

I agree. With register_api things are explicit and visible. A plugin is a black box.

@martindurant please look at the last commit to see what I mean

Stream.test_double()
assert Stream.test_double.__name__ == "test"