Skip to content

Commit

Permalink
Small improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
hbcarlos committed Oct 12, 2023
1 parent 2c07098 commit 8183da4
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 51 deletions.
1 change: 0 additions & 1 deletion jupyter_collaboration/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from .rooms import RoomManager
from .stores import BaseYStore, SQLiteYStore
from .utils import EVENTS_SCHEMA_PATH
from .websocketserver import JupyterWebsocketServer


class YDocExtension(ExtensionApp):
Expand Down
83 changes: 34 additions & 49 deletions jupyter_collaboration/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,6 @@ class YDocWebSocketHandler(WebSocketHandler, JupyterHandler):
_serve_task: asyncio.Task
_message_queue: asyncio.Queue[Any]

async def prepare(self):
# Get room
self._room_id: str = self.request.path.split("/")[-1]
self.room = await self._room_manager.get_room(self._room_id)
return await super().prepare()

def initialize(
self, room_manager: RoomManager, document_cleanup_delay: float | None = 60.0
) -> None:
Expand Down Expand Up @@ -112,47 +106,42 @@ async def open(self, room_id):
"""
On connection open.
"""
# Start processing messages in the room
self._serve_task = asyncio.create_task(self.room.serve(self))
self._room_id: str = self.request.path.split("/")[-1]

# Close the connection if the document session expired
session_id = self.get_query_argument("sessionId", None)
if session_id and SERVER_SESSION != session_id:
self.close(
1003,
f"Document session {session_id} expired. You need to reload this browser tab.",
)
try:
# Get room
self.room = await self._room_manager.get_room(self._room_id)

if isinstance(self.room, DocumentRoom):
# Close the connection if the document session expired
session_id = self.get_query_argument("sessionId", "")
if SERVER_SESSION != session_id:
self.close(
1003,
f"Document session {session_id} expired. You need to reload this browser tab.",
)

# TODO: Move initialization to RoomManager to make sure only one
# client calls initialize
try:
# Initialize the room
await self.room.initialize()
except Exception as e:
_, _, file_id = decode_file_path(self._room_id)
path = self._file_id_manager.get_path(file_id)

# Close websocket and propagate error.
if isinstance(e, web.HTTPError):
self.log.error(f"File {path} not found.\n{e!r}", exc_info=e)
self.close(1004, f"File {path} not found.")
else:
self.log.error(f"Error initializing: {path}\n{e!r}", exc_info=e)
self.close(1003, f"Error initializing: {path}. You need to close the document.")

# Clean up the room and delete the file loader
if (
self.room is not None
and len(self.room.clients) == 0
or self.room.clients == [self]
):
self._message_queue.put_nowait(b"")
self._serve_task.cancel()
await self._room_manager.remove(self._room_id)
except Exception as e:
_, _, file_id = decode_file_path(self._room_id)
path = self._file_id_manager.get_path(file_id)

# Close websocket and propagate error.
if isinstance(e, web.HTTPError):
self.log.error(f"File {path} not found.\n{e!r}", exc_info=e)
self.close(1004, f"File {path} not found.")
else:
self.log.error(f"Error initializing: {path}\n{e!r}", exc_info=e)
self.close(1003, f"Error initializing: {path}. You need to close the document.")

# Clean up the room and delete the file loader
if self.room is not None and len(self.room.clients) == 0 or self.room.clients == [self]:
self._message_queue.put_nowait(b"")
self._serve_task.cancel()
await self._room_manager.remove_room(self._room_id)

self._emit(LogLevel.INFO, "initialize", "New client connected.")

# Start processing messages in the room
self._serve_task = asyncio.create_task(self.room.serve(self))

async def send(self, message):
"""
Send a message to the client.
Expand Down Expand Up @@ -201,15 +190,11 @@ def on_close(self) -> None:
if self._serve_task is not None and not self._serve_task.cancelled():
self._serve_task.cancel()

if (
self.room is not None
and isinstance(self.room, DocumentRoom)
and self.room.clients == [self]
):
if self.room is not None and self.room.clients == [self]:
# no client in this room after we disconnect
# Remove the room with a delay in case someone reconnects
IOLoop.current().add_callback(
self._room_manager.remove, self._room_id, self._cleanup_delay
self._room_manager.remove_room, self._room_id, self._cleanup_delay
)

def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None:
Expand Down
6 changes: 6 additions & 0 deletions jupyter_collaboration/rooms/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ def room_id(self) -> str:
"""
return self._room_id

async def initialize(self) -> None:
return

async def handle_msg(self, data: bytes) -> None:
return

def broadcast_msg(self, msg: bytes) -> None:
for client in self.clients:
self._task_group.start_soon(client.send, msg)
Expand Down
7 changes: 6 additions & 1 deletion jupyter_collaboration/rooms/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,12 @@ async def get_room(self, room_id: str) -> BaseRoom | None:
if not room.started.is_set():
self._room_tasks[room_id] = asyncio.create_task(room.start())

if not room.ready:
await room.initialize()

return room

async def remove(self, room_id: str, delay: float = 0) -> None:
async def remove_room(self, room_id: str, delay: float = 0) -> None:
"""Remove the room for a given id."""
# Use lock to make sure while a client is creating the
# clean up task, no one else is accessing the room or trying to
Expand All @@ -86,6 +89,8 @@ async def remove(self, room_id: str, delay: float = 0) -> None:
if room_id in self._clean_up_tasks:
return

# NOTE: Should we check if there is only one client?
# if len(self._rooms[room_id].clients) <= 1:
self._clean_up_tasks[room_id] = asyncio.create_task(self._clean_up_room(room_id, delay))

async def clear(self) -> None:
Expand Down

0 comments on commit 8183da4

Please sign in to comment.