Skip to content

Commit

Permalink
Merge pull request #167 from camptocamp/pre_init_broadcast
Browse files Browse the repository at this point in the history
Allow the broadcast framework to be initialized afterwards
  • Loading branch information
Patrick Valsecchi authored Jun 15, 2018
2 parents 6c0b429 + b50cb97 commit 8ec4285
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ RUN apt-get update && \
apt-get clean && \
rm -r /var/lib/apt/lists/*
COPY requirements.txt docker-requirements.txt /opt/c2cwsgiutils/
RUN pip install --no-cache-dir -r /opt/c2cwsgiutils/requirements.txt -r /opt/c2cwsgiutils/docker-requirements.txt
RUN pip install --no-cache-dir -r /opt/c2cwsgiutils/requirements.txt -r /opt/c2cwsgiutils/docker-requirements.txt

COPY . /opt/c2cwsgiutils/
RUN flake8 /opt/c2cwsgiutils && \
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile.3.5
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ RUN echo "deb http://apt.postgresql.org/pub/repos/apt/ jessie-pgdg main" > /etc/
vim && \
apt-get clean && \
rm -r /var/lib/apt/lists/*
COPY requirements.txt /opt/c2cwsgiutils/
RUN pip install --no-cache-dir -r /opt/c2cwsgiutils/requirements.txt
COPY requirements.txt docker-requirements.txt /opt/c2cwsgiutils/
RUN pip install --no-cache-dir -r /opt/c2cwsgiutils/requirements.txt -r /opt/c2cwsgiutils/docker-requirements.txt

COPY . /opt/c2cwsgiutils/
RUN flake8 /opt/c2cwsgiutils && \
Expand Down
14 changes: 13 additions & 1 deletion acceptance_tests/app/c2cwsgiutils_app/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@

from c2cwsgiutils import broadcast
import c2cwsgiutils.pyramid
from c2cwsgiutils.health_check import HealthCheck
from pyramid.config import Configurator
Expand All @@ -11,10 +11,22 @@ def _failure(_request):
raise HTTPInternalServerError('failing check')


@broadcast.decorator(expect_answers=True)
def broadcast_view():
return 42


def main(_, **settings):
""" This function returns a Pyramid WSGI application.
"""
config = Configurator(settings=settings, route_prefix='/api')

# Initialise the broadcast view before c2cwsgiutils is initialised. This allows to test the
# reconfiguration on the fly of the broadcast framework
config.add_route("broadcast", r"/broadcast", request_method="GET")
config.add_view(lambda request: broadcast_view(), route_name="broadcast", renderer="fast_json",
http_cache=0)

config.include(c2cwsgiutils.pyramid.includeme)
models.init(config)
config.scan("c2cwsgiutils_app.services")
Expand Down
3 changes: 3 additions & 0 deletions acceptance_tests/tests/tests/test_broadcast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
def test_broadcast_reconfig(app_connection):
response = app_connection.get_json("broadcast")
assert response == [42] # only one worker
48 changes: 26 additions & 22 deletions c2cwsgiutils/broadcast/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,33 @@

def init(config: pyramid.config.Configurator) -> None:
"""
Initialize the broacaster with Redis, if configured. Otherwise, fall back to a fake local implementation.
Initialize the broadcaster with Redis, if configured. Otherwise, fall back to a fake local implementation.
"""
global _broadcaster
redis_url = _utils.env_or_config(config, REDIS_ENV_KEY, REDIS_CONFIG_KEY, None)
broadcast_prefix = _utils.env_or_config(config, BROADCAST_ENV_KEY, BROADCAST_CONFIG_KEY,
"broadcast_api_")
if _broadcaster is None:
redis_url = _utils.env_or_config(config, REDIS_ENV_KEY, REDIS_CONFIG_KEY, None)
if redis_url is not None:
broadcast_prefix = _utils.env_or_config(config, BROADCAST_ENV_KEY, BROADCAST_CONFIG_KEY,
"broadcast_api_")
try:
_broadcaster = redis.RedisBroadcaster(redis_url, broadcast_prefix)
LOG.info("Broadcast service setup using redis: %s", redis_url)
return
except ImportError: # pragma: no cover
LOG.warning("Cannot import redis for setting up broadcast capabilities")
_broadcaster = redis.RedisBroadcaster(redis_url, broadcast_prefix)
LOG.info("Broadcast service setup using redis: %s", redis_url)
else:
_broadcaster = local.LocalBroadcaster()
LOG.info("Broadcast service setup using local implementation")
elif isinstance(_broadcaster, local.LocalBroadcaster) and redis_url is not None:
LOG.info("Switching from a local broadcaster to a redis broadcaster")
prev_broadcaster = _broadcaster
_broadcaster = redis.RedisBroadcaster(redis_url, broadcast_prefix)
_broadcaster.copy_local_subscriptions(prev_broadcaster)


def _get(need_init: bool=False) -> interface.BaseBroadcaster:
global _broadcaster
if _broadcaster is None:
if need_init:
LOG.error("Broadcast functionality used before it is setup")
_broadcaster = local.LocalBroadcaster()
LOG.info("Broadcast service setup using local implementation")
return _broadcaster


def subscribe(channel: str, callback: Callable) -> None:
Expand All @@ -46,18 +57,14 @@ def subscribe(channel: str, callback: Callable) -> None:
A channel can be subscribed only once.
"""
global _broadcaster
assert _broadcaster is not None
_broadcaster.subscribe(channel, callback)
_get().subscribe(channel, callback)


def unsubscribe(channel: str) -> None:
"""
Unsubscribe from a channel.
"""
global _broadcaster
assert _broadcaster is not None
_broadcaster.unsubscribe(channel)
_get().unsubscribe(channel)


def broadcast(channel: str, params: Optional[dict]=None, expect_answers: bool=False,
Expand All @@ -66,17 +73,14 @@ def broadcast(channel: str, params: Optional[dict]=None, expect_answers: bool=Fa
Broadcast a message to the given channel. If answers are expected, it will wait up to "timeout" seconds
to get all the answers.
"""
global _broadcaster
assert _broadcaster is not None
return _broadcaster.broadcast(channel, params if params is not None else {}, expect_answers, timeout)
return _get(need_init=True).broadcast(channel, params if params is not None else {},
expect_answers, timeout)


def decorator(channel: Optional[str]=None, expect_answers: bool=False, timeout: float=10) -> Callable:
"""
The decorated function will be called through the broadcast functionality. If expect_answers is set to
True, the returned value will be a list of all the answers.
This works only if the module using this decorator is imported after the broadcast system is setup.
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
Expand Down
3 changes: 3 additions & 0 deletions c2cwsgiutils/broadcast/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ def broadcast(self, channel: str, params: Mapping[str, Any], expect_answers: boo
subscriber = self._subscribers.get(channel, None)
answers = [utils.add_host_info(subscriber(**params))] if subscriber is not None else []
return answers if expect_answers else None

def get_subscribers(self) -> Mapping[str, Callable]:
return self._subscribers
8 changes: 7 additions & 1 deletion c2cwsgiutils/broadcast/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Callable, Optional, Mapping, Any # noqa # pylint: disable=unused-import
import time

from c2cwsgiutils.broadcast import utils, interface
from c2cwsgiutils.broadcast import utils, interface, local

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -43,9 +43,11 @@ def wrapper(message: Mapping[str, Any]) -> None:
LOG.debug("Sending broadcast answer on %s", answer_channel)
self._connection.publish(answer_channel, json.dumps(utils.add_host_info(response)))

LOG.debug("Subscribing %s.%s to %s", callback.__module__, callback.__name__, channel)
self._pub_sub.subscribe(**{self._get_channel(channel): wrapper})

def unsubscribe(self, channel: str) -> None:
LOG.debug("Unsubscribing from %s")
self._pub_sub.unsubscribe(self._get_channel(channel))

def broadcast(self, channel: str, params: Mapping[str, Any], expect_answers: bool,
Expand Down Expand Up @@ -100,3 +102,7 @@ def _broadcast(self, channel: str, message: Mapping[str, Any]) -> int:
nb_received = self._connection.publish(actual_channel, json.dumps(message))
LOG.debug('Broadcast on %s sent to %d listeners', actual_channel, nb_received)
return nb_received

def copy_local_subscriptions(self, prev_broadcaster: local.LocalBroadcaster) -> None:
for channel, callback in prev_broadcaster.get_subscribers().items():
self.subscribe(channel, callback)
16 changes: 16 additions & 0 deletions tests/test_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,19 @@ def cb2():

assert cb2() is None
assert cb_calls == [1, 1]


def test_fallback():
cb_calls = [0]

def cb1(value):
cb_calls[0] += 1
return value + 1

try:
broadcast.subscribe("test1", cb1)

assert broadcast.broadcast("test1", {'value': 12}, expect_answers=True) == [13]
assert cb_calls == [1]
finally:
broadcast._broadcaster = None # pylint: disable=W0212

0 comments on commit 8ec4285

Please sign in to comment.