From 28b4748b037704962abdb99fc200ccf13a8b75cd Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Mon, 16 Oct 2023 18:10:17 +0200 Subject: [PATCH 1/3] Improves document sessions --- jupyter_collaboration/app.py | 11 +- jupyter_collaboration/handlers.py | 100 +++++++++++++------ jupyter_collaboration/rooms/base.py | 14 +++ jupyter_collaboration/rooms/document.py | 12 +-- jupyter_collaboration/rooms/manager.py | 19 ++-- jupyter_collaboration/stores/base_store.py | 4 +- jupyter_collaboration/stores/file_store.py | 12 +-- jupyter_collaboration/stores/sqlite_store.py | 16 +-- packages/docprovider/src/yprovider.ts | 5 +- 9 files changed, 125 insertions(+), 68 deletions(-) diff --git a/jupyter_collaboration/app.py b/jupyter_collaboration/app.py index 7bce58fe..e7dc9bb7 100644 --- a/jupyter_collaboration/app.py +++ b/jupyter_collaboration/app.py @@ -32,7 +32,7 @@ class YDocExtension(ExtensionApp): ) document_cleanup_delay = Float( - 60, + 5, allow_none=True, config=True, help="""The delay in seconds to keep a document in memory in the back-end after all clients @@ -107,7 +107,14 @@ def initialize_handlers(self): "room_manager": self.room_manager, }, ), - (r"/api/collaboration/session/(.*)", DocSessionHandler), + ( + r"/api/collaboration/session/(.*)", + DocSessionHandler, + { + "store": self.store, + "room_manager": self.room_manager, + }, + ), ] ) diff --git a/jupyter_collaboration/handlers.py b/jupyter_collaboration/handlers.py index 76f14a42..cd5d3ee0 100644 --- a/jupyter_collaboration/handlers.py +++ b/jupyter_collaboration/handlers.py @@ -6,7 +6,6 @@ import asyncio import json import time -import uuid from typing import Any from jupyter_server.auth import authorized @@ -28,8 +27,6 @@ YFILE = YDOCS["file"] -SERVER_SESSION = str(uuid.uuid4()) - class YDocWebSocketHandler(WebSocketHandler, JupyterHandler): """`YDocWebSocketHandler` uses the singleton pattern for ``WebsocketServer``, @@ -62,6 +59,7 @@ def initialize( 
room_manager: RoomManager, document_cleanup_delay: float | None = 60.0, ) -> None: + super().initialize() # File ID manager cannot be passed as argument as the extension may load after this one self._file_id_manager = self.settings["file_id_manager"] @@ -124,13 +122,6 @@ async def open(self, room_id): """ self._room_id = self.request.path.split("/")[-1] - # Close the connection if the document session expired - session_id = self.get_query_argument("sessionId", None) - if session_id and SERVER_SESSION != session_id: - self.close( - 1003, - f"Document session {session_id} expired. You need to reload this browser tab.", - ) try: # Get room self.room = await self._room_manager.get_room(self._room_id) @@ -154,10 +145,22 @@ async def open(self, room_id): self._serve_task.cancel() await self._room_manager.remove_room(self._room_id) - self._emit(LogLevel.INFO, "initialize", "New client connected.") + return + + # Close the connection if the document session expired + session_id = self.get_query_argument("sessionId", None) + if session_id and session_id != self.room.session_id: + self.log.error( + f"Client tried to connect to {self._room_id} with an expired session ID {session_id}." + ) + self.close( + 1003, + f"Document session {session_id} expired. 
You need to reload this browser tab.", + ) # Start processing messages in the room self._serve_task = asyncio.create_task(self.room.serve(self)) + self._emit(LogLevel.INFO, "initialize", "New client connected.") async def send(self, message): """ @@ -207,14 +210,20 @@ def on_close(self) -> None: if self._serve_task is not None and not self._serve_task.cancelled(): self._serve_task.cancel() - if self.room is not None and self.room.clients == [self]: - # no client in this room after we disconnect - # Remove the room with a delay in case someone reconnects - IOLoop.current().add_callback( - self._room_manager.remove_room, self._room_id, self._cleanup_delay - ) + if self.room is not None: + # Remove itself from the list of clients + self.room.clients.remove(self) + if len(self.room.clients) == 0: + # no client in this room after we disconnect + # Remove the room with a delay in case someone reconnects + IOLoop.current().add_callback( + self._room_manager.remove_room, self._room_id, self._cleanup_delay + ) def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None: + if self._room_id.count(":") < 2: + return + _, _, file_id = decode_file_path(self._room_id) path = self._file_id_manager.get_path(file_id) @@ -240,6 +249,22 @@ class DocSessionHandler(APIHandler): auth_resource = "contents" + def initialize(self, store: BaseYStore, room_manager: RoomManager) -> None: + super().initialize() + self._store = store + self._room_manager = room_manager + + async def prepare(self): + # NOTE: Initialize in the ExtensionApp.start_extension once + # https://github.com/jupyter-server/jupyter_server/issues/1329 + # is done. 
+ # We are temporarily initializing the store here because the + # initialization is async + if not self._store.initialized: + await self._store.initialize() + + return await super().prepare() + @web.authenticated @authorized async def put(self, path): @@ -251,26 +276,35 @@ async def put(self, path): content_type = body["type"] file_id_manager = self.settings["file_id_manager"] + status = 200 idx = file_id_manager.get_id(path) - if idx is not None: - # index already exists - self.log.info("Request for Y document '%s' with room ID: %s", path, idx) - data = json.dumps( - {"format": format, "type": content_type, "fileId": idx, "sessionId": SERVER_SESSION} - ) - self.set_status(200) - return self.finish(data) - - # try indexing - idx = file_id_manager.index(path) if idx is None: - # file does not exists - raise web.HTTPError(404, f"File {path!r} does not exist") + # try indexing + status = 201 + idx = file_id_manager.index(path) + if idx is None: + # file does not exists + raise web.HTTPError(404, f"File {path!r} does not exist") + + session_id = await self._get_session_id(f"{format}:{content_type}:{idx}") - # index successfully created self.log.info("Request for Y document '%s' with room ID: %s", path, idx) data = json.dumps( - {"format": format, "type": content_type, "fileId": idx, "sessionId": SERVER_SESSION} + {"format": format, "type": content_type, "fileId": idx, "sessionId": session_id} ) - self.set_status(201) + self.set_status(status) return self.finish(data) + + async def _get_session_id(self, room_id: str) -> str | None: + # If the room exists and it is ready, return the session_id from the room. 
+ if self._room_manager.has_room(room_id): + room = await self._room_manager.get_room(room_id) + if room.ready: + return room.session_id + + if await self._store.exists(room_id): + doc = await self._store.get(room_id) + if "session_id" in doc: + return doc["session_id"] + + return None diff --git a/jupyter_collaboration/rooms/base.py b/jupyter_collaboration/rooms/base.py index 15a16d1f..9d26657a 100644 --- a/jupyter_collaboration/rooms/base.py +++ b/jupyter_collaboration/rooms/base.py @@ -4,6 +4,7 @@ from __future__ import annotations import asyncio +import uuid from logging import Logger from ypy_websocket.websocket_server import YRoom @@ -15,6 +16,7 @@ class BaseRoom(YRoom): def __init__(self, room_id: str, store: BaseYStore | None = None, log: Logger | None = None): super().__init__(ready=False, ystore=store, log=log) self._room_id = room_id + self._session_id: str = str(uuid.uuid4()) @property def room_id(self) -> str: @@ -23,6 +25,18 @@ def room_id(self) -> str: """ return self._room_id + @property + def session_id(self) -> str: + """ + A unique identifier for the updates. + + NOTE: The session id is a unique identifier for the updates + that compose the Y document. If this document is destroyed, every + client connected must replace its content with new updates; otherwise, + once we initialize a new Y document, the content will be duplicated. 
+ """ + return self._session_id + async def initialize(self) -> None: return diff --git a/jupyter_collaboration/rooms/document.py b/jupyter_collaboration/rooms/document.py index fd9340d9..3798d3e8 100644 --- a/jupyter_collaboration/rooms/document.py +++ b/jupyter_collaboration/rooms/document.py @@ -87,6 +87,8 @@ async def initialize(self) -> None: # try to apply Y updates from the YStore for this document if self.ystore is not None and await self.ystore.exists(self._room_id): # Load the content from the store + doc = await self.ystore.get(self._room_id) + self._session_id = doc["session_id"] await self.ystore.apply_updates(self._room_id, self.ydoc) self._emit( LogLevel.INFO, @@ -114,14 +116,6 @@ async def initialize(self) -> None: # Update the content self._document.source = model["content"] - - doc = await self.ystore.get(self._room_id) - await self.ystore.remove(self._room_id) - version = 0 - if "version" in doc: - version = doc["version"] + 1 - - await self.ystore.create(self._room_id, version) await self.ystore.encode_state_as_update(self._room_id, self.ydoc) else: @@ -132,7 +126,7 @@ async def initialize(self) -> None: self._document.source = model["content"] if self.ystore is not None: - await self.ystore.create(self._room_id, 0) + await self.ystore.create(self._room_id, self.session_id) await self.ystore.encode_state_as_update(self._room_id, self.ydoc) self._last_modified = model["last_modified"] diff --git a/jupyter_collaboration/rooms/manager.py b/jupyter_collaboration/rooms/manager.py index c38e1498..fcf3a669 100644 --- a/jupyter_collaboration/rooms/manager.py +++ b/jupyter_collaboration/rooms/manager.py @@ -150,18 +150,23 @@ async def _clean_up_room(self, room_id: str, delay: float) -> None: self._emit(room_id, LogLevel.INFO, "clean", "Room deleted.") # Clean the file loader if there are not rooms using it - _, _, file_id = decode_file_path(room_id) - file = self._file_loaders[file_id] - if file.number_of_subscriptions == 0: - await 
self._file_loaders.remove(file_id) - self.log.info("Loader %s deleted", file.path) - self._emit(room_id, LogLevel.INFO, "clean", "Loader deleted.") + if room_id.count(":") >= 2: + _, _, file_id = decode_file_path(room_id) + file = self._file_loaders[file_id] + if file.number_of_subscriptions == 0: + await self._file_loaders.remove(file_id) + self.log.info("Loader %s deleted", file.path) + self._emit(room_id, LogLevel.INFO, "clean", "Loader deleted.") - del self._clean_up_tasks[room_id] + if room_id in self._clean_up_tasks: + del self._clean_up_tasks[room_id] def _emit( self, room_id: str, level: LogLevel, action: str | None = None, msg: str | None = None ) -> None: + if room_id.count(":") < 2: + return + _, _, file_id = decode_file_path(room_id) path = self._file_loaders.file_id_manager.get_path(file_id) diff --git a/jupyter_collaboration/stores/base_store.py b/jupyter_collaboration/stores/base_store.py index 0785b41d..b2403c14 100644 --- a/jupyter_collaboration/stores/base_store.py +++ b/jupyter_collaboration/stores/base_store.py @@ -72,13 +72,13 @@ async def get(self, path: str, updates: bool = False) -> dict | None: ... @abstractmethod - async def create(self, path: str, version: int) -> None: + async def create(self, path: str, session_id: str) -> None: """ Creates a new document. Arguments: path: The document name/path. - version: Document version. + session_id: A unique identifier for the updates. """ ... 
diff --git a/jupyter_collaboration/stores/file_store.py b/jupyter_collaboration/stores/file_store.py index 2ea2167a..a082daad 100644 --- a/jupyter_collaboration/stores/file_store.py +++ b/jupyter_collaboration/stores/file_store.py @@ -121,11 +121,11 @@ async def get(self, path: str, updates: bool = False) -> dict | None: if not await anyio.Path(file_path).exists(): return None else: - version = None + session_id = None async with await anyio.open_file(file_path, "rb") as f: header = await f.read(8) if header == b"VERSION:": - version = int(await f.readline()) + session_id = str(await f.readline()) list_updates: list[tuple[bytes, bytes, float]] = [] if updates: @@ -133,15 +133,15 @@ async def get(self, path: str, updates: bool = False) -> dict | None: async for update, metadata, timestamp in self._decode_data(data): list_updates.append((update, metadata, timestamp)) - return dict(path=path, version=version, updates=list_updates) + return dict(path=path, session_id=session_id, updates=list_updates) - async def create(self, path: str, version: int) -> None: + async def create(self, path: str, session_id: str) -> None: """ Creates a new document. Arguments: path: The document name/path. - version: Document version. + session_id: A unique identifier for the updates. 
""" if self._initialized is None: raise Exception("The store was not initialized.") @@ -154,7 +154,7 @@ async def create(self, path: str, version: int) -> None: else: await anyio.Path(file_path.parent).mkdir(parents=True, exist_ok=True) async with await anyio.open_file(file_path, "wb") as f: - version_bytes = f"VERSION:{version}\n".encode() + version_bytes = f"VERSION:{session_id}\n".encode() await f.write(version_bytes) async def remove(self, path: str) -> None: diff --git a/jupyter_collaboration/stores/sqlite_store.py b/jupyter_collaboration/stores/sqlite_store.py index 793d31d8..2413db74 100644 --- a/jupyter_collaboration/stores/sqlite_store.py +++ b/jupyter_collaboration/stores/sqlite_store.py @@ -81,7 +81,7 @@ async def initialize(self) -> None: # Make sure every table exists. async with aiosqlite.connect(self._store_path) as db: await db.execute( - "CREATE TABLE IF NOT EXISTS documents (path TEXT PRIMARY KEY, version INTEGER NOT NULL)" + "CREATE TABLE IF NOT EXISTS documents (path TEXT PRIMARY KEY, session TEXT NOT NULL)" ) await db.execute( "CREATE TABLE IF NOT EXISTS yupdates (path TEXT NOT NULL, yupdate BLOB, metadata BLOB, timestamp REAL NOT NULL)" @@ -108,7 +108,7 @@ async def exists(self, path: str) -> bool: async with self._lock: async with aiosqlite.connect(self._store_path) as db: cursor = await db.execute( - "SELECT path, version FROM documents WHERE path = ?", + "SELECT path, session FROM documents WHERE path = ?", (path,), ) return (await cursor.fetchone()) is not None @@ -142,7 +142,7 @@ async def get(self, path: str, updates: bool = False) -> dict | None: async with self._lock: async with aiosqlite.connect(self._store_path) as db: cursor = await db.execute( - "SELECT path, version FROM documents WHERE path = ?", + "SELECT path, session FROM documents WHERE path = ?", (path,), ) doc = await cursor.fetchone() @@ -158,15 +158,15 @@ async def get(self, path: str, updates: bool = False) -> dict | None: ) list_updates = await cursor.fetchall() - 
return dict(path=doc[0], version=doc[1], updates=list_updates) + return dict(path=doc[0], session_id=doc[1], updates=list_updates) - async def create(self, path: str, version: int) -> None: + async def create(self, path: str, session_id: str) -> None: """ Creates a new document. Arguments: path: The document name/path. - version: Document version. + session_id: A unique identifier for the updates. """ if self._initialized is None: raise Exception("The store was not initialized.") @@ -177,7 +177,7 @@ async def create(self, path: str, version: int) -> None: async with aiosqlite.connect(self._store_path) as db: await db.execute( "INSERT INTO documents VALUES (?, ?)", - (path, version), + (path, session_id), ) await db.commit() except aiosqlite.IntegrityError: @@ -197,7 +197,7 @@ async def remove(self, path: str) -> None: async with self._lock: async with aiosqlite.connect(self._store_path) as db: cursor = await db.execute( - "SELECT path, version FROM documents WHERE path = ?", + "SELECT path, session FROM documents WHERE path = ?", (path,), ) if (await cursor.fetchone()) is None: diff --git a/packages/docprovider/src/yprovider.ts b/packages/docprovider/src/yprovider.ts index 566b3f0f..2035fc7f 100644 --- a/packages/docprovider/src/yprovider.ts +++ b/packages/docprovider/src/yprovider.ts @@ -101,13 +101,16 @@ export class WebSocketProvider implements IDocumentProvider { this._path ); + const params = + session.sessionId !== null ? 
{ sessionId: session.sessionId } : undefined; + this._yWebsocketProvider = new YWebsocketProvider( this._serverUrl, `${session.format}:${session.type}:${session.fileId}`, this._sharedModel.ydoc, { disableBc: true, - params: { sessionId: session.sessionId }, + params, awareness: this._awareness } ); From d1352244d9f808dbe535d8a25ee54f77801708d2 Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Tue, 17 Oct 2023 12:37:13 +0200 Subject: [PATCH 2/3] Fix CI --- tests/conftest.py | 10 ++++++++-- tests/test_handlers.py | 8 ++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 3e9bf5ca..67845d32 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -133,8 +133,14 @@ def rtc_connect_doc_client(jp_http_port, jp_base_url, rtc_fetch_session): async def _inner(format: str, type: str, path: str) -> Any: resp = await rtc_fetch_session(format, type, path) data = json.loads(resp.body.decode("utf-8")) + if "sessionId" in data and data["sessionId"] is not None: + params = f"?sessionId={data['sessionId']}" + else: + params = "" + return connect( - f"ws://127.0.0.1:{jp_http_port}{jp_base_url}api/collaboration/room/{data['format']}:{data['type']}:{data['fileId']}?sessionId={data['sessionId']}" + f"ws://127.0.0.1:{jp_http_port}{jp_base_url}api/collaboration/room/{data['format']}:{data['type']}:{data['fileId']}" + + params ) return _inner @@ -181,7 +187,7 @@ async def _inner(type: str, path: str, content: str) -> DocumentRoom: doc.source = content - await db.create(path, 0) + await db.create(path, "0") await db.encode_state_as_update(path, doc.ydoc) return db diff --git a/tests/test_handlers.py b/tests/test_handlers.py index 0a8f7d08..996fd3b5 100644 --- a/tests/test_handlers.py +++ b/tests/test_handlers.py @@ -11,7 +11,7 @@ from ypy_websocket import WebsocketProvider -async def test_session_handler_should_create_session_id( +async def 
test_session_handler_should_create_a_session_without_session_id( rtc_create_file, rtc_fetch_session, jp_serverapp ): file_format = "text" @@ -28,10 +28,10 @@ async def test_session_handler_should_create_session_id( assert data["format"] == file_format assert data["type"] == file_type assert data["fileId"] == fim.get_id(file_path) - assert data["sessionId"] + assert data["sessionId"] is None -async def test_session_handler_should_respond_with_session_id( +async def test_session_handler_should_respond_without_session_id( rtc_create_file, rtc_fetch_session, jp_serverapp ): file_format = "text" @@ -49,7 +49,7 @@ async def test_session_handler_should_respond_with_session_id( assert data["format"] == file_format assert data["type"] == file_type assert data["fileId"] == fim.get_id(file_path) - assert data["sessionId"] + assert data["sessionId"] is None async def test_session_handler_should_respond_with_not_found(rtc_fetch_session): From 0c908c117e95d84d95d58d4b95e7a6b5bc311054 Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Tue, 17 Oct 2023 12:57:07 +0200 Subject: [PATCH 3/3] Pre-commit --- jupyter_collaboration/app.py | 2 +- jupyter_collaboration/handlers.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/jupyter_collaboration/app.py b/jupyter_collaboration/app.py index e7dc9bb7..7461b98a 100644 --- a/jupyter_collaboration/app.py +++ b/jupyter_collaboration/app.py @@ -32,7 +32,7 @@ class YDocExtension(ExtensionApp): ) document_cleanup_delay = Float( - 5, + 60, allow_none=True, config=True, help="""The delay in seconds to keep a document in memory in the back-end after all clients diff --git a/jupyter_collaboration/handlers.py b/jupyter_collaboration/handlers.py index cd5d3ee0..8dcaf34c 100644 --- a/jupyter_collaboration/handlers.py +++ b/jupyter_collaboration/handlers.py @@ -304,7 +304,7 @@ async def _get_session_id(self, room_id: str) -> str | None: if await self._store.exists(room_id): doc = await 
self._store.get(room_id) - if "session_id" in doc: + if doc is not None and "session_id" in doc: return doc["session_id"] return None