Skip to content

Commit

Permalink
PubSubEndpoint: Reuse EventBroadcaster's sharing context
Browse files Browse the repository at this point in the history
To avoid some of the reconnections
  • Loading branch information
roekatz committed Jun 20, 2023
1 parent 36b9dc1 commit ea16b34
Showing 1 changed file with 26 additions and 8 deletions.
34 changes: 26 additions & 8 deletions fastapi_websocket_pubsub/pub_sub_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(
on_connect: List[Coroutine] = None,
on_disconnect: List[Coroutine] = None,
rpc_channel_get_remote_id: bool = False,
ignore_broadcaster_disconnected = True,
ignore_broadcaster_disconnected=True,
):
"""
The PubSub endpoint recives subscriptions from clients and publishes data back to them upon receiving relevant publications.
Expand Down Expand Up @@ -85,6 +85,7 @@ def __init__(
# Separate if for the server to subscribe to its own events
self._subscriber_id: str = self.notifier.gen_subscriber_id()
self._ignore_broadcaster_disconnected = ignore_broadcaster_disconnected
self._broadcaster_sharing_context = None

async def subscribe(
self, topics: Union[TopicList, ALL_TOPICS], callback: EventCallback
Expand All @@ -103,9 +104,21 @@ async def publish(self, topics: Union[TopicList, Topic], data=None):
# sharing here means - the broadcaster listens in to the notifier as well
logger.debug(f"Publishing message to topics: {topics}")
if self.broadcaster is not None:
if self._broadcaster_sharing_context is None:
logger.debug(f"Getting new broadcaster sharing context")
self._broadcaster_sharing_context = self.broadcaster.get_context(
listen=True, share=True
)
logger.debug(f"Acquiring broadcaster sharing context")
async with self.broadcaster.get_context(listen=False, share=True):
await self.notifier.notify(topics, data, notifier_id=self._id)
try:
async with self._broadcaster_sharing_context:
await self.notifier.notify(topics, data, notifier_id=self._id)
except Exception:
# Could check if the exception has to do with disconnection, but just in case better to restart sharing context anyway
logger.warning(f"Exception in publish, resetting sharing context")
self._broadcaster_sharing_context = None
raise

# otherwise just notify
else:
await self.notifier.notify(topics, data, notifier_id=self._id)
Expand All @@ -132,14 +145,19 @@ async def main_loop(self, websocket: WebSocket, client_id: str = None, **kwargs)
async with self.broadcaster:
logger.debug("Entering endpoint's main loop with broadcaster")
if self._ignore_broadcaster_disconnected:
await self.endpoint.main_loop(websocket, client_id=client_id, **kwargs)
await self.endpoint.main_loop(
websocket, client_id=client_id, **kwargs
)
else:
main_loop_task = asyncio.create_task(
self.endpoint.main_loop(websocket, client_id=client_id, **kwargs)
self.endpoint.main_loop(
websocket, client_id=client_id, **kwargs
)
)
done, pending = await asyncio.wait(
[main_loop_task, self.broadcaster.get_reader_task()],
return_when=asyncio.FIRST_COMPLETED,
)
done, pending = await asyncio.wait([main_loop_task,
self.broadcaster.get_reader_task()],
return_when=asyncio.FIRST_COMPLETED)
logger.debug(f"task is done: {done}")
# broadcaster's reader task is used by other endpoints and shouldn't be cancelled
if main_loop_task in pending:
Expand Down

0 comments on commit ea16b34

Please sign in to comment.