Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improves document sessions #204

Merged
merged 3 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion jupyter_collaboration/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,14 @@ def initialize_handlers(self):
"room_manager": self.room_manager,
},
),
(r"/api/collaboration/session/(.*)", DocSessionHandler),
(
r"/api/collaboration/session/(.*)",
DocSessionHandler,
{
"store": self.store,
"room_manager": self.room_manager,
},
),
]
)

Expand Down
100 changes: 67 additions & 33 deletions jupyter_collaboration/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import asyncio
import json
import time
import uuid
from typing import Any

from jupyter_server.auth import authorized
Expand All @@ -28,8 +27,6 @@

YFILE = YDOCS["file"]

SERVER_SESSION = str(uuid.uuid4())


class YDocWebSocketHandler(WebSocketHandler, JupyterHandler):
"""`YDocWebSocketHandler` uses the singleton pattern for ``WebsocketServer``,
Expand Down Expand Up @@ -62,6 +59,7 @@ def initialize(
room_manager: RoomManager,
document_cleanup_delay: float | None = 60.0,
) -> None:
super().initialize()
# File ID manager cannot be passed as argument as the extension may load after this one
self._file_id_manager = self.settings["file_id_manager"]

Expand Down Expand Up @@ -124,13 +122,6 @@ async def open(self, room_id):
"""
self._room_id = self.request.path.split("/")[-1]

# Close the connection if the document session expired
session_id = self.get_query_argument("sessionId", None)
if session_id and SERVER_SESSION != session_id:
self.close(
1003,
f"Document session {session_id} expired. You need to reload this browser tab.",
)
try:
# Get room
self.room = await self._room_manager.get_room(self._room_id)
Expand All @@ -154,10 +145,22 @@ async def open(self, room_id):
self._serve_task.cancel()
await self._room_manager.remove_room(self._room_id)

self._emit(LogLevel.INFO, "initialize", "New client connected.")
return

# Close the connection if the document session expired
session_id = self.get_query_argument("sessionId", None)
if session_id and session_id != self.room.session_id:
self.log.error(
f"Client tried to connect to {self._room_id} with an expired session ID {session_id}."
)
self.close(
1003,
f"Document session {session_id} expired. You need to reload this browser tab.",
)

# Start processing messages in the room
self._serve_task = asyncio.create_task(self.room.serve(self))
self._emit(LogLevel.INFO, "initialize", "New client connected.")

async def send(self, message):
"""
Expand Down Expand Up @@ -207,14 +210,20 @@ def on_close(self) -> None:
if self._serve_task is not None and not self._serve_task.cancelled():
self._serve_task.cancel()

if self.room is not None and self.room.clients == [self]:
# no client in this room after we disconnect
# Remove the room with a delay in case someone reconnects
IOLoop.current().add_callback(
self._room_manager.remove_room, self._room_id, self._cleanup_delay
)
if self.room is not None:
# Remove it self from the list of clients
self.room.clients.remove(self)
if len(self.room.clients) == 0:
# no client in this room after we disconnect
# Remove the room with a delay in case someone reconnects
IOLoop.current().add_callback(
self._room_manager.remove_room, self._room_id, self._cleanup_delay
)

def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None:
if self._room_id.count(":") < 2:
return

_, _, file_id = decode_file_path(self._room_id)
path = self._file_id_manager.get_path(file_id)

Expand All @@ -240,6 +249,22 @@ class DocSessionHandler(APIHandler):

auth_resource = "contents"

def initialize(self, store: BaseYStore, room_manager: RoomManager) -> None:
super().initialize()
self._store = store
self._room_manager = room_manager

async def prepare(self):
# NOTE: Initialize in the ExtensionApp.start_extension once
# https://github.com/jupyter-server/jupyter_server/issues/1329
# is done.
# We are temporarily initializing the store here because the
# initialization is async
if not self._store.initialized:
await self._store.initialize()

return await super().prepare()

@web.authenticated
@authorized
async def put(self, path):
Expand All @@ -251,26 +276,35 @@ async def put(self, path):
content_type = body["type"]
file_id_manager = self.settings["file_id_manager"]

status = 200
idx = file_id_manager.get_id(path)
if idx is not None:
# index already exists
self.log.info("Request for Y document '%s' with room ID: %s", path, idx)
data = json.dumps(
{"format": format, "type": content_type, "fileId": idx, "sessionId": SERVER_SESSION}
)
self.set_status(200)
return self.finish(data)

# try indexing
idx = file_id_manager.index(path)
if idx is None:
# file does not exists
raise web.HTTPError(404, f"File {path!r} does not exist")
# try indexing
status = 201
idx = file_id_manager.index(path)
if idx is None:
# file does not exists
raise web.HTTPError(404, f"File {path!r} does not exist")

session_id = await self._get_session_id(f"{format}:{content_type}:{idx}")

# index successfully created
self.log.info("Request for Y document '%s' with room ID: %s", path, idx)
data = json.dumps(
{"format": format, "type": content_type, "fileId": idx, "sessionId": SERVER_SESSION}
{"format": format, "type": content_type, "fileId": idx, "sessionId": session_id}
)
self.set_status(201)
self.set_status(status)
return self.finish(data)

async def _get_session_id(self, room_id: str) -> str | None:
# If the room exists and it is ready, return the session_id from the room.
if self._room_manager.has_room(room_id):
room = await self._room_manager.get_room(room_id)
if room.ready:
return room.session_id

if await self._store.exists(room_id):
doc = await self._store.get(room_id)
if doc is not None and "session_id" in doc:
return doc["session_id"]

return None
14 changes: 14 additions & 0 deletions jupyter_collaboration/rooms/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from __future__ import annotations

import asyncio
import uuid
from logging import Logger

from ypy_websocket.websocket_server import YRoom
Expand All @@ -15,6 +16,7 @@ class BaseRoom(YRoom):
def __init__(self, room_id: str, store: BaseYStore | None = None, log: Logger | None = None):
super().__init__(ready=False, ystore=store, log=log)
self._room_id = room_id
self._session_id: str = str(uuid.uuid4())

@property
def room_id(self) -> str:
Expand All @@ -23,6 +25,18 @@ def room_id(self) -> str:
"""
return self._room_id

