Skip to content

Commit

Permalink
Merge pull request #160 from camptocamp/public_broadcast
Browse files Browse the repository at this point in the history
Made the broadcast API public
  • Loading branch information
Patrick Valsecchi authored Jun 11, 2018
2 parents 639be38 + 092ba11 commit 7323d62
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
"""
Broadcast messages to all the processes of Gunicorn in every containers.
"""
import functools
import logging
import pyramid.config
from typing import Optional, Callable
from typing import Optional, Callable, Any

from c2cwsgiutils import _utils
from c2cwsgiutils._broadcast import redis, local
from c2cwsgiutils._broadcast import interface # noqa # pylint: disable=unused-import
from c2cwsgiutils.broadcast import redis, local
from c2cwsgiutils.broadcast import interface # noqa # pylint: disable=unused-import

LOG = logging.getLogger(__name__)
REDIS_ENV_KEY = "C2C_REDIS_URL"
Expand Down Expand Up @@ -68,3 +69,25 @@ def broadcast(channel: str, params: Optional[dict]=None, expect_answers: bool=Fa
global _broadcaster
assert _broadcaster is not None
return _broadcaster.broadcast(channel, params if params is not None else {}, expect_answers, timeout)


def decorator(channel: Optional[str]=None, expect_answers: bool=False, timeout: float=10) -> Callable:
"""
The decorated function will be called through the broadcast functionality. If expect_answers is set to
True, the returned value will be a list of all the answers.
This works only if the module using this decorator is imported after the broadcast system is setup.
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(**kwargs: Any) -> Any:
return broadcast(_channel, params=kwargs, expect_answers=expect_answers, timeout=timeout)

if channel is None:
_channel = 'c2c_decorated_' + str(id(func))
else:
_channel = channel
subscribe(_channel, func)

return wrapper
return decorator
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import MutableMapping, Callable, Optional # noqa # pylint: disable=unused-import

# noinspection PyProtectedMember
from c2cwsgiutils._broadcast import utils, interface
from c2cwsgiutils.broadcast import utils, interface


class LocalBroadcaster(interface.BaseBroadcaster):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Callable, Optional, Mapping, Any
import time

from c2cwsgiutils._broadcast import utils, interface
from c2cwsgiutils.broadcast import utils, interface

LOG = logging.getLogger(__name__)

Expand Down
File renamed without changes.
10 changes: 5 additions & 5 deletions c2cwsgiutils/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from typing import Dict, Mapping, List, Any
import sys

from c2cwsgiutils import _utils, _auth, _broadcast
from c2cwsgiutils import _utils, _auth, broadcast

CONFIG_KEY = 'c2c.debug_view_secret'
ENV_KEY = 'DEBUG_VIEW_SECRET'
Expand All @@ -21,7 +21,7 @@

def _dump_stacks(request: pyramid.request.Request) -> List[Mapping[str, List[Mapping[str, Any]]]]:
_auth.auth_view(request, ENV_KEY, CONFIG_KEY)
result = _broadcast.broadcast('c2c_dump_stacks', expect_answers=True)
result = broadcast.broadcast('c2c_dump_stacks', expect_answers=True)
assert result is not None
return result

Expand All @@ -47,7 +47,7 @@ def _dump_stacks_impl() -> Dict[str, List[Dict[str, Any]]]:
def _dump_memory(request: pyramid.request.Request) -> List[Mapping[str, Any]]:
_auth.auth_view(request, ENV_KEY, CONFIG_KEY)
limit = int(request.params.get('limit', '30'))
result = _broadcast.broadcast('c2c_dump_memory', params={'limit': limit}, expect_answers=True)
result = broadcast.broadcast('c2c_dump_memory', params={'limit': limit}, expect_answers=True)
assert result is not None
return result

Expand Down Expand Up @@ -112,8 +112,8 @@ def _error(request: pyramid.request.Request) -> Any:

def init(config: pyramid.config.Configurator) -> None:
if _utils.env_or_config(config, ENV_KEY, CONFIG_KEY, False):
_broadcast.subscribe('c2c_dump_memory', _dump_memory_impl)
_broadcast.subscribe('c2c_dump_stacks', _dump_stacks_impl)
broadcast.subscribe('c2c_dump_memory', _dump_memory_impl)
broadcast.subscribe('c2c_dump_stacks', _dump_stacks_impl)

config.add_route("c2c_debug_stacks", _utils.get_base_path(config) + r"/debug/stacks",
request_method="GET")
Expand Down
6 changes: 3 additions & 3 deletions c2cwsgiutils/logging_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pyramid.request
from typing import Mapping, Any

from c2cwsgiutils import _utils, _auth, _broadcast
from c2cwsgiutils import _utils, _auth, broadcast

LOG = logging.getLogger(__name__)
CONFIG_KEY = 'c2c.log_view_secret'
Expand All @@ -15,7 +15,7 @@ def install_subscriber(config: pyramid.config.Configurator) -> None:
Install the view to configure the loggers, if configured to do so.
"""
if _utils.env_or_config(config, ENV_KEY, CONFIG_KEY, False):
_broadcast.subscribe('c2c_logging_level', lambda name, level: logging.getLogger(name).setLevel(level))
broadcast.subscribe('c2c_logging_level', lambda name, level: logging.getLogger(name).setLevel(level))

config.add_route("c2c_logging_level", _utils.get_base_path(config) + r"/logging/level",
request_method="GET")
Expand All @@ -32,6 +32,6 @@ def _logging_change_level(request: pyramid.request.Request) -> Mapping[str, Any]
if level is not None:
LOG.critical("Logging of %s changed from %s to %s", name, logging.getLevelName(logger.level), level)
logger.setLevel(level)
_broadcast.broadcast('c2c_logging_level', params={'name': name, 'level': level})
broadcast.broadcast('c2c_logging_level', params={'name': name, 'level': level})
return {'status': 200, 'name': name, 'level': logging.getLevelName(logger.level),
'effective_level': logging.getLevelName(logger.getEffectiveLevel())}
4 changes: 2 additions & 2 deletions c2cwsgiutils/pyramid.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pyramid_tm

from c2cwsgiutils import stats_pyramid, logging_view, sql_profiler, version, debug, sentry,\
request_tracking, errors, pretty_json, _broadcast
request_tracking, errors, pretty_json, broadcast


def includeme(config: pyramid.config.Configurator) -> None:
Expand All @@ -17,7 +17,7 @@ def includeme(config: pyramid.config.Configurator) -> None:
config.include(pyramid_tm.includeme)
config.include(cornice.includeme)
pretty_json.init(config)
_broadcast.init(config)
broadcast.init(config)
stats_pyramid.init(config)
request_tracking.init(config)
logging_view.install_subscriber(config)
Expand Down
6 changes: 3 additions & 3 deletions c2cwsgiutils/sql_profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from threading import Lock
from typing import Any, Mapping

from c2cwsgiutils import _utils, _auth, _broadcast
from c2cwsgiutils import _utils, _auth, broadcast

ENV_KEY = 'SQL_PROFILER_SECRET'
CONFIG_KEY = 'c2c.sql_profiler_secret'
Expand Down Expand Up @@ -49,7 +49,7 @@ def _sql_profiler_view(request: pyramid.request.Request) -> Mapping[str, Any]:
_auth.auth_view(request, ENV_KEY, CONFIG_KEY)
enable = request.params.get('enable')
if enable is not None:
_broadcast.broadcast('c2c_sql_profiler', params={'enable': enable}, expect_answers=True)
broadcast.broadcast('c2c_sql_profiler', params={'enable': enable}, expect_answers=True)
return {'status': 200, 'enabled': repository is not None}


Expand Down Expand Up @@ -87,7 +87,7 @@ def init(config: pyramid.config.Configurator) -> None:
Install a pyramid event handler that adds the request information
"""
if _utils.env_or_config(config, ENV_KEY, CONFIG_KEY, False):
_broadcast.subscribe('c2c_sql_profiler', _setup_profiler)
broadcast.subscribe('c2c_sql_profiler', _setup_profiler)

config.add_route("c2c_sql_profiler", _utils.get_base_path(config) + r"/sql_profiler",
request_method="GET")
Expand Down
71 changes: 48 additions & 23 deletions tests/test_broadcast.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,59 @@
from c2cwsgiutils._broadcast import local
from c2cwsgiutils import _broadcast
import pytest

from c2cwsgiutils.broadcast import local
from c2cwsgiutils import broadcast

def test_local():
_broadcast._broadcaster = local.LocalBroadcaster() # pylint: disable=W0212

@pytest.yield_fixture()
def local_broadcaster():
broadcast._broadcaster = local.LocalBroadcaster() # pylint: disable=W0212
try:
cb_calls = [0, 0]
yield
finally:
broadcast._broadcaster = None # pylint: disable=W0212

def cb1(data):
cb_calls[0] += 1
return data + 1

def cb2(data):
cb_calls[1] += 1
def test_local(local_broadcaster):
cb_calls = [0, 0]

assert _broadcast.broadcast("test1", {'data': 1}, expect_answers=True) == [] # pylint: disable=W0212
assert cb_calls == [0, 0]
def cb1(data):
cb_calls[0] += 1
return data + 1

_broadcast.subscribe("test1", cb1)
_broadcast.subscribe("test2", cb2)
assert cb_calls == [0, 0]
def cb2(data):
cb_calls[1] += 1

assert _broadcast.broadcast("test1", {'data': 1}) is None
assert cb_calls == [1, 0]
assert broadcast.broadcast("test1", {'data': 1}, expect_answers=True) == []
assert cb_calls == [0, 0]

assert _broadcast.broadcast("test2", {'data': 1}) is None
assert cb_calls == [1, 1]
broadcast.subscribe("test1", cb1)
broadcast.subscribe("test2", cb2)
assert cb_calls == [0, 0]

assert _broadcast.broadcast("test1", {'data': 12}, expect_answers=True) == [13]
assert cb_calls == [2, 1]
assert broadcast.broadcast("test1", {'data': 1}) is None
assert cb_calls == [1, 0]

finally:
_broadcast._broadcaster = None # pylint: disable=W0212
assert broadcast.broadcast("test2", {'data': 1}) is None
assert cb_calls == [1, 1]

assert broadcast.broadcast("test1", {'data': 12}, expect_answers=True) == [13]
assert cb_calls == [2, 1]


def test_decorator(local_broadcaster):
cb_calls = [0, 0]

@broadcast.decorator(expect_answers=True)
def cb1(value):
cb_calls[0] += 1
return value + 1

@broadcast.decorator()
def cb2():
cb_calls[1] += 1

assert cb1(value=12) == [13]
assert cb_calls == [1, 0]

assert cb2() is None
assert cb_calls == [1, 1]

0 comments on commit 7323d62

Please sign in to comment.