diff --git a/c2cwsgiutils/_broadcast/__init__.py b/c2cwsgiutils/broadcast/__init__.py similarity index 70% rename from c2cwsgiutils/_broadcast/__init__.py rename to c2cwsgiutils/broadcast/__init__.py index 5bbbdf62e..9f0136f3d 100644 --- a/c2cwsgiutils/_broadcast/__init__.py +++ b/c2cwsgiutils/broadcast/__init__.py @@ -1,13 +1,14 @@ """ Broadcast messages to all the processes of Gunicorn in every containers. """ +import functools import logging import pyramid.config -from typing import Optional, Callable +from typing import Optional, Callable, Any from c2cwsgiutils import _utils -from c2cwsgiutils._broadcast import redis, local -from c2cwsgiutils._broadcast import interface # noqa # pylint: disable=unused-import +from c2cwsgiutils.broadcast import redis, local +from c2cwsgiutils.broadcast import interface # noqa # pylint: disable=unused-import LOG = logging.getLogger(__name__) REDIS_ENV_KEY = "C2C_REDIS_URL" @@ -68,3 +69,25 @@ def broadcast(channel: str, params: Optional[dict]=None, expect_answers: bool=Fa global _broadcaster assert _broadcaster is not None return _broadcaster.broadcast(channel, params if params is not None else {}, expect_answers, timeout) + + +def decorator(channel: Optional[str]=None, expect_answers: bool=False, timeout: float=10) -> Callable: + """ + The decorated function will be called through the broadcast functionality. If expect_answers is set to + True, the returned value will be a list of all the answers. + + This works only if the module using this decorator is imported after the broadcast system is setup. + """ + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + def wrapper(**kwargs: Any) -> Any: + return broadcast(_channel, params=kwargs, expect_answers=expect_answers, timeout=timeout) + + if channel is None: + _channel = 'c2c_decorated_' + str(id(func)) + else: + _channel = channel + subscribe(_channel, func) + + return wrapper + return decorator diff --git a/c2cwsgiutils/_broadcast/interface.py b/c2cwsgiutils/broadcast/interface.py similarity index 100% rename from c2cwsgiutils/_broadcast/interface.py rename to c2cwsgiutils/broadcast/interface.py diff --git a/c2cwsgiutils/_broadcast/local.py b/c2cwsgiutils/broadcast/local.py similarity index 94% rename from c2cwsgiutils/_broadcast/local.py rename to c2cwsgiutils/broadcast/local.py index 4b3ce3633..5573ba946 100644 --- a/c2cwsgiutils/_broadcast/local.py +++ b/c2cwsgiutils/broadcast/local.py @@ -1,7 +1,7 @@ from typing import MutableMapping, Callable, Optional # noqa # pylint: disable=unused-import # noinspection PyProtectedMember -from c2cwsgiutils._broadcast import utils, interface +from c2cwsgiutils.broadcast import utils, interface class LocalBroadcaster(interface.BaseBroadcaster): diff --git a/c2cwsgiutils/_broadcast/redis.py b/c2cwsgiutils/broadcast/redis.py similarity index 98% rename from c2cwsgiutils/_broadcast/redis.py rename to c2cwsgiutils/broadcast/redis.py index cf6f56870..14ebf6b6b 100644 --- a/c2cwsgiutils/_broadcast/redis.py +++ b/c2cwsgiutils/broadcast/redis.py @@ -6,7 +6,7 @@ from typing import Callable, Optional, Mapping, Any import time -from c2cwsgiutils._broadcast import utils, interface +from c2cwsgiutils.broadcast import utils, interface LOG = logging.getLogger(__name__) diff --git a/c2cwsgiutils/_broadcast/utils.py b/c2cwsgiutils/broadcast/utils.py similarity index 100% rename from c2cwsgiutils/_broadcast/utils.py rename to c2cwsgiutils/broadcast/utils.py diff --git a/c2cwsgiutils/debug.py b/c2cwsgiutils/debug.py index b98195c8e..f47d5b663 100644 --- a/c2cwsgiutils/debug.py +++ b/c2cwsgiutils/debug.py @@ -11,7 +11,7 @@ from typing import Dict, Mapping, List, Any import sys -from c2cwsgiutils import _utils, _auth, _broadcast +from c2cwsgiutils import _utils, _auth, broadcast CONFIG_KEY = 'c2c.debug_view_secret' ENV_KEY = 'DEBUG_VIEW_SECRET' @@ -21,7 +21,7 @@ def _dump_stacks(request: pyramid.request.Request) -> List[Mapping[str, List[Mapping[str, Any]]]]: _auth.auth_view(request, ENV_KEY, CONFIG_KEY) - result = _broadcast.broadcast('c2c_dump_stacks', expect_answers=True) + result = broadcast.broadcast('c2c_dump_stacks', expect_answers=True) assert result is not None return result @@ -47,7 +47,7 @@ def _dump_stacks_impl() -> Dict[str, List[Dict[str, Any]]]: def _dump_memory(request: pyramid.request.Request) -> List[Mapping[str, Any]]: _auth.auth_view(request, ENV_KEY, CONFIG_KEY) limit = int(request.params.get('limit', '30')) - result = _broadcast.broadcast('c2c_dump_memory', params={'limit': limit}, expect_answers=True) + result = broadcast.broadcast('c2c_dump_memory', params={'limit': limit}, expect_answers=True) assert result is not None return result @@ -112,8 +112,8 @@ def _error(request: pyramid.request.Request) -> Any: def init(config: pyramid.config.Configurator) -> None: if _utils.env_or_config(config, ENV_KEY, CONFIG_KEY, False): - _broadcast.subscribe('c2c_dump_memory', _dump_memory_impl) - _broadcast.subscribe('c2c_dump_stacks', _dump_stacks_impl) + broadcast.subscribe('c2c_dump_memory', _dump_memory_impl) + broadcast.subscribe('c2c_dump_stacks', _dump_stacks_impl) config.add_route("c2c_debug_stacks", _utils.get_base_path(config) + r"/debug/stacks", request_method="GET") diff --git a/c2cwsgiutils/logging_view.py b/c2cwsgiutils/logging_view.py index fd8a4f8a9..428d00b85 100644 --- a/c2cwsgiutils/logging_view.py +++ b/c2cwsgiutils/logging_view.py @@ -3,7 +3,7 @@ import pyramid.request from typing import Mapping, Any -from c2cwsgiutils import _utils, _auth, _broadcast +from c2cwsgiutils import _utils, _auth, broadcast LOG = logging.getLogger(__name__) CONFIG_KEY = 'c2c.log_view_secret' @@ -15,7 +15,7 @@ def install_subscriber(config: pyramid.config.Configurator) -> None: Install the view to configure the loggers, if configured to do so. """ if _utils.env_or_config(config, ENV_KEY, CONFIG_KEY, False): - _broadcast.subscribe('c2c_logging_level', lambda name, level: logging.getLogger(name).setLevel(level)) + broadcast.subscribe('c2c_logging_level', lambda name, level: logging.getLogger(name).setLevel(level)) config.add_route("c2c_logging_level", _utils.get_base_path(config) + r"/logging/level", request_method="GET") @@ -32,6 +32,6 @@ def _logging_change_level(request: pyramid.request.Request) -> Mapping[str, Any] if level is not None: LOG.critical("Logging of %s changed from %s to %s", name, logging.getLevelName(logger.level), level) logger.setLevel(level) - _broadcast.broadcast('c2c_logging_level', params={'name': name, 'level': level}) + broadcast.broadcast('c2c_logging_level', params={'name': name, 'level': level}) return {'status': 200, 'name': name, 'level': logging.getLevelName(logger.level), 'effective_level': logging.getLevelName(logger.getEffectiveLevel())} diff --git a/c2cwsgiutils/pyramid.py b/c2cwsgiutils/pyramid.py index 903745438..a55109ea6 100644 --- a/c2cwsgiutils/pyramid.py +++ b/c2cwsgiutils/pyramid.py @@ -3,7 +3,7 @@ import pyramid_tm from c2cwsgiutils import stats_pyramid, logging_view, sql_profiler, version, debug, sentry,\ - request_tracking, errors, pretty_json, _broadcast + request_tracking, errors, pretty_json, broadcast def includeme(config: pyramid.config.Configurator) -> None: @@ -17,7 +17,7 @@ def includeme(config: pyramid.config.Configurator) -> None: config.include(pyramid_tm.includeme) config.include(cornice.includeme) pretty_json.init(config) - _broadcast.init(config) + broadcast.init(config) stats_pyramid.init(config) request_tracking.init(config) logging_view.install_subscriber(config) diff --git a/c2cwsgiutils/sql_profiler.py b/c2cwsgiutils/sql_profiler.py index 809d7472b..63e479623 100644 --- a/c2cwsgiutils/sql_profiler.py +++ b/c2cwsgiutils/sql_profiler.py @@ -11,7 +11,7 @@ from threading import Lock from typing import Any, Mapping -from c2cwsgiutils import _utils, _auth, _broadcast +from c2cwsgiutils import _utils, _auth, broadcast ENV_KEY = 'SQL_PROFILER_SECRET' CONFIG_KEY = 'c2c.sql_profiler_secret' @@ -49,7 +49,7 @@ def _sql_profiler_view(request: pyramid.request.Request) -> Mapping[str, Any]: _auth.auth_view(request, ENV_KEY, CONFIG_KEY) enable = request.params.get('enable') if enable is not None: - _broadcast.broadcast('c2c_sql_profiler', params={'enable': enable}, expect_answers=True) + broadcast.broadcast('c2c_sql_profiler', params={'enable': enable}, expect_answers=True) return {'status': 200, 'enabled': repository is not None} @@ -87,7 +87,7 @@ def init(config: pyramid.config.Configurator) -> None: Install a pyramid event handler that adds the request information """ if _utils.env_or_config(config, ENV_KEY, CONFIG_KEY, False): - _broadcast.subscribe('c2c_sql_profiler', _setup_profiler) + broadcast.subscribe('c2c_sql_profiler', _setup_profiler) config.add_route("c2c_sql_profiler", _utils.get_base_path(config) + r"/sql_profiler", request_method="GET") diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index faa89c3a1..bebadd905 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -1,34 +1,59 @@ -from c2cwsgiutils._broadcast import local -from c2cwsgiutils import _broadcast +import pytest +from c2cwsgiutils.broadcast import local +from c2cwsgiutils import broadcast -def test_local(): - _broadcast._broadcaster = local.LocalBroadcaster() # pylint: disable=W0212 + +@pytest.yield_fixture() +def local_broadcaster(): + broadcast._broadcaster = local.LocalBroadcaster() # pylint: disable=W0212 try: - cb_calls = [0, 0] + yield + finally: + broadcast._broadcaster = None # pylint: disable=W0212 - def cb1(data): - cb_calls[0] += 1 - return data + 1 - def cb2(data): - cb_calls[1] += 1 +def test_local(local_broadcaster): + cb_calls = [0, 0] - assert _broadcast.broadcast("test1", {'data': 1}, expect_answers=True) == [] # pylint: disable=W0212 - assert cb_calls == [0, 0] + def cb1(data): + cb_calls[0] += 1 + return data + 1 - _broadcast.subscribe("test1", cb1) - _broadcast.subscribe("test2", cb2) - assert cb_calls == [0, 0] + def cb2(data): + cb_calls[1] += 1 - assert _broadcast.broadcast("test1", {'data': 1}) is None - assert cb_calls == [1, 0] + assert broadcast.broadcast("test1", {'data': 1}, expect_answers=True) == [] + assert cb_calls == [0, 0] - assert _broadcast.broadcast("test2", {'data': 1}) is None - assert cb_calls == [1, 1] + broadcast.subscribe("test1", cb1) + broadcast.subscribe("test2", cb2) + assert cb_calls == [0, 0] - assert _broadcast.broadcast("test1", {'data': 12}, expect_answers=True) == [13] - assert cb_calls == [2, 1] + assert broadcast.broadcast("test1", {'data': 1}) is None + assert cb_calls == [1, 0] - finally: - _broadcast._broadcaster = None # pylint: disable=W0212 + assert broadcast.broadcast("test2", {'data': 1}) is None + assert cb_calls == [1, 1] + + assert broadcast.broadcast("test1", {'data': 12}, expect_answers=True) == [13] + assert cb_calls == [2, 1] + + +def test_decorator(local_broadcaster): + cb_calls = [0, 0] + + @broadcast.decorator(expect_answers=True) + def cb1(value): + cb_calls[0] += 1 + return value + 1 + + @broadcast.decorator() + def cb2(): + cb_calls[1] += 1 + + assert cb1(value=12) == [13] + assert cb_calls == [1, 0] + + assert cb2() is None + assert cb_calls == [1, 1]