From b7a3692f06bbe1e5dcd14acfa1a09eab9f0d8f08 Mon Sep 17 00:00:00 2001 From: Mikhail Akimov Date: Mon, 9 Nov 2020 20:39:22 +0300 Subject: [PATCH 01/10] a simple plugin system --- streamz/__init__.py | 4 ++++ streamz/plugins.py | 6 ++++++ 2 files changed, 10 insertions(+) create mode 100644 streamz/plugins.py diff --git a/streamz/__init__.py b/streamz/__init__.py index ebd013c1..e521adfd 100644 --- a/streamz/__init__.py +++ b/streamz/__init__.py @@ -3,6 +3,10 @@ from .core import * from .graph import * from .sources import * +from .plugins import load_plugins + +load_plugins() + try: from .dask import DaskStream, scatter except ImportError: diff --git a/streamz/plugins.py b/streamz/plugins.py new file mode 100644 index 00000000..dc71836f --- /dev/null +++ b/streamz/plugins.py @@ -0,0 +1,6 @@ +import pkg_resources + + +def load_plugins(): + for entry_point in pkg_resources.iter_entry_points("streamz.plugins"): + entry_point.load() From b52b92cadfec7c7235f64fad6080c9cd0a553168 Mon Sep 17 00:00:00 2001 From: Mikhail Akimov Date: Tue, 10 Nov 2020 21:04:46 +0300 Subject: [PATCH 02/10] plugins lazy loading, specifying attribute_name in register_api, tests --- streamz/__init__.py | 2 +- streamz/core.py | 15 ++++++++++++-- streamz/plugins.py | 10 ++++++--- streamz/tests/test_plugins.py | 38 +++++++++++++++++++++++++++++++++++ 4 files changed, 59 insertions(+), 6 deletions(-) create mode 100644 streamz/tests/test_plugins.py diff --git a/streamz/__init__.py b/streamz/__init__.py index e521adfd..82e3cf46 100644 --- a/streamz/__init__.py +++ b/streamz/__init__.py @@ -5,7 +5,7 @@ from .sources import * from .plugins import load_plugins -load_plugins() +load_plugins(Stream) try: from .dask import DaskStream, scatter diff --git a/streamz/core.py b/streamz/core.py index c3f4512f..bd266467 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -260,7 +260,7 @@ def _remove_upstream(self, upstream): self.upstreams.remove(upstream) @classmethod - def register_api(cls, modifier=identity): + def register_api(cls, modifier=identity, attribute_name=None): """ Add callable to Stream API This allows you to register a new method onto this class. You can use @@ -290,10 +290,21 @@ def _(func): @functools.wraps(func) def wrapped(*args, **kwargs): return func(*args, **kwargs) - setattr(cls, func.__name__, modifier(wrapped)) + name = attribute_name if attribute_name else func.__name__ + setattr(cls, name, modifier(wrapped)) return func return _ + @classmethod + def register_plugin_entry_point(cls, entry_point, modifier=identity): + def stub(*args, **kwargs): + attribute = entry_point.load() + cls.register_api( + modifier=modifier, attribute_name=entry_point.name + )(attribute) + return attribute(*args, **kwargs) + cls.register_api(modifier=modifier, attribute_name=entry_point.name)(stub) + def start(self): """ Start any upstream sources """ for upstream in self.upstreams: diff --git a/streamz/plugins.py b/streamz/plugins.py index dc71836f..b1952651 100644 --- a/streamz/plugins.py +++ b/streamz/plugins.py @@ -1,6 +1,10 @@ import pkg_resources -def load_plugins(): - for entry_point in pkg_resources.iter_entry_points("streamz.plugins"): - entry_point.load() +def load_plugins(cls): + for entry_point in pkg_resources.iter_entry_points("streamz.sources"): + cls.register_plugin_entrypoint(entry_point, staticmethod) + for entry_point in pkg_resources.iter_entry_points("streamz.nodes"): + cls.register_plugin_entrypoint(entry_point) + for entry_point in pkg_resources.iter_entry_points("streamz.sinks"): + cls.register_plugin_entrypoint(entry_point) diff --git a/streamz/tests/test_plugins.py b/streamz/tests/test_plugins.py new file mode 100644 index 00000000..99ce36dd --- /dev/null +++ b/streamz/tests/test_plugins.py @@ -0,0 +1,38 @@ +from streamz.sources import Source +from streamz import Stream + + +class MockEntryPoint: + + def __init__(self, name, cls): + self.name = name + self.cls = cls + + def load(self): + return self.cls + + +def test_register_plugin_entry_point(): + class test(Stream): + pass + + entry_point = MockEntryPoint("test_node", test) + Stream.register_plugin_entry_point(entry_point) + + assert Stream.test_node.__name__ == "stub" + + Stream().test_node() + + assert Stream.test_node.__name__ == "test" + + +def test_register_plugin_entry_point_modifier(): + class test(Source): + pass + + entry_point = MockEntryPoint("from_test", test) + Stream.register_plugin_entry_point(entry_point, staticmethod) + + Stream.from_test() + + assert Stream.from_test.__self__ is Stream From dda82ff4b9b25f37cafc4e52405e0d319f13a0f2 Mon Sep 17 00:00:00 2001 From: Mikhail Akimov Date: Wed, 11 Nov 2020 01:05:54 +0300 Subject: [PATCH 03/10] tests, check class of node before registering --- streamz/core.py | 23 ++++++++++++++++++---- streamz/plugins.py | 6 +++--- streamz/tests/test_plugins.py | 37 +++++++++++++++++++++++++---------- 3 files changed, 49 insertions(+), 17 deletions(-) diff --git a/streamz/core.py b/streamz/core.py index bd266467..a02a4128 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -273,7 +273,7 @@ def register_api(cls, modifier=identity, attribute_name=None): >>> Stream().foo(...) # this works now It attaches the callable as a normal attribute to the class object. In - doing so it respsects inheritance (all subclasses of Stream will also + doing so it respects inheritance (all subclasses of Stream will also get the foo attribute). By default callables are assumed to be instance methods. If you like @@ -285,6 +285,15 @@ def register_api(cls, modifier=identity, attribute_name=None): ... ... >>> Stream.foo(...) # Foo operates as a static method + + You can also provide an optional ``attribute_name`` argument to control + the name of the attribute your callable will be attached as. + + >>> @Stream.register_api(attribute_name="bar") + ... class foo(Stream): + ... ... + + >> Stream().bar(...) # foo was actually attached as bar """ def _(func): @functools.wraps(func) @@ -298,11 +307,17 @@ def wrapped(*args, **kwargs): @classmethod def register_plugin_entry_point(cls, entry_point, modifier=identity): def stub(*args, **kwargs): - attribute = entry_point.load() + node = entry_point.load() + if not issubclass(node, Stream): + raise TypeError( + f"Error loading {entry_point.name} " + f"from module {entry_point.module_name}: " + f"{entry_point.cls.__name__} must be a subclass of Stream" + ) cls.register_api( modifier=modifier, attribute_name=entry_point.name - )(attribute) - return attribute(*args, **kwargs) + )(node) + return node(*args, **kwargs) cls.register_api(modifier=modifier, attribute_name=entry_point.name)(stub) def start(self): diff --git a/streamz/plugins.py b/streamz/plugins.py index b1952651..313ee587 100644 --- a/streamz/plugins.py +++ b/streamz/plugins.py @@ -3,8 +3,8 @@ def load_plugins(cls): for entry_point in pkg_resources.iter_entry_points("streamz.sources"): - cls.register_plugin_entrypoint(entry_point, staticmethod) + cls.register_plugin_entry_point(entry_point, staticmethod) for entry_point in pkg_resources.iter_entry_points("streamz.nodes"): - cls.register_plugin_entrypoint(entry_point) + cls.register_plugin_entry_point(entry_point) for entry_point in pkg_resources.iter_entry_points("streamz.sinks"): - cls.register_plugin_entrypoint(entry_point) + cls.register_plugin_entry_point(entry_point) diff --git a/streamz/tests/test_plugins.py b/streamz/tests/test_plugins.py index 99ce36dd..116f440d 100644 --- a/streamz/tests/test_plugins.py +++ b/streamz/tests/test_plugins.py @@ -1,38 +1,55 @@ -from streamz.sources import Source -from streamz import Stream +import pytest +from streamz import Source, Stream class MockEntryPoint: - def __init__(self, name, cls): + def __init__(self, name, cls, module_name=None): self.name = name self.cls = cls + self.module_name = module_name def load(self): return self.cls def test_register_plugin_entry_point(): - class test(Stream): + class test_stream(Stream): pass - entry_point = MockEntryPoint("test_node", test) + entry_point = MockEntryPoint("test_node", test_stream) Stream.register_plugin_entry_point(entry_point) assert Stream.test_node.__name__ == "stub" Stream().test_node() - assert Stream.test_node.__name__ == "test" + assert Stream.test_node.__name__ == "test_stream" def test_register_plugin_entry_point_modifier(): - class test(Source): + class test_source(Source): pass - entry_point = MockEntryPoint("from_test", test) - Stream.register_plugin_entry_point(entry_point, staticmethod) + def modifier(fn): + fn.__name__ = 'modified_name' + return staticmethod(fn) + + entry_point = MockEntryPoint("from_test", test_source) + Stream.register_plugin_entry_point(entry_point, modifier) Stream.from_test() - assert Stream.from_test.__self__ is Stream + assert Stream.from_test.__name__ == "modified_name" + + +def test_register_plugin_entry_point_raises(): + class invalid_node: + pass + + entry_point = MockEntryPoint("test", invalid_node, "test_module.test") + + Stream.register_plugin_entry_point(entry_point) + + with pytest.raises(TypeError): + Stream.test() From e4975ad269d120948151df4f537d275604837088 Mon Sep 17 00:00:00 2001 From: Mikhail Akimov Date: Wed, 11 Nov 2020 17:07:14 +0300 Subject: [PATCH 04/10] travis_wait for conda create --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index d2eadad0..3eb7a38f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,7 +21,7 @@ install: - conda update conda # Install dependencies - - conda env create --name test-streamz --file ./conda/environments/streamz_dev.yml + - travis_wait 30 conda env create --name test-streamz --file ./conda/environments/streamz_dev.yml - source activate test-streamz - python setup.py install From 01492370d4375c3b6f634e88704dc3c5c8c20683 Mon Sep 17 00:00:00 2001 From: Mikhail Akimov Date: Wed, 11 Nov 2020 18:07:19 +0300 Subject: [PATCH 05/10] fix whitespace issues --- streamz/dataframe/tests/test_dataframes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streamz/dataframe/tests/test_dataframes.py b/streamz/dataframe/tests/test_dataframes.py index 67a04ec5..7de6f168 100644 --- a/streamz/dataframe/tests/test_dataframes.py +++ b/streamz/dataframe/tests/test_dataframes.py @@ -1008,9 +1008,9 @@ def test_windowed_groupby_aggs_with_start_state(stream): out_df1 = pd.DataFrame({'name':['Alice', 'Bob', 'Linda', 'Tom'], 'amount':[50.0, 550.0, 100.0, 150.0]}) assert_eq(output1[-1][1].reset_index(), out_df1) - + def test_dir(stream): example = pd.DataFrame({'name': [], 'amount': []}) sdf = DataFrame(stream, example=example) assert 'name' in dir(sdf) - assert 'amount' in dir(sdf) + assert 'amount' in dir(sdf) From eb93e56d1e2876c8e9ded207f7d156068d8c2505 Mon Sep 17 00:00:00 2001 From: Mikhail Akimov Date: Thu, 12 Nov 2020 00:06:59 +0300 Subject: [PATCH 06/10] fix error msg, add test --- streamz/core.py | 2 +- streamz/tests/test_plugins.py | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/streamz/core.py b/streamz/core.py index a02a4128..130dc5aa 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -312,7 +312,7 @@ def stub(*args, **kwargs): raise TypeError( f"Error loading {entry_point.name} " f"from module {entry_point.module_name}: " - f"{entry_point.cls.__name__} must be a subclass of Stream" + f"{node.__class__.__name__} must be a subclass of Stream" ) cls.register_api( modifier=modifier, attribute_name=entry_point.name diff --git a/streamz/tests/test_plugins.py b/streamz/tests/test_plugins.py index 116f440d..33cd9b27 100644 --- a/streamz/tests/test_plugins.py +++ b/streamz/tests/test_plugins.py @@ -53,3 +53,17 @@ class invalid_node: with pytest.raises(TypeError): Stream.test() + + +def test_register_plugin_entry_point_already_registered(): + @Stream.register_api() + class test(Stream): + pass + + entry_point = MockEntryPoint("test_double", test, "test_module") + + Stream.register_plugin_entry_point(entry_point) + + assert Stream.test_double.__name__ == "stub" + Stream.test_double() + assert Stream.test_double.__name__ == "test" From 319b9e2341306db28da0561d1b1f15315b8ff816 Mon Sep 17 00:00:00 2001 From: Mikhail Akimov Date: Thu, 12 Nov 2020 03:51:35 +0300 Subject: [PATCH 07/10] add setuptools to requirements --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 41c65ec7..485632c5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ tornado toolz zict six +setuptools \ No newline at end of file From 6c7f1f224b763dd54abd16ffb92c467f85e91d18 Mon Sep 17 00:00:00 2001 From: Mikhail Akimov Date: Thu, 12 Nov 2020 17:36:07 +0300 Subject: [PATCH 08/10] docs --- docs/source/index.rst | 1 + docs/source/plugins.rst | 102 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+) create mode 100644 docs/source/plugins.rst diff --git a/docs/source/index.rst b/docs/source/index.rst index 0638a9ec..8419064a 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -122,3 +122,4 @@ data streaming systems like `Apache Flink `_, collections-api.rst async.rst plotting.rst + plugins.rst diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst new file mode 100644 index 00000000..9ae4e934 --- /dev/null +++ b/docs/source/plugins.rst @@ -0,0 +1,102 @@ +Plugins +======= + +In addition to using ``@Stream.register_api()`` decorator, custom stream nodes can +be added to Streamz by installing 3rd-party Python packages. + + +Known plugins +------------- + +Extras +++++++ + +These plugins are supported by the Streamz community and can be installed as extras, +e.g. ``pip install streamz[kafka]``. + +There are no plugins here yet, but hopefully soon there will be. + +.. only:: comment + ================= ====================================================== + Extra name Description + ================= ====================================================== + ``files`` Advanced filesystem operations: listening for new + files in a directory, writing to multiple files etc. + ``kafka`` Reading from and writing to Kafka topics. + ================= ====================================================== + + +Entry points +------------ + +Plugins register themselves with Streamz by using ``entry_points`` argument +in ``setup.py``: + +.. code-block:: Python + + # setup.py + + from setuptools import setup + + setup( + name="streamz_example_plugin", + version="0.0.1", + entry_points={ + "streamz.nodes": [ + "repeat = streamz_example_plugin:RepeatNode" + ] + } + ) + +In this example, ``RepeatNode`` class will be imported from +``streamz_example_plugin`` package and will be available as ``Stream.repeat``. +In practice, class name and entry point name (the part before ``=`` in entry point +definition) are usually the same, but they `can` be different. + +Different kinds of add-ons go into different entry point groups: + +=========== ======================= ===================== + Node type Required parent class Entry point group +=========== ======================= ===================== + Source ``streamz.Source`` ``streamz.sources`` + Node ``streamz.Stream`` ``streamz.nodes`` + Sink ``streamz.Stream`` ``streamz.sinks`` +=========== ======================= ===================== + + +Lazy loading +++++++++++++ + +Streamz will attach methods from existing plugins to the ``Stream`` class when it's +imported, but actual classes will be loaded only when the corresponding ``Stream`` +method is first called. Streamz will also validate the loaded class before attaching it +and will raise an appropriate exception if validation fails. + + +Reference implementation +------------------------ + +Let's look at how stream nodes can be implemented. + +.. code-block:: Python + + # __init__.py + + from tornado import gen + from streamz import Stream + + + class RepeatNode(Stream): + + def __init__(self, upstream, n, **kwargs): + super().__init__(upstream, ensure_io_loop=True, **kwargs) + self._n = n + + @gen.coroutine + def update(self, x, who=None, metadata=None): + for _ in range(self._n): + yield self._emit(x, metadata=metadata) + +As you can see, implementation is the same as usual, but there's no +``@Stream.register_api()`` — Streamz will take care of that when loading the plugin. +It will still work if you add the decorator, but you don't have to. From 6e39c657daccff5ffc373c4c4eb301cc10edb84e Mon Sep 17 00:00:00 2001 From: Mikhail Akimov Date: Thu, 12 Nov 2020 20:49:09 +0300 Subject: [PATCH 09/10] docstring for stub, register_plugin_entry_point not calling register_api twice --- streamz/core.py | 8 +++++--- streamz/tests/test_plugins.py | 24 ++++-------------------- 2 files changed, 9 insertions(+), 23 deletions(-) diff --git a/streamz/core.py b/streamz/core.py index 130dc5aa..6d6d9b5c 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -307,6 +307,7 @@ def wrapped(*args, **kwargs): @classmethod def register_plugin_entry_point(cls, entry_point, modifier=identity): def stub(*args, **kwargs): + """ Entrypoints-based streamz plugin. Will be loaded on first call. """ node = entry_point.load() if not issubclass(node, Stream): raise TypeError( @@ -314,9 +315,10 @@ def stub(*args, **kwargs): f"from module {entry_point.module_name}: " f"{node.__class__.__name__} must be a subclass of Stream" ) - cls.register_api( - modifier=modifier, attribute_name=entry_point.name - )(node) + if getattr(cls, entry_point.name).__name__ == "stub": + cls.register_api( + modifier=modifier, attribute_name=entry_point.name + )(node) return node(*args, **kwargs) cls.register_api(modifier=modifier, attribute_name=entry_point.name)(stub) diff --git a/streamz/tests/test_plugins.py b/streamz/tests/test_plugins.py index 33cd9b27..a9addca5 100644 --- a/streamz/tests/test_plugins.py +++ b/streamz/tests/test_plugins.py @@ -1,3 +1,5 @@ +import inspect + import pytest from streamz import Source, Stream @@ -31,16 +33,12 @@ def test_register_plugin_entry_point_modifier(): class test_source(Source): pass - def modifier(fn): - fn.__name__ = 'modified_name' - return staticmethod(fn) - entry_point = MockEntryPoint("from_test", test_source) - Stream.register_plugin_entry_point(entry_point, modifier) + Stream.register_plugin_entry_point(entry_point, staticmethod) Stream.from_test() - assert Stream.from_test.__name__ == "modified_name" + assert inspect.isfunction(Stream().from_test) def test_register_plugin_entry_point_raises(): @@ -53,17 +51,3 @@ class invalid_node: with pytest.raises(TypeError): Stream.test() - - -def test_register_plugin_entry_point_already_registered(): - @Stream.register_api() - class test(Stream): - pass - - entry_point = MockEntryPoint("test_double", test, "test_module") - - Stream.register_plugin_entry_point(entry_point) - - assert Stream.test_double.__name__ == "stub" - Stream.test_double() - assert Stream.test_double.__name__ == "test" From fe0f9d9fa4a43d1f6f1663618f3c54375df3fcd9 Mon Sep 17 00:00:00 2001 From: Mikhail Akimov Date: Thu, 12 Nov 2020 22:57:34 +0300 Subject: [PATCH 10/10] avoid plugins overriding existing methods --- streamz/core.py | 6 ++++++ streamz/plugins.py | 18 +++++++++++++++--- streamz/tests/test_plugins.py | 9 ++++++++- 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/streamz/core.py b/streamz/core.py index 6d6d9b5c..f2a3c2be 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -306,6 +306,12 @@ def wrapped(*args, **kwargs): @classmethod def register_plugin_entry_point(cls, entry_point, modifier=identity): + if hasattr(cls, entry_point.name): + raise ValueError( + f"Can't add {entry_point.name} from {entry_point.module_name} " + f"to {cls.__name__}: duplicate method name." + ) + def stub(*args, **kwargs): """ Entrypoints-based streamz plugin. Will be loaded on first call. """ node = entry_point.load() diff --git a/streamz/plugins.py b/streamz/plugins.py index 313ee587..59b64852 100644 --- a/streamz/plugins.py +++ b/streamz/plugins.py @@ -1,10 +1,22 @@ +import warnings + import pkg_resources +def try_register(cls, entry_point, *modifier): + try: + cls.register_plugin_entry_point(entry_point, *modifier) + except ValueError: + warnings.warn( + f"Can't add {entry_point.name} from {entry_point.module_name}: " + "name collision with existing stream node." + ) + + def load_plugins(cls): for entry_point in pkg_resources.iter_entry_points("streamz.sources"): - cls.register_plugin_entry_point(entry_point, staticmethod) + try_register(cls, entry_point, staticmethod) for entry_point in pkg_resources.iter_entry_points("streamz.nodes"): - cls.register_plugin_entry_point(entry_point) + try_register(cls, entry_point) for entry_point in pkg_resources.iter_entry_points("streamz.sinks"): - cls.register_plugin_entry_point(entry_point) + try_register(cls, entry_point) diff --git a/streamz/tests/test_plugins.py b/streamz/tests/test_plugins.py index a9addca5..0c4999a4 100644 --- a/streamz/tests/test_plugins.py +++ b/streamz/tests/test_plugins.py @@ -41,7 +41,7 @@ class test_source(Source): assert inspect.isfunction(Stream().from_test) -def test_register_plugin_entry_point_raises(): +def test_register_plugin_entry_point_raises_type(): class invalid_node: pass @@ -51,3 +51,10 @@ class invalid_node: with pytest.raises(TypeError): Stream.test() + + +def test_register_plugin_entry_point_raises_duplicate_name(): + entry_point = MockEntryPoint("map", None) + + with pytest.raises(ValueError): + Stream.register_plugin_entry_point(entry_point)