diff --git a/fastapi_websocket_pubsub/event_broadcaster.py b/fastapi_websocket_pubsub/event_broadcaster.py index f9b8cd7..5efec7d 100644 --- a/fastapi_websocket_pubsub/event_broadcaster.py +++ b/fastapi_websocket_pubsub/event_broadcaster.py @@ -8,7 +8,7 @@ from fastapi_websocket_rpc.utils import gen_uid -logger = get_logger('EventBroadcaster') +logger = get_logger("EventBroadcaster") # Cross service broadcast consts @@ -35,9 +35,14 @@ class EventBroadcasterContextManager: Friend-like class of EventBroadcaster (accessing "protected" members ) """ - def __init__(self, event_broadcaster: "EventBroadcaster", listen: bool = True, share: bool = True) -> None: + def __init__( + self, + event_broadcaster: "EventBroadcaster", + listen: bool = True, + share: bool = True, + ) -> None: """ - Provide a context manager for an EventBroadcaster, managing if it listens to events coming from the broadcaster + Provide a context manager for an EventBroadcaster, managing if it listens to events coming from the broadcaster and if it subscribes to the internal notifier to share its events with the broadcaster Args: @@ -48,15 +53,16 @@ def __init__(self, event_broadcaster: "EventBroadcaster", listen: bool = True, s self._event_broadcaster = event_broadcaster self._share: bool = share self._listen: bool = listen - self._lock = asyncio.Lock() async def __aenter__(self): - async with self._lock: + async with self._event_broadcaster._context_manager_lock: if self._listen: self._event_broadcaster._listen_count += 1 if self._event_broadcaster._listen_count == 1: # We have our first listener start the read-task for it (And all those who'd follow) - logger.info("Listening for incoming events from broadcast channel (first listener started)") + logger.info( + "Listening for incoming events from broadcast channel (first listener started)" + ) # Start task listening on incoming broadcasts self._event_broadcaster.start_reader_task() @@ -65,16 +71,19 @@ async def __aenter__(self): if self._event_broadcaster._share_count == 1: # We have our first publisher # Init the broadcast used for sharing (reading has its own) - self._event_broadcaster._acquire_sharing_broadcast_channel() - logger.debug("Subscribing to ALL_TOPICS, and sharing messages with broadcast channel") + logger.debug( + "Subscribing to ALL_TOPICS, and sharing messages with broadcast channel" + ) # Subscribe to internal events form our own event notifier and broadcast them await self._event_broadcaster._subscribe_to_all_topics() else: - logger.debug(f"Did not subscribe to ALL_TOPICS: share count == {self._event_broadcaster._share_count}") + logger.debug( + f"Did not subscribe to ALL_TOPICS: share count == {self._event_broadcaster._share_count}" + ) return self async def __aexit__(self, exc_type, exc, tb): - async with self._lock: + async with self._event_broadcaster._context_manager_lock: try: if self._listen: self._event_broadcaster._listen_count -= 1 @@ -87,7 +96,7 @@ async def __aexit__(self, exc_type, exc, tb): self._event_broadcaster._subscription_task = None if self._share: - self._event_broadcaster._share_count -= 1 + self._event_broadcaster._share_count -= 1 # if this was last sharer - we can stop subscribing to internal events - we aren't sharing anymore if self._event_broadcaster._share_count == 0: # Unsubscribe from internal events @@ -110,8 +119,14 @@ class EventBroadcaster: """ - def __init__(self, broadcast_url: str, notifier: EventNotifier, channel="EventNotifier", - broadcast_type=None, is_publish_only=False) -> None: + def __init__( + self, + broadcast_url: str, + notifier: EventNotifier, + channel="EventNotifier", + broadcast_type=None, + is_publish_only=False, + ) -> None: """ Args: @@ -138,10 +153,10 @@ def __init__(self, broadcast_url: str, notifier: EventNotifier, channel="EventNo self._publish_lock = None # used to track creation / removal of resources needed per type (reader task->listen, and subscription to internal events->share) self._listen_count: int = 0 - self._share_count: int = 0 - # If we opt to manage the context directly (i.e. call async with on the event broadcaster itself) + self._share_count: int = 0 + # If we opt to manage the context directly (i.e. call async with on the event broadcaster itself) self._context_manager = None - + self._context_manager_lock = asyncio.Lock() async def __broadcast_notifications__(self, subscription: Subscription, data): """ @@ -151,25 +166,25 @@ async def __broadcast_notifications__(self, subscription: Subscription, data): subscription (Subscription): the subscription that got triggered data: the event data """ - logger.info("Broadcasting incoming event: {}".format({'topic': subscription.topic, 'notifier_id': self._id})) - note = BroadcastNotification(notifier_id=self._id, topics=[ - subscription.topic], data=data) - # Publish event to broadcast - async with self._publish_lock: - async with self._sharing_broadcast_channel: - await self._sharing_broadcast_channel.publish(self._channel, note.json()) + logger.info( + "Broadcasting incoming event: {}".format( + {"topic": subscription.topic, "notifier_id": self._id} + ) + ) + note = BroadcastNotification( + notifier_id=self._id, topics=[subscription.topic], data=data + ) - def _acquire_sharing_broadcast_channel(self): - """ - Initialize the elements needed for sharing events with the broadcast channel - """ - self._publish_lock = asyncio.Lock() - self._sharing_broadcast_channel = self._broadcast_type(self._broadcast_url) + # Publish event to broadcast + async with self._broadcast_type( + self._broadcast_url + ) as sharing_broadcast_channel: + await sharing_broadcast_channel.publish(self._channel, note.json()) async def _subscribe_to_all_topics(self): - return await self._notifier.subscribe(self._id, - ALL_TOPICS, - self.__broadcast_notifications__) + return await self._notifier.subscribe( + self._id, ALL_TOPICS, self.__broadcast_notifications__ + ) async def _unsubscribe_from_topics(self): return await self._notifier.unsubscribe(self._id) @@ -183,16 +198,16 @@ def get_context(self, listen=True, share=True): share (bool, optional): Should we share events with the broadcast channel. Defaults to True. Returns: - EventBroadcasterContextManager: the context + EventBroadcasterContextManager: the context """ return EventBroadcasterContextManager(self, listen=listen, share=share) def get_listening_context(self): return EventBroadcasterContextManager(self, listen=True, share=False) - + def get_sharing_context(self): return EventBroadcasterContextManager(self, listen=False, share=True) - + async def __aenter__(self): """ Convince caller (also backward compaltability) @@ -201,7 +216,6 @@ async def __aenter__(self): self._context_manager = self.get_context(listen=not self._is_publish_only) return await self._context_manager.__aenter__() - async def __aexit__(self, exc_type, exc, tb): await self._context_manager.__aexit__(exc_type, exc, tb) @@ -215,14 +229,15 @@ def start_reader_task(self): # Make sure a task wasn't started already if self._subscription_task is not None: # we already started a task for this worker process - logger.debug("No need for listen task, already started broadcast listen task for this notifier") + logger.debug( + "No need for listen task, already started broadcast listen task for this notifier" + ) return # Trigger the task logger.debug("Spawning broadcast listen task") - self._subscription_task = asyncio.create_task( - self.__read_notifications__()) + self._subscription_task = asyncio.create_task(self.__read_notifications__()) return self._subscription_task - + def get_reader_task(self): return self._subscription_task @@ -235,15 +250,30 @@ async def __read_notifications__(self): listening_broadcast_channel = self._broadcast_type(self._broadcast_url) async with listening_broadcast_channel: # Subscribe to our channel - async with listening_broadcast_channel.subscribe(channel=self._channel) as subscriber: + async with listening_broadcast_channel.subscribe( + channel=self._channel + ) as subscriber: async for event in subscriber: try: - notification = BroadcastNotification.parse_raw( - event.message) + notification = BroadcastNotification.parse_raw(event.message) # Avoid re-publishing our own broadcasts if notification.notifier_id != self._id: - logger.info("Handling incoming broadcast event: {}".format({'topics': notification.topics, 'src': notification.notifier_id})) + logger.debug( + "Handling incoming broadcast event: {}".format( + { + "topics": notification.topics, + "src": notification.notifier_id, + } + ) + ) # Notify subscribers of message received from broadcast - await self._notifier.notify(notification.topics, notification.data, notifier_id=self._id) + await self._notifier.notify( + notification.topics, + notification.data, + notifier_id=self._id, + ) except: logger.exception("Failed handling incoming broadcast") + logger.info( + "No more events to read from subscriber (underlying connection closed)" + ) diff --git a/fastapi_websocket_pubsub/event_notifier.py b/fastapi_websocket_pubsub/event_notifier.py index 35cad9a..ca0bd61 100644 --- a/fastapi_websocket_pubsub/event_notifier.py +++ b/fastapi_websocket_pubsub/event_notifier.py @@ -129,7 +129,7 @@ async def subscribe( ) subscriptions.append(new_subscription) new_subscriptions.append(new_subscription) - logger.info(f"New subscription {new_subscription.dict()}") + logger.debug(f"New subscription {new_subscription.dict()}") await EventNotifier.trigger_events( self._on_subscribe_events, subscriber_id, topics ) @@ -153,7 +153,7 @@ async def unsubscribe( for topic in topics: subscribers = self._topics[topic] if subscriber_id in subscribers: - logger.info( + logger.debug( f"Removing Subscription of topic='{topic}' for subscriber={subscriber_id}" ) del subscribers[subscriber_id] @@ -208,12 +208,12 @@ async def callback_subscribers( if (subscription.topic == ALL_TOPICS) else subscription.topic ) - logger.info( + logger.debug( f"calling subscription callbacks: topic={topic} ({original_topic}), subscription_id={subscription.id}, subscriber_id={subscriber_id}" ) else: event = subscription - logger.info( + logger.debug( f"calling subscription callbacks: topic={topic}, subscription_id={subscription.id}, subscriber_id={subscriber_id}" ) # call callback with subscription-info and provided data diff --git a/requirements.txt b/requirements.txt index ee1ef27..32d4b7e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ fastapi-websocket-rpc>=0.1.24,<1 -permit-broadcaster[redis,postgres,kafka]==0.2.3 +permit-broadcaster[redis,postgres,kafka]==0.2.4 pydantic>=1.9.1,<2 websockets>=10.3,<11 \ No newline at end of file diff --git a/scripts/publish.sh b/scripts/publish.sh index a2aa56b..ddde291 100755 --- a/scripts/publish.sh +++ b/scripts/publish.sh @@ -1,2 +1,3 @@ -python setup.py sdist bdist_wheel -python -m twine upload dist/* \ No newline at end of file +rm dist/* +python3 setup.py sdist bdist_wheel +python3 -m twine upload --repository testpypi dist/* \ No newline at end of file diff --git a/setup.py b/setup.py index e8d5649..e1358fb 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ def get_requirements(env=""): setup( name="fastapi_websocket_pubsub", - version="0.3.4", + version="0.3.5", author="Or Weis", author_email="or@permit.io", description="A fast and durable PubSub channel over Websockets (using fastapi-websockets-rpc).",