Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle exception when websocket server start room failed #289

Merged
merged 1 commit into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from jupyter_server.auth import authorized
from jupyter_server.base.handlers import APIHandler, JupyterHandler
from jupyter_server.utils import ensure_async
from jupyter_ydoc import ydocs as YDOCS
from pycrdt_websocket.websocket_server import YRoom
from pycrdt_websocket.ystore import BaseYStore
Expand Down Expand Up @@ -70,6 +71,7 @@ def create_task(self, aw):
task.add_done_callback(self._background_tasks.discard)

async def prepare(self):
await ensure_async(super().prepare())
if not self._websocket_server.started.is_set():
self.create_task(self._websocket_server.start())
await self._websocket_server.started.wait()
Expand Down Expand Up @@ -110,12 +112,26 @@ async def prepare(self):
# it is a transient document (e.g. awareness)
self.room = TransientRoom(self._room_id, self.log)

await self._websocket_server.start_room(self.room)
self._websocket_server.add_room(self._room_id, self.room)
try:
await self._websocket_server.start_room(self.room)
except Exception as e:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm missing something, but didn't we introduce the YRoom exception handler just for that purpose? It seems we are bypassing all the work we did in pycrdt-websocket and instead handling the exception directly here.

self.log.error("Room %s failed to start on websocket server", self._room_id)
# Clean room
await self.room.stop()
self.log.info("Room %s deleted", self._room_id)
self._emit(LogLevel.INFO, "clean", "Room deleted.")

res = super().prepare()
if res is not None:
return await res
# Clean the file loader in file loader mapping if there are not any rooms using it
_, _, file_id = decode_file_path(self._room_id)
file = self._file_loaders[file_id]
if file.number_of_subscriptions == 0 or (
file.number_of_subscriptions == 1 and self._room_id in file._subscriptions
):
self.log.info("Deleting file %s", file.path)
await self._file_loaders.remove(file_id)
self._emit(LogLevel.INFO, "clean", "file loader removed.")
raise e
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the exception be raised, or just logged?

Copy link
Collaborator Author

@jzhang20133 jzhang20133 Apr 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be raised: when the task group is not active, the request to set up the websocket will then fail and the UI will retry. We should not fail silently here if the task group is not active.

self._websocket_server.add_room(self._room_id, self.room)

def initialize(
self,
Expand All @@ -134,6 +150,8 @@ def initialize(
self._document_save_delay = document_save_delay
self._websocket_server = ywebsocket_server
self._message_queue = asyncio.Queue()
self._room_id = ""
self.room = None

@property
def path(self):
Expand Down Expand Up @@ -227,7 +245,7 @@ async def send(self, message):
try:
self.write_message(message, binary=True)
except Exception as e:
self.log.debug("Failed to write message", exc_info=e)
self.log.error("Failed to write message", exc_info=e)

async def recv(self):
"""
Expand Down
5 changes: 4 additions & 1 deletion projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ async def clean(self) -> None:
if self._watcher is not None:
if not self._watcher.cancelled():
self._watcher.cancel()
await self._watcher
try:
await self._watcher
except asyncio.CancelledError:
self._log.info(f"file watcher for '{self.file_id}' is cancelled now")

def observe(self, id: str, callback: Callable[[], Coroutine[Any, Any, None]]) -> None:
"""
Expand Down
14 changes: 13 additions & 1 deletion projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,10 @@ async def stop(self) -> None:
Cancels the save task and unsubscribes from the file.
"""
await super().stop()
try:
await super().stop()
except RuntimeError:
pass
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be done in pycrdt-websocket using the exception handler.

# TODO: Should we cancel or wait ?
if self._saving_document:
self._saving_document.cancel()
Expand Down Expand Up @@ -299,3 +302,12 @@ async def _broadcast_updates(self):
await super()._broadcast_updates()
except asyncio.CancelledError:
pass

async def stop(self) -> None:
    """
    Stop the room.

    Delegates to the parent class's ``stop()`` and suppresses any
    ``RuntimeError`` it raises, so stopping this room is best-effort.
    """
    try:
        await super().stop()
    except RuntimeError:
        # NOTE(review): presumably raised when the room was never started
        # (e.g. its task group is not active) -- confirm against
        # pycrdt-websocket. Deliberately swallowed so callers can stop a
        # room that failed to start without a second exception.
        pass
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be done in pycrdt-websocket using the exception handler.

84 changes: 84 additions & 0 deletions tests/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,87 @@ async def my_listener(logger: EventLogger, schema_id: str, data: dict) -> None:
assert collected_data[1]["action"] == "leave"
assert collected_data[1]["roomid"] == "text:file:" + fim.get_id("test.txt")
assert collected_data[1]["username"] is not None


async def test_room_handler_doc_client_should_cleanup_room_file(
    rtc_create_file, rtc_connect_doc_client, jp_serverapp
):
    """Check that a room and its file loader are cleaned up when a room fails to start.

    Scenario:
      1. Open a normal collaborative session on ``test.txt`` and wait for the
         first "source" change, so the websocket server has been started.
      2. Stop the websocket server to mimic the "task group inactive" failure.
      3. Try twice to connect to a second document (``test2.txt``); each
         attempt is expected to emit a "Room deleted." event followed by a
         "file loader removed." event.
    """
    path, _ = await rtc_create_file("test.txt", "test")

    event = Event()

    def _on_document_change(target: str, e: Any) -> None:
        # Only the document's "source" changing tells us it has been synced.
        if target == "source":
            event.set()

    doc = YUnicode()
    doc.observe(_on_document_change)

    async with await rtc_connect_doc_client("text", "file", path) as ws, WebsocketProvider(
        doc.ydoc, ws
    ):
        await event.wait()
        await sleep(0.1)

    # kill websocketserver to mimic task group inactive failure
    await jp_serverapp.web_app.settings["jupyter_server_ydoc"].ywebsocket_server.stop()

    listener_was_called = False
    collected_data = []

    async def my_listener(logger: EventLogger, schema_id: str, data: dict) -> None:
        nonlocal listener_was_called
        collected_data.append(data)
        listener_was_called = True

    # Register the listener only now, so events from the first (successful)
    # session above are not collected.
    event_logger = jp_serverapp.event_logger
    event_logger.add_listener(
        schema_id="https://schema.jupyter.org/jupyter_collaboration/session/v1",
        listener=my_listener,
    )

    path2, _ = await rtc_create_file("test2.txt", "test2")

    # Two connection attempts against the stopped server. ``event`` was already
    # set by the first session, so ``event.wait()`` does not block here.
    for _ in range(2):
        try:
            async with await rtc_connect_doc_client(
                "text2", "file2", path2
            ) as ws, WebsocketProvider(doc.ydoc, ws):
                await event.wait()
                await sleep(0.1)
        except Exception:
            # The connection is expected to fail; what matters is the cleanup
            # events emitted by the server, which are asserted below.
            pass

    fim = jp_serverapp.web_app.settings["file_id_manager"]
    room_id = "text2:file2:" + fim.get_id("test2.txt")

    assert listener_was_called is True
    assert len(collected_data) == 4

    # Each failed attempt emits a room cleanup followed by a file loader cleanup.
    expected_messages = ["Room deleted.", "file loader removed."] * 2
    for data, message in zip(collected_data, expected_messages):
        assert data["path"] == "test2.txt"
        assert data["room"] == room_id
        assert data["action"] == "clean"
        assert data["msg"] == message

    await jp_serverapp.web_app.settings["jupyter_server_ydoc"].stop_extension()
    del jp_serverapp.web_app.settings["file_id_manager"]
Loading