Skip to content

Commit

Permalink
Small improvements and documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
hbcarlos committed Oct 13, 2023
1 parent 2c07098 commit b98ac7c
Show file tree
Hide file tree
Showing 17 changed files with 119 additions and 152 deletions.
4 changes: 2 additions & 2 deletions docs/source/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ jupyter lab --YDocExtension.file_poll_interval=2
# If None, the document will be kept in memory forever.
jupyter lab --YDocExtension.document_cleanup_delay=100

# The YStore class to use for storing Y updates (default: JupyterSQLiteYStore).
jupyter lab --YDocExtension.ystore_class=ypy_websocket.ystore.TempFileYStore
# The Store class used for storing Y updates (default: SQLiteYStore).
jupyter lab --YDocExtension.ystore_class=jupyter_collaboration.stores.FileYStore
```
9 changes: 9 additions & 0 deletions docs/source/developer/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@

COMING...

### Opening a document
![initialization](../images/initialization_diagram.png)

### Autosave
![autosave](../images/autosave_diagram.png)

### Conflict
![autosave](../images/conflict_diagram.png)

## Early attempts

Prior to the current implementation based on [Yjs](https://docs.yjs.dev/), other attempts using
Expand Down
Binary file added docs/source/images/autosave_diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/source/images/conflict_diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/source/images/initialization_diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
9 changes: 1 addition & 8 deletions jupyter_collaboration/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from .rooms import RoomManager
from .stores import BaseYStore, SQLiteYStore
from .utils import EVENTS_SCHEMA_PATH
from .websocketserver import JupyterWebsocketServer


class YDocExtension(ExtensionApp):
Expand Down Expand Up @@ -80,14 +79,7 @@ def initialize_handlers(self):
for k, v in self.config.get(self.ystore_class.__name__, {}).items():
setattr(self.ystore_class, k, v)

# NOTE: Initialize in the ExtensionApp.start_extension once
# https://github.com/jupyter-server/jupyter_server/issues/1329
# is done.
# We are temporarily initializing the store here because the
# initialization is async
self.store = self.ystore_class(log=self.log)
loop = asyncio.get_event_loop()
loop.run_until_complete(self.store.initialize())

# self.settings is local to the ExtensionApp but here we need
# the global app settings in which the file id manager will register
Expand All @@ -111,6 +103,7 @@ def initialize_handlers(self):
YDocWebSocketHandler,
{
"document_cleanup_delay": self.document_cleanup_delay,
"store": self.store,
"room_manager": self.room_manager,
},
),
Expand Down
110 changes: 56 additions & 54 deletions jupyter_collaboration/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
from tornado.websocket import WebSocketHandler
from ypy_websocket.yutils import write_var_uint

from .rooms import DocumentRoom, RoomManager
from .rooms import BaseRoom, RoomManager
from .stores import BaseYStore
from .utils import (
JUPYTER_COLLABORATION_EVENTS_URI,
LogLevel,
Expand Down Expand Up @@ -50,29 +51,38 @@ class YDocWebSocketHandler(WebSocketHandler, JupyterHandler):
receiving a message.
"""

_serve_task: asyncio.Task
_room_id: str
room: BaseRoom
_serve_task: asyncio.Task | None
_message_queue: asyncio.Queue[Any]

async def prepare(self):
# Get room
self._room_id: str = self.request.path.split("/")[-1]
self.room = await self._room_manager.get_room(self._room_id)
return await super().prepare()

def initialize(
self, room_manager: RoomManager, document_cleanup_delay: float | None = 60.0
self,
store: BaseYStore,
room_manager: RoomManager,
document_cleanup_delay: float | None = 60.0,
) -> None:
# File ID manager cannot be passed as argument as the extension may load after this one
self._file_id_manager = self.settings["file_id_manager"]

self._store = store
self._room_manager = room_manager
self._cleanup_delay = document_cleanup_delay

self._room_id = None
self.room = None
self._serve_task = None
self._serve_task: asyncio.Task | None = None
self._message_queue = asyncio.Queue()

async def prepare(self):
# NOTE: Initialize in the ExtensionApp.start_extension once
# https://github.com/jupyter-server/jupyter_server/issues/1329
# is done.
# We are temporarily initializing the store here because the
# initialization is async
if not self._store.initialized:
await self._store.initialize()

return await super().prepare()

@property
def path(self):
"""
Expand Down Expand Up @@ -112,47 +122,43 @@ async def open(self, room_id):
"""
On connection open.
"""
# Start processing messages in the room
self._serve_task = asyncio.create_task(self.room.serve(self))
self._room_id = self.request.path.split("/")[-1]

# Close the connection if the document session expired
session_id = self.get_query_argument("sessionId", None)
if session_id and SERVER_SESSION != session_id:
self.close(
1003,
f"Document session {session_id} expired. You need to reload this browser tab.",
)
try:
# Get room
self.room = await self._room_manager.get_room(self._room_id)

if isinstance(self.room, DocumentRoom):
# Close the connection if the document session expired
session_id = self.get_query_argument("sessionId", "")
if SERVER_SESSION != session_id:
self.close(
1003,
f"Document session {session_id} expired. You need to reload this browser tab.",
)

# TODO: Move initialization to RoomManager to make sure only one
# client calls initialize
try:
# Initialize the room
await self.room.initialize()
except Exception as e:
_, _, file_id = decode_file_path(self._room_id)
path = self._file_id_manager.get_path(file_id)

# Close websocket and propagate error.
if isinstance(e, web.HTTPError):
self.log.error(f"File {path} not found.\n{e!r}", exc_info=e)
self.close(1004, f"File {path} not found.")
else:
self.log.error(f"Error initializing: {path}\n{e!r}", exc_info=e)
self.close(1003, f"Error initializing: {path}. You need to close the document.")

# Clean up the room and delete the file loader
if (
self.room is not None
and len(self.room.clients) == 0
or self.room.clients == [self]
):
self._message_queue.put_nowait(b"")
except Exception as e:
_, _, file_id = decode_file_path(self._room_id)
path = self._file_id_manager.get_path(file_id)

# Close websocket and propagate error.
if isinstance(e, web.HTTPError):
self.log.error(f"File {path} not found.\n{e!r}", exc_info=e)
self.close(1004, f"File {path} not found.")
else:
self.log.error(f"Error initializing: {path}\n{e!r}", exc_info=e)
self.close(1003, f"Error initializing: {path}. You need to close the document.")

# Clean up the room and delete the file loader
if self.room is not None and len(self.room.clients) == 0 or self.room.clients == [self]:
self._message_queue.put_nowait(b"")
if self._serve_task:
self._serve_task.cancel()
await self._room_manager.remove(self._room_id)
await self._room_manager.remove_room(self._room_id)

self._emit(LogLevel.INFO, "initialize", "New client connected.")

# Start processing messages in the room
self._serve_task = asyncio.create_task(self.room.serve(self))

async def send(self, message):
"""
Send a message to the client.
Expand Down Expand Up @@ -201,15 +207,11 @@ def on_close(self) -> None:
if self._serve_task is not None and not self._serve_task.cancelled():
self._serve_task.cancel()

if (
self.room is not None
and isinstance(self.room, DocumentRoom)
and self.room.clients == [self]
):
if self.room is not None and self.room.clients == [self]:
# no client in this room after we disconnect
# Remove the room with a delay in case someone reconnects
IOLoop.current().add_callback(
self._room_manager.remove, self._room_id, self._cleanup_delay
self._room_manager.remove_room, self._room_id, self._cleanup_delay
)

def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None:
Expand Down
6 changes: 6 additions & 0 deletions jupyter_collaboration/rooms/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ def room_id(self) -> str:
"""
return self._room_id

async def initialize(self) -> None:
return

async def handle_msg(self, data: bytes) -> None:
return

def broadcast_msg(self, msg: bytes) -> None:
for client in self.clients:
self._task_group.start_soon(client.send, msg)
Expand Down
2 changes: 1 addition & 1 deletion jupyter_collaboration/rooms/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@

from jupyter_events import EventLogger
from jupyter_ydoc import ydocs as YDOCS
from ypy_websocket.stores import BaseYStore
from ypy_websocket.yutils import write_var_uint

from ..loaders import FileLoader
from ..stores import BaseYStore
from ..utils import (
JUPYTER_COLLABORATION_EVENTS_URI,
LogLevel,
Expand Down
11 changes: 8 additions & 3 deletions jupyter_collaboration/rooms/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def has_room(self, room_id: str) -> bool:
"""Test if an id has a room."""
return room_id in self._rooms

async def get_room(self, room_id: str) -> BaseRoom | None:
async def get_room(self, room_id: str) -> BaseRoom:
"""
Get the room for a given id.
Expand Down Expand Up @@ -75,9 +75,12 @@ async def get_room(self, room_id: str) -> BaseRoom | None:
if not room.started.is_set():
self._room_tasks[room_id] = asyncio.create_task(room.start())

if not room.ready:
await room.initialize()

return room

async def remove(self, room_id: str, delay: float = 0) -> None:
async def remove_room(self, room_id: str, delay: float = 0) -> None:
"""Remove the room for a given id."""
# Use lock to make sure while a client is creating the
# clean up task, no one else is accessing the room or trying to
Expand All @@ -86,6 +89,8 @@ async def remove(self, room_id: str, delay: float = 0) -> None:
if room_id in self._clean_up_tasks:
return

# NOTE: Should we check if there is only one client?
# if len(self._rooms[room_id].clients) <= 1:
self._clean_up_tasks[room_id] = asyncio.create_task(self._clean_up_room(room_id, delay))

async def clear(self) -> None:
Expand Down Expand Up @@ -118,7 +123,7 @@ def _create_document_room(self, room_id: str) -> DocumentRoom:
self._document_save_delay,
)

async def _clean_up_room(self, room_id: str, delay: float):
async def _clean_up_room(self, room_id: str, delay: float) -> None:
"""
Async task for cleaning up the resources.
Expand Down
2 changes: 2 additions & 0 deletions jupyter_collaboration/stores/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@
# Distributed under the terms of the Modified BSD License.

from .base_store import BaseYStore # noqa
from .file_store import FileYStore # noqa
from .sqlite_store import SQLiteYStore as _SQLiteYStore # noqa
from .stores import SQLiteYStore, TempFileYStore # noqa
from .utils import YDocExists, YDocNotFound # noqa
4 changes: 2 additions & 2 deletions jupyter_collaboration/stores/base_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ async def encode_state_as_update(self, path: str, ydoc: Y.YDoc) -> None:
path: The document name/path.
ydoc: The YDoc from which to store the state.
"""
update = Y.encode_state_as_update(ydoc) # type: ignore
update = Y.encode_state_as_update(ydoc)
await self.write(path, update)

async def apply_updates(self, path: str, ydoc: Y.YDoc) -> None:
Expand All @@ -154,4 +154,4 @@ async def apply_updates(self, path: str, ydoc: Y.YDoc) -> None:
ydoc: The YDoc on which to apply the updates.
"""
async for update, *rest in self.read(path): # type: ignore
Y.apply_update(ydoc, update) # type: ignore
Y.apply_update(ydoc, update)
63 changes: 2 additions & 61 deletions jupyter_collaboration/stores/file_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@
from __future__ import annotations

import struct
import tempfile
import time
from logging import Logger, getLogger
from pathlib import Path
from typing import AsyncIterator, Awaitable, Callable
from typing import Any, AsyncIterator, Awaitable, Callable

import anyio
from anyio import Event, Lock
from deprecated import deprecated
from ypy_websocket.yutils import Decoder, get_new_path, write_var_uint

from .base_store import BaseYStore
Expand Down Expand Up @@ -235,7 +233,7 @@ async def _get_data_offset(self, path: Path) -> int:
except Exception:
raise YDocNotFound(f"File {str(path)} not found.")

async def _decode_data(self, data) -> AsyncIterator[tuple[bytes, bytes, float]]:
async def _decode_data(self, data: Any) -> AsyncIterator[tuple[bytes, bytes, float]]:
i = 0
for d in Decoder(data).read_messages():
if i == 0:
Expand All @@ -249,60 +247,3 @@ async def _decode_data(self, data) -> AsyncIterator[tuple[bytes, bytes, float]]:

def _get_document_path(self, path: str) -> Path:
return Path(self._store_path, path + ".y")


@deprecated(reason="Use FileYStore instead")
class TempFileYStore(FileYStore):
"""
A YStore which uses the system's temporary directory.
Files are writen under a common directory.
To prefix the directory name (e.g. /tmp/my_prefix_b4whmm7y/):
```py
class PrefixTempFileYStore(TempFileYStore):
prefix_dir = "my_prefix_"
```
## Note:
This class is deprecated. Use FileYStore and pass the tmp folder
as path argument. For example:
```py
tmp_dir = tempfile.mkdtemp(prefix="prefix/directory/")
store = FileYStore(tmp_dir)
```
"""

prefix_dir: str | None = None
base_dir: str | None = None

def __init__(
self,
path: str,
metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None,
log: Logger | None = None,
):
"""Initialize the object.
Arguments:
path: The file path used to store the updates.
metadata_callback: An optional callback to call to get the metadata.
log: An optional logger.
"""
full_path = str(Path(self.get_base_dir()) / path)
super().__init__(full_path, metadata_callback=metadata_callback, log=log)

def get_base_dir(self) -> str:
"""Get the base directory where the update file is written.
Returns:
The base directory path.
"""
if self.base_dir is None:
self.make_directory()
assert self.base_dir is not None
return self.base_dir

def make_directory(self):
"""Create the base directory where the update file is written."""
type(self).base_dir = tempfile.mkdtemp(prefix=self.prefix_dir)
1 change: 1 addition & 0 deletions jupyter_collaboration/stores/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.


class YDocNotFound(Exception):
pass

Expand Down
Loading

0 comments on commit b98ac7c

Please sign in to comment.