@property
def session_id(self) -> str:
"""
A unique identifier for the updates.

NOTE: The session id, is a unique identifier for the updates
that compose the Y document. If this document is destroyed, every
client connected must replace its content with new updates otherwise
once we initialize a new Y document, the content will be duplicated.
"""
return self._session_id

async def initialize(self) -> None:
return

Expand Down
12 changes: 3 additions & 9 deletions jupyter_collaboration/rooms/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ async def initialize(self) -> None:
# try to apply Y updates from the YStore for this document
if self.ystore is not None and await self.ystore.exists(self._room_id):
# Load the content from the store
doc = await self.ystore.get(self._room_id)
self._session_id = doc["session_id"]
await self.ystore.apply_updates(self._room_id, self.ydoc)
self._emit(
LogLevel.INFO,
Expand Down Expand Up @@ -114,14 +116,6 @@ async def initialize(self) -> None:

# Update the content
self._document.source = model["content"]

doc = await self.ystore.get(self._room_id)
await self.ystore.remove(self._room_id)
version = 0
if "version" in doc:
version = doc["version"] + 1

await self.ystore.create(self._room_id, version)
await self.ystore.encode_state_as_update(self._room_id, self.ydoc)

else:
Expand All @@ -132,7 +126,7 @@ async def initialize(self) -> None:
self._document.source = model["content"]

if self.ystore is not None:
await self.ystore.create(self._room_id, 0)
await self.ystore.create(self._room_id, self.session_id)
await self.ystore.encode_state_as_update(self._room_id, self.ydoc)

self._last_modified = model["last_modified"]
Expand Down
19 changes: 12 additions & 7 deletions jupyter_collaboration/rooms/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,23 @@ async def _clean_up_room(self, room_id: str, delay: float) -> None:
self._emit(room_id, LogLevel.INFO, "clean", "Room deleted.")

# Clean the file loader if there are not rooms using it
_, _, file_id = decode_file_path(room_id)
file = self._file_loaders[file_id]
if file.number_of_subscriptions == 0:
await self._file_loaders.remove(file_id)
self.log.info("Loader %s deleted", file.path)
self._emit(room_id, LogLevel.INFO, "clean", "Loader deleted.")
if room_id.count(":") >= 2:
_, _, file_id = decode_file_path(room_id)
file = self._file_loaders[file_id]
if file.number_of_subscriptions == 0:
await self._file_loaders.remove(file_id)
self.log.info("Loader %s deleted", file.path)
self._emit(room_id, LogLevel.INFO, "clean", "Loader deleted.")

del self._clean_up_tasks[room_id]
if room_id in self._clean_up_tasks:
del self._clean_up_tasks[room_id]

def _emit(
self, room_id: str, level: LogLevel, action: str | None = None, msg: str | None = None
) -> None:
if room_id.count(":") < 2:
return

_, _, file_id = decode_file_path(room_id)
path = self._file_loaders.file_id_manager.get_path(file_id)

Expand Down
4 changes: 2 additions & 2 deletions jupyter_collaboration/stores/base_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ async def get(self, path: str, updates: bool = False) -> dict | None:
...

@abstractmethod
async def create(self, path: str, version: int) -> None:
async def create(self, path: str, session_id: str) -> None:
"""
Creates a new document.

Arguments:
path: The document name/path.
version: Document version.
session_id: A unique identifier for the updates.
"""
...

Expand Down
12 changes: 6 additions & 6 deletions jupyter_collaboration/stores/file_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,27 +121,27 @@ async def get(self, path: str, updates: bool = False) -> dict | None:
if not await anyio.Path(file_path).exists():
return None
else:
version = None
session_id = None
async with await anyio.open_file(file_path, "rb") as f:
header = await f.read(8)
if header == b"VERSION:":
version = int(await f.readline())
session_id = str(await f.readline())

list_updates: list[tuple[bytes, bytes, float]] = []
if updates:
data = await f.read()
async for update, metadata, timestamp in self._decode_data(data):
list_updates.append((update, metadata, timestamp))

return dict(path=path, version=version, updates=list_updates)
return dict(path=path, session_id=session_id, updates=list_updates)

async def create(self, path: str, version: int) -> None:
async def create(self, path: str, session_id: str) -> None:
"""
Creates a new document.

Arguments:
path: The document name/path.
version: Document version.
session_id: A unique identifier for the updates.
"""
if self._initialized is None:
raise Exception("The store was not initialized.")
Expand All @@ -154,7 +154,7 @@ async def create(self, path: str, version: int) -> None:
else:
await anyio.Path(file_path.parent).mkdir(parents=True, exist_ok=True)
async with await anyio.open_file(file_path, "wb") as f:
version_bytes = f"VERSION:{version}\n".encode()
version_bytes = f"VERSION:{session_id}\n".encode()
await f.write(version_bytes)

async def remove(self, path: str) -> None:
Expand Down
Loading
Loading