Skip to content

Commit

Permalink
Migrate to latest stores (#200)
Browse files Browse the repository at this point in the history
* Migrate to latest stores

* Move the stores temporarily

* Automatic application of license header

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
hbcarlos and github-actions[bot] authored Oct 10, 2023
1 parent 9365912 commit 8879800
Show file tree
Hide file tree
Showing 11 changed files with 839 additions and 48 deletions.
11 changes: 7 additions & 4 deletions jupyter_collaboration/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@

from jupyter_server.extension.application import ExtensionApp
from traitlets import Bool, Float, Type
from ypy_websocket.ystore import BaseYStore

from .handlers import DocSessionHandler, YDocWebSocketHandler
from .loaders import FileLoaderMapping
from .stores import SQLiteYStore
from .stores import BaseYStore, SQLiteYStore
from .utils import EVENTS_SCHEMA_PATH
from .websocketserver import JupyterWebsocketServer

Expand All @@ -22,6 +21,8 @@ class YDocExtension(ExtensionApp):
Enables Real Time Collaboration in JupyterLab
"""

_store: BaseYStore = None

disable_rtc = Bool(False, config=True, help="Whether to disable real time collaboration.")

file_poll_interval = Float(
Expand Down Expand Up @@ -80,10 +81,12 @@ def initialize_handlers(self):
for k, v in self.config.get(self.ystore_class.__name__, {}).items():
setattr(self.ystore_class, k, v)

# Instantiate the store
self._store = self.ystore_class(log=self.log)

self.ywebsocket_server = JupyterWebsocketServer(
rooms_ready=False,
auto_clean_rooms=False,
ystore_class=self.ystore_class,
log=self.log,
)

Expand All @@ -103,7 +106,7 @@ def initialize_handlers(self):
"document_cleanup_delay": self.document_cleanup_delay,
"document_save_delay": self.document_save_delay,
"file_loaders": self.file_loaders,
"ystore_class": self.ystore_class,
"store": self._store,
"ywebsocket_server": self.ywebsocket_server,
},
),
Expand Down
18 changes: 12 additions & 6 deletions jupyter_collaboration/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
from tornado import web
from tornado.websocket import WebSocketHandler
from ypy_websocket.websocket_server import YRoom
from ypy_websocket.ystore import BaseYStore
from ypy_websocket.yutils import YMessageType, write_var_uint

from .loaders import FileLoaderMapping
from .rooms import DocumentRoom, TransientRoom
from .stores import BaseYStore
from .utils import (
JUPYTER_COLLABORATION_EVENTS_URI,
LogLevel,
Expand Down Expand Up @@ -62,6 +62,14 @@ def create_task(self, aw):
task.add_done_callback(self._background_tasks.discard)

async def prepare(self):
# NOTE: Initialize in the ExtensionApp.start_extension once
# https://github.com/jupyter-server/jupyter_server/issues/1329
# is done.
# We are temporarily initializing the store here because `start``
# is an async function
if self._store is not None and not self._store.initialized:
await self._store.initialize()

if not self._websocket_server.started.is_set():
self.create_task(self._websocket_server.start())
await self._websocket_server.started.wait()
Expand All @@ -84,15 +92,13 @@ async def prepare(self):
)

file = self._file_loaders[file_id]
updates_file_path = f".{file_type}:{file_id}.y"
ystore = self._ystore_class(path=updates_file_path, log=self.log)
self.room = DocumentRoom(
self._room_id,
file_format,
file_type,
file,
self.event_logger,
ystore,
self._store,
self.log,
self._document_save_delay,
)
Expand All @@ -111,15 +117,15 @@ def initialize(
self,
ywebsocket_server: JupyterWebsocketServer,
file_loaders: FileLoaderMapping,
ystore_class: type[BaseYStore],
store: BaseYStore,
document_cleanup_delay: float | None = 60.0,
document_save_delay: float | None = 1.0,
) -> None:
self._background_tasks = set()
# File ID manager cannot be passed as argument as the extension may load after this one
self._file_id_manager = self.settings["file_id_manager"]
self._file_loaders = file_loaders
self._ystore_class = ystore_class
self._store = store
self._cleanup_delay = document_cleanup_delay
self._document_save_delay = document_save_delay
self._websocket_server = ywebsocket_server
Expand Down
57 changes: 29 additions & 28 deletions jupyter_collaboration/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

from jupyter_events import EventLogger
from jupyter_ydoc import ydocs as YDOCS
from ypy_websocket.stores import BaseYStore
from ypy_websocket.websocket_server import YRoom
from ypy_websocket.ystore import BaseYStore, YDocNotFound
from ypy_websocket.yutils import write_var_uint

from .loaders import FileLoader
Expand Down Expand Up @@ -104,36 +104,28 @@ async def initialize(self) -> None:
return

self.log.info("Initializing room %s", self._room_id)

model = await self._file.load_content(self._file_format, self._file_type, True)

async with self._update_lock:
# try to apply Y updates from the YStore for this document
read_from_source = True
if self.ystore is not None:
try:
await self.ystore.apply_updates(self.ydoc)
self._emit(
LogLevel.INFO,
"load",
"Content loaded from the store {}".format(
self.ystore.__class__.__qualname__
),
)
self.log.info(
"Content in room %s loaded from the ystore %s",
self._room_id,
self.ystore.__class__.__name__,
)
read_from_source = False
except YDocNotFound:
# YDoc not found in the YStore, create the document from the source file (no change history)
pass
if self.ystore is not None and await self.ystore.exists(self._room_id):
# Load the content from the store
await self.ystore.apply_updates(self._room_id, self.ydoc)
self._emit(
LogLevel.INFO,
"load",
"Content loaded from the store {}".format(
self.ystore.__class__.__qualname__
),
)
self.log.info(
"Content in room %s loaded from the ystore %s",
self._room_id,
self.ystore.__class__.__name__,
)

if not read_from_source:
# if YStore updates and source file are out-of-sync, resync updates with source
if self._document.source != model["content"]:
# TODO: Delete document from the store.
self._emit(
LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore."
)
Expand All @@ -142,17 +134,26 @@ async def initialize(self) -> None:
self._file.path,
self.ystore.__class__.__name__,
)
read_from_source = True

if read_from_source:
doc = await self.ystore.get(self._room_id)
await self.ystore.remove(self._room_id)
version = 0
if "version" in doc:
version = doc["version"] + 1

await self.ystore.create(self._room_id, version)
await self.ystore.encode_state_as_update(self._room_id, self.ydoc)

else:
self._emit(LogLevel.INFO, "load", "Content loaded from disk.")
self.log.info(
"Content in room %s loaded from file %s", self._room_id, self._file.path
)
self._document.source = model["content"]

if self.ystore:
await self.ystore.encode_state_as_update(self.ydoc)
if self.ystore is not None:
await self.ystore.create(self._room_id, 0)
await self.ystore.encode_state_as_update(self._room_id, self.ydoc)

self._last_modified = model["last_modified"]
self._document.dirty = False
Expand Down
6 changes: 6 additions & 0 deletions jupyter_collaboration/stores/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

from .base_store import BaseYStore # noqa
from .stores import SQLiteYStore, TempFileYStore # noqa
from .utils import YDocExists, YDocNotFound # noqa
157 changes: 157 additions & 0 deletions jupyter_collaboration/stores/base_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

from __future__ import annotations

from abc import ABC, abstractmethod
from inspect import isawaitable
from typing import AsyncIterator, Awaitable, Callable, cast

import y_py as Y
from anyio import Event


class BaseYStore(ABC):
"""
Base class for the stores.
"""

version = 3
metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None

_store_path: str
_initialized: Event | None = None

@abstractmethod
def __init__(
self, path: str, metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None
):
"""
Initialize the object.
Arguments:
path: The path where the store will be located.
metadata_callback: An optional callback to call to get the metadata.
log: An optional logger.
"""
...

@abstractmethod
async def initialize(self) -> None:
"""
Initializes the store.
"""
...

@abstractmethod
async def exists(self, path: str) -> bool:
"""
Returns True if the document exists, else returns False.
Arguments:
path: The document name/path.
"""
...

@abstractmethod
async def list(self) -> AsyncIterator[str]:
"""
Returns a list with the name/path of the documents stored.
"""
...

@abstractmethod
async def get(self, path: str, updates: bool = False) -> dict | None:
"""
Returns the document's metadata or None if the document does't exist.
Arguments:
path: The document name/path.
updates: Whether to return document's content or only the metadata.
"""
...

@abstractmethod
async def create(self, path: str, version: int) -> None:
"""
Creates a new document.
Arguments:
path: The document name/path.
version: Document version.
"""
...

@abstractmethod
async def remove(self, path: str) -> dict | None:
"""
Removes a document.
Arguments:
path: The document name/path.
"""
...

@abstractmethod
async def write(self, path: str, data: bytes) -> None:
"""
Store a document update.
Arguments:
path: The document name/path.
data: The update to store.
"""
...

@abstractmethod
async def read(self, path: str) -> AsyncIterator[tuple[bytes, bytes]]:
"""
Async iterator for reading document's updates.
Arguments:
path: The document name/path.
Returns:
A tuple of (update, metadata, timestamp) for each update.
"""
...

@property
def initialized(self) -> bool:
if self._initialized is not None:
return self._initialized.is_set()
return False

async def get_metadata(self) -> bytes:
"""
Returns:
The metadata.
"""
if self.metadata_callback is None:
return b""

metadata = self.metadata_callback()
if isawaitable(metadata):
metadata = await metadata
metadata = cast(bytes, metadata)
return metadata

async def encode_state_as_update(self, path: str, ydoc: Y.YDoc) -> None:
"""Store a YDoc state.
Arguments:
path: The document name/path.
ydoc: The YDoc from which to store the state.
"""
update = Y.encode_state_as_update(ydoc) # type: ignore
await self.write(path, update)

async def apply_updates(self, path: str, ydoc: Y.YDoc) -> None:
"""Apply all stored updates to the YDoc.
Arguments:
path: The document name/path.
ydoc: The YDoc on which to apply the updates.
"""
async for update, *rest in self.read(path): # type: ignore
Y.apply_update(ydoc, update) # type: ignore
Loading

0 comments on commit 8879800

Please sign in to comment.