-
Notifications
You must be signed in to change notification settings - Fork 0
/
webSocketManger.py
70 lines (58 loc) · 2.54 KB
/
webSocketManger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class WebSocketManager:
def __init__(self):
"""
Initializes the WebSocketManager.
Attributes:
rooms (dict): A dictionary to store WebSocket connections in different rooms.
pubsub_client (RedisPubSubManager): An instance of the RedisPubSubManager class for pub-sub functionality.
"""
self.rooms: dict = {}
self.pubsub_client = RedisPubSubManager()
async def add_user_to_room(self, room_id: str, websocket: WebSocket) -> None:
"""
Adds a user's WebSocket connection to a room.
Args:
room_id (str): Room ID or channel name.
websocket (WebSocket): WebSocket connection object.
"""
await websocket.accept()
if room_id in self.rooms:
self.rooms[room_id].append(websocket)
else:
self.rooms[room_id] = [websocket]
await self.pubsub_client.connect()
pubsub_subscriber = await self.pubsub_client.subscribe(room_id)
asyncio.create_task(self._pubsub_data_reader(pubsub_subscriber))
async def broadcast_to_room(self, room_id: str, message: str) -> None:
"""
Broadcasts a message to all connected WebSockets in a room.
Args:
room_id (str): Room ID or channel name.
message (str): Message to be broadcasted.
"""
await self.pubsub_client._publish(room_id, message)
async def remove_user_from_room(self, room_id: str, websocket: WebSocket) -> None:
"""
Removes a user's WebSocket connection from a room.
Args:
room_id (str): Room ID or channel name.
websocket (WebSocket): WebSocket connection object.
"""
self.rooms[room_id].remove(websocket)
if len(self.rooms[room_id]) == 0:
del self.rooms[room_id]
await self.pubsub_client.unsubscribe(room_id)
async def _pubsub_data_reader(self, pubsub_subscriber):
"""
Reads and broadcasts messages received from Redis PubSub.
Args:
pubsub_subscriber (aioredis.ChannelSubscribe): PubSub object for the subscribed channel.
"""
while True:
message = await pubsub_subscriber.get_message(ignore_subscribe_messages=True)
if message is not None:
room_id = message['channel'].decode('utf-8')
all_sockets = self.rooms[room_id]
for socket in all_sockets:
data = message['data'].decode('utf-8')
await socket.send_text(data)