diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py index d74bd6df..ab100b74 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py @@ -11,6 +11,7 @@ from jupyter_server.auth import authorized from jupyter_server.base.handlers import APIHandler, JupyterHandler +from jupyter_server.utils import ensure_async from jupyter_ydoc import ydocs as YDOCS from pycrdt_websocket.websocket_server import YRoom from pycrdt_websocket.ystore import BaseYStore @@ -70,6 +71,7 @@ def create_task(self, aw): task.add_done_callback(self._background_tasks.discard) async def prepare(self): + await ensure_async(super().prepare()) if not self._websocket_server.started.is_set(): self.create_task(self._websocket_server.start()) await self._websocket_server.started.wait() @@ -110,12 +112,26 @@ async def prepare(self): # it is a transient document (e.g. awareness) self.room = TransientRoom(self._room_id, self.log) - await self._websocket_server.start_room(self.room) - self._websocket_server.add_room(self._room_id, self.room) - - res = super().prepare() - if res is not None: - return await res + try: + await self._websocket_server.start_room(self.room) + except Exception as e: + self.log.error("Room %s failed to start on websocket server", self._room_id) + # Clean room + await self.room.stop() + self.log.info("Room %s deleted", self._room_id) + self._emit(LogLevel.INFO, "clean", "Room deleted.") + + # Clean the file loader in file loader mapping if there are not any rooms using it + _, _, file_id = decode_file_path(self._room_id) + file = self._file_loaders[file_id] + if file.number_of_subscriptions == 0 or ( + file.number_of_subscriptions == 1 and self._room_id in file._subscriptions + ): + self.log.info("Deleting file %s", file.path) + await self._file_loaders.remove(file_id) + self._emit(LogLevel.INFO, "clean", "file loader removed.") + raise e + self._websocket_server.add_room(self._room_id, self.room) def initialize( self, @@ -134,6 +150,8 @@ def initialize( self._document_save_delay = document_save_delay self._websocket_server = ywebsocket_server self._message_queue = asyncio.Queue() + self._room_id = "" + self.room = None @property def path(self): @@ -227,7 +245,7 @@ async def send(self, message): try: self.write_message(message, binary=True) except Exception as e: - self.log.debug("Failed to write message", exc_info=e) + self.log.error("Failed to write message", exc_info=e) async def recv(self): """ diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py index 62cc0a0a..a2622cca 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py @@ -74,7 +74,10 @@ async def clean(self) -> None: if self._watcher is not None: if not self._watcher.cancelled(): self._watcher.cancel() - await self._watcher + try: + await self._watcher + except asyncio.CancelledError: + self._log.info(f"file watcher for '{self.file_id}' is cancelled now") def observe(self, id: str, callback: Callable[[], Coroutine[Any, Any, None]]) -> None: """ diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py index 691943c5..7559334c 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py @@ -161,7 +161,10 @@ async def stop(self) -> None: Cancels the save task and unsubscribes from the file. """ - await super().stop() + try: + await super().stop() + except RuntimeError: + pass # TODO: Should we cancel or wait ? if self._saving_document: self._saving_document.cancel() @@ -299,3 +302,12 @@ async def _broadcast_updates(self): await super()._broadcast_updates() except asyncio.CancelledError: pass + + async def stop(self) -> None: + """ + Stop the room. + """ + try: + await super().stop() + except RuntimeError: + pass diff --git a/tests/test_handlers.py b/tests/test_handlers.py index 0a8bad29..96aaded5 100644 --- a/tests/test_handlers.py +++ b/tests/test_handlers.py @@ -131,3 +131,87 @@ async def my_listener(logger: EventLogger, schema_id: str, data: dict) -> None: assert collected_data[1]["action"] == "leave" assert collected_data[1]["roomid"] == "text:file:" + fim.get_id("test.txt") assert collected_data[1]["username"] is not None + + +async def test_room_handler_doc_client_should_cleanup_room_file( + rtc_create_file, rtc_connect_doc_client, jp_serverapp +): + path, _ = await rtc_create_file("test.txt", "test") + + event = Event() + + def _on_document_change(target: str, e: Any) -> None: + if target == "source": + event.set() + + doc = YUnicode() + doc.observe(_on_document_change) + + async with await rtc_connect_doc_client("text", "file", path) as ws, WebsocketProvider( + doc.ydoc, ws + ): + await event.wait() + await sleep(0.1) + + # kill websocketserver to mimic task group inactive failure + await jp_serverapp.web_app.settings["jupyter_server_ydoc"].ywebsocket_server.stop() + + listener_was_called = False + collected_data = [] + + async def my_listener(logger: EventLogger, schema_id: str, data: dict) -> None: + nonlocal listener_was_called + collected_data.append(data) + listener_was_called = True + + event_logger = jp_serverapp.event_logger + event_logger.add_listener( + schema_id="https://schema.jupyter.org/jupyter_collaboration/session/v1", + listener=my_listener, + ) + + path2, _ = await rtc_create_file("test2.txt", "test2") + + try: + async with await rtc_connect_doc_client("text2", "file2", path2) as ws, WebsocketProvider( + doc.ydoc, ws + ): + await event.wait() + await sleep(0.1) + except Exception: + pass + + try: + async with await rtc_connect_doc_client("text2", "file2", path2) as ws, WebsocketProvider( + doc.ydoc, ws + ): + await event.wait() + await sleep(0.1) + except Exception: + pass + + fim = jp_serverapp.web_app.settings["file_id_manager"] + + assert listener_was_called is True + assert len(collected_data) == 4 + # no two collaboration events are emitted. + # [{'level': 'WARNING', 'msg': 'There is another collaborative session accessing the same file.\nThe synchronization bet...ou might lose some of your changes.', 'path': 'test2.txt', 'room': 'text2:file2:51b7e24f-f534-47fb-882f-5cc45ba867fe'}] + assert collected_data[0]["path"] == "test2.txt" + assert collected_data[0]["room"] == "text2:file2:" + fim.get_id("test2.txt") + assert collected_data[0]["action"] == "clean" + assert collected_data[0]["msg"] == "Room deleted." + assert collected_data[1]["path"] == "test2.txt" + assert collected_data[1]["room"] == "text2:file2:" + fim.get_id("test2.txt") + assert collected_data[1]["action"] == "clean" + assert collected_data[1]["msg"] == "file loader removed." + assert collected_data[2]["path"] == "test2.txt" + assert collected_data[2]["room"] == "text2:file2:" + fim.get_id("test2.txt") + assert collected_data[2]["action"] == "clean" + assert collected_data[2]["msg"] == "Room deleted." + assert collected_data[3]["path"] == "test2.txt" + assert collected_data[3]["room"] == "text2:file2:" + fim.get_id("test2.txt") + assert collected_data[3]["action"] == "clean" + assert collected_data[3]["msg"] == "file loader removed." + + await jp_serverapp.web_app.settings["jupyter_server_ydoc"].stop_extension() + del jp_serverapp.web_app.settings["file_id_manager"]