From ea16b34990290c976278c534ccba462898235481 Mon Sep 17 00:00:00 2001 From: Ro'e Katz Date: Tue, 20 Jun 2023 12:44:43 +0300 Subject: [PATCH] PubSubEndpoint: Reuse EventBroadcaster's sharing context To avoid some of the reconnections --- fastapi_websocket_pubsub/pub_sub_server.py | 34 +++++++++++++++++----- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/fastapi_websocket_pubsub/pub_sub_server.py b/fastapi_websocket_pubsub/pub_sub_server.py index 83f3913..0f04aa1 100644 --- a/fastapi_websocket_pubsub/pub_sub_server.py +++ b/fastapi_websocket_pubsub/pub_sub_server.py @@ -34,7 +34,7 @@ def __init__( on_connect: List[Coroutine] = None, on_disconnect: List[Coroutine] = None, rpc_channel_get_remote_id: bool = False, - ignore_broadcaster_disconnected = True, + ignore_broadcaster_disconnected=True, ): """ The PubSub endpoint recives subscriptions from clients and publishes data back to them upon receiving relevant publications. @@ -85,6 +85,7 @@ def __init__( # Separate if for the server to subscribe to its own events self._subscriber_id: str = self.notifier.gen_subscriber_id() self._ignore_broadcaster_disconnected = ignore_broadcaster_disconnected + self._broadcaster_sharing_context = None async def subscribe( self, topics: Union[TopicList, ALL_TOPICS], callback: EventCallback @@ -103,9 +104,21 @@ async def publish(self, topics: Union[TopicList, Topic], data=None): # sharing here means - the broadcaster listens in to the notifier as well logger.debug(f"Publishing message to topics: {topics}") if self.broadcaster is not None: + if self._broadcaster_sharing_context is None: + logger.debug(f"Getting new broadcaster sharing context") + self._broadcaster_sharing_context = self.broadcaster.get_context( + listen=True, share=True + ) logger.debug(f"Acquiring broadcaster sharing context") - async with self.broadcaster.get_context(listen=False, share=True): - await self.notifier.notify(topics, data, notifier_id=self._id) + try: + async with self._broadcaster_sharing_context: + await self.notifier.notify(topics, data, notifier_id=self._id) + except Exception: + # Could check if the exception has to do with disconnection, but just in case better to restart sharing context anyway + logger.warning(f"Exception in publish, resetting sharing context") + self._broadcaster_sharing_context = None + raise + # otherwise just notify else: await self.notifier.notify(topics, data, notifier_id=self._id) @@ -132,14 +145,19 @@ async def main_loop(self, websocket: WebSocket, client_id: str = None, **kwargs) async with self.broadcaster: logger.debug("Entering endpoint's main loop with broadcaster") if self._ignore_broadcaster_disconnected: - await self.endpoint.main_loop(websocket, client_id=client_id, **kwargs) + await self.endpoint.main_loop( + websocket, client_id=client_id, **kwargs + ) else: main_loop_task = asyncio.create_task( - self.endpoint.main_loop(websocket, client_id=client_id, **kwargs) + self.endpoint.main_loop( + websocket, client_id=client_id, **kwargs + ) + ) + done, pending = await asyncio.wait( + [main_loop_task, self.broadcaster.get_reader_task()], + return_when=asyncio.FIRST_COMPLETED, ) - done, pending = await asyncio.wait([main_loop_task, - self.broadcaster.get_reader_task()], - return_when=asyncio.FIRST_COMPLETED) logger.debug(f"task is done: {done}") # broadcaster's reader task is used by other endpoints and shouldn't be cancelled if main_loop_task in pending: