-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize publishing #59
Closed
Closed
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
ea16b34
PubSubEndpoint: Reuse EventBroadcaster's sharing context
roekatz ccee665
PubSubEndpoint: Notify local subscribers through broadcaster
roekatz 4f34927
EventBroadcaster: Improve log messages
roekatz 17f8388
EventBroadcaster: Fix locking bug in context manager
roekatz File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,7 @@ | |
from fastapi_websocket_rpc.utils import gen_uid | ||
|
||
|
||
logger = get_logger('EventBroadcaster') | ||
logger = get_logger("EventBroadcaster") | ||
|
||
|
||
# Cross service broadcast consts | ||
|
@@ -35,9 +35,14 @@ class EventBroadcasterContextManager: | |
Friend-like class of EventBroadcaster (accessing "protected" members ) | ||
""" | ||
|
||
def __init__(self, event_broadcaster: "EventBroadcaster", listen: bool = True, share: bool = True) -> None: | ||
def __init__( | ||
self, | ||
event_broadcaster: "EventBroadcaster", | ||
listen: bool = True, | ||
share: bool = True, | ||
) -> None: | ||
""" | ||
Provide a context manager for an EventBroadcaster, managing if it listens to events coming from the broadcaster | ||
Provide a context manager for an EventBroadcaster, managing if it listens to events coming from the broadcaster | ||
and if it subscribes to the internal notifier to share its events with the broadcaster | ||
|
||
Args: | ||
|
@@ -48,15 +53,16 @@ def __init__(self, event_broadcaster: "EventBroadcaster", listen: bool = True, s | |
self._event_broadcaster = event_broadcaster | ||
self._share: bool = share | ||
self._listen: bool = listen | ||
self._lock = asyncio.Lock() | ||
|
||
async def __aenter__(self): | ||
async with self._lock: | ||
async with self._event_broadcaster._context_manager_lock: | ||
if self._listen: | ||
self._event_broadcaster._listen_count += 1 | ||
if self._event_broadcaster._listen_count == 1: | ||
# We have our first listener start the read-task for it (And all those who'd follow) | ||
logger.info("Listening for incoming events from broadcast channel (first listener started)") | ||
logger.info( | ||
"Listening for incoming events from broadcast channel (first listener started)" | ||
) | ||
# Start task listening on incoming broadcasts | ||
self._event_broadcaster.start_reader_task() | ||
|
||
|
@@ -66,15 +72,17 @@ async def __aenter__(self): | |
# We have our first publisher | ||
# Init the broadcast used for sharing (reading has its own) | ||
self._event_broadcaster._acquire_sharing_broadcast_channel() | ||
logger.debug("Subscribing to ALL_TOPICS, and sharing messages with broadcast channel") | ||
# Subscribe to internal events form our own event notifier and broadcast them | ||
await self._event_broadcaster._subscribe_to_all_topics() | ||
logger.debug( | ||
"Subscribing to ALL_TOPICS, and sharing messages with broadcast channel" | ||
) | ||
else: | ||
logger.debug(f"Did not subscribe to ALL_TOPICS: share count == {self._event_broadcaster._share_count}") | ||
logger.debug( | ||
f"Did not subscribe to ALL_TOPICS: share count == {self._event_broadcaster._share_count}" | ||
) | ||
return self | ||
|
||
async def __aexit__(self, exc_type, exc, tb): | ||
async with self._lock: | ||
async with self._event_broadcaster._context_manager_lock: | ||
try: | ||
if self._listen: | ||
self._event_broadcaster._listen_count -= 1 | ||
|
@@ -87,12 +95,10 @@ async def __aexit__(self, exc_type, exc, tb): | |
self._event_broadcaster._subscription_task = None | ||
|
||
if self._share: | ||
self._event_broadcaster._share_count -= 1 | ||
self._event_broadcaster._share_count -= 1 | ||
# if this was last sharer - we can stop subscribing to internal events - we aren't sharing anymore | ||
if self._event_broadcaster._share_count == 0: | ||
# Unsubscribe from internal events | ||
logger.debug("Unsubscribing from ALL TOPICS") | ||
await self._event_broadcaster._unsubscribe_from_topics() | ||
pass | ||
|
||
except: | ||
logger.exception("Failed to exit EventBroadcaster context") | ||
|
@@ -110,8 +116,14 @@ class EventBroadcaster: | |
<Your Code> | ||
""" | ||
|
||
def __init__(self, broadcast_url: str, notifier: EventNotifier, channel="EventNotifier", | ||
broadcast_type=None, is_publish_only=False) -> None: | ||
def __init__( | ||
self, | ||
broadcast_url: str, | ||
notifier: EventNotifier, | ||
channel="EventNotifier", | ||
broadcast_type=None, | ||
is_publish_only=False, | ||
) -> None: | ||
""" | ||
|
||
Args: | ||
|
@@ -138,26 +150,31 @@ def __init__(self, broadcast_url: str, notifier: EventNotifier, channel="EventNo | |
self._publish_lock = None | ||
# used to track creation / removal of resources needed per type (reader task->listen, and subscription to internal events->share) | ||
self._listen_count: int = 0 | ||
self._share_count: int = 0 | ||
# If we opt to manage the context directly (i.e. call async with on the event broadcaster itself) | ||
self._share_count: int = 0 | ||
# If we opt to manage the context directly (i.e. call async with on the event broadcaster itself) | ||
self._context_manager = None | ||
self._context_manager_lock = asyncio.Lock() | ||
|
||
|
||
async def __broadcast_notifications__(self, subscription: Subscription, data): | ||
async def __broadcast_notifications__(self, topics: TopicList, data): | ||
""" | ||
Share incoming internal notifications with the entire broadcast channel | ||
|
||
Args: | ||
subscription (Subscription): the subscription that got triggered | ||
data: the event data | ||
""" | ||
logger.info("Broadcasting incoming event: {}".format({'topic': subscription.topic, 'notifier_id': self._id})) | ||
note = BroadcastNotification(notifier_id=self._id, topics=[ | ||
subscription.topic], data=data) | ||
logger.info( | ||
"Broadcasting incoming event: {}".format( | ||
{"topics": topics, "notifier_id": self._id} | ||
) | ||
) | ||
note = BroadcastNotification(notifier_id=self._id, topics=topics, data=data) | ||
# Publish event to broadcast | ||
async with self._publish_lock: | ||
async with self._sharing_broadcast_channel: | ||
await self._sharing_broadcast_channel.publish(self._channel, note.json()) | ||
await self._sharing_broadcast_channel.publish( | ||
self._channel, note.json() | ||
) | ||
|
||
def _acquire_sharing_broadcast_channel(self): | ||
""" | ||
|
@@ -166,14 +183,6 @@ def _acquire_sharing_broadcast_channel(self): | |
self._publish_lock = asyncio.Lock() | ||
self._sharing_broadcast_channel = self._broadcast_type(self._broadcast_url) | ||
|
||
async def _subscribe_to_all_topics(self): | ||
return await self._notifier.subscribe(self._id, | ||
ALL_TOPICS, | ||
self.__broadcast_notifications__) | ||
|
||
async def _unsubscribe_from_topics(self): | ||
return await self._notifier.unsubscribe(self._id) | ||
|
||
def get_context(self, listen=True, share=True): | ||
""" | ||
Create a new context manager you can call 'async with' on, configuring the broadcaster for listening, sharing, or both. | ||
|
@@ -183,16 +192,16 @@ def get_context(self, listen=True, share=True): | |
share (bool, optional): Should we share events with the broadcast channel. Defaults to True. | ||
|
||
Returns: | ||
EventBroadcasterContextManager: the context | ||
EventBroadcasterContextManager: the context | ||
""" | ||
return EventBroadcasterContextManager(self, listen=listen, share=share) | ||
|
||
def get_listening_context(self): | ||
return EventBroadcasterContextManager(self, listen=True, share=False) | ||
|
||
def get_sharing_context(self): | ||
return EventBroadcasterContextManager(self, listen=False, share=True) | ||
|
||
async def __aenter__(self): | ||
""" | ||
Convince caller (also backward compaltability) | ||
|
@@ -201,7 +210,6 @@ async def __aenter__(self): | |
self._context_manager = self.get_context(listen=not self._is_publish_only) | ||
return await self._context_manager.__aenter__() | ||
|
||
|
||
async def __aexit__(self, exc_type, exc, tb): | ||
await self._context_manager.__aexit__(exc_type, exc, tb) | ||
|
||
|
@@ -215,14 +223,15 @@ def start_reader_task(self): | |
# Make sure a task wasn't started already | ||
if self._subscription_task is not None: | ||
# we already started a task for this worker process | ||
logger.debug("No need for listen task, already started broadcast listen task for this notifier") | ||
logger.debug( | ||
"No need for listen task, already started broadcast listen task for this notifier" | ||
) | ||
return | ||
# Trigger the task | ||
logger.debug("Spawning broadcast listen task") | ||
self._subscription_task = asyncio.create_task( | ||
self.__read_notifications__()) | ||
self._subscription_task = asyncio.create_task(self.__read_notifications__()) | ||
return self._subscription_task | ||
|
||
def get_reader_task(self): | ||
return self._subscription_task | ||
|
||
|
@@ -235,15 +244,26 @@ async def __read_notifications__(self): | |
listening_broadcast_channel = self._broadcast_type(self._broadcast_url) | ||
async with listening_broadcast_channel: | ||
# Subscribe to our channel | ||
async with listening_broadcast_channel.subscribe(channel=self._channel) as subscriber: | ||
async with listening_broadcast_channel.subscribe( | ||
channel=self._channel | ||
) as subscriber: | ||
async for event in subscriber: | ||
try: | ||
notification = BroadcastNotification.parse_raw( | ||
event.message) | ||
# Avoid re-publishing our own broadcasts | ||
if notification.notifier_id != self._id: | ||
logger.info("Handling incoming broadcast event: {}".format({'topics': notification.topics, 'src': notification.notifier_id})) | ||
# Notify subscribers of message received from broadcast | ||
await self._notifier.notify(notification.topics, notification.data, notifier_id=self._id) | ||
notification = BroadcastNotification.parse_raw(event.message) | ||
logger.debug( | ||
"Handling incoming broadcast event: {}".format( | ||
{ | ||
"topics": notification.topics, | ||
"src": notification.notifier_id, | ||
} | ||
) | ||
) | ||
# Notify subscribers of message received from broadcast | ||
await self._notifier.notify( | ||
notification.topics, notification.data, notifier_id=self._id | ||
) | ||
except: | ||
logger.exception("Failed handling incoming broadcast") | ||
logger.info( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems more like a debug message than info |
||
"No more events to read from subscriber (underlying connection closed)" | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand why you dropped the
_unsubscribe_from_topics
?