Skip to content

Commit

Permalink
Move the stores temporarily
Browse files Browse the repository at this point in the history
  • Loading branch information
hbcarlos committed Oct 10, 2023
1 parent 014fc40 commit 9227b37
Show file tree
Hide file tree
Showing 8 changed files with 762 additions and 10 deletions.
6 changes: 1 addition & 5 deletions jupyter_collaboration/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@

from jupyter_server.extension.application import ExtensionApp
from traitlets import Bool, Float, Type
from ypy_websocket.stores import BaseYStore

from .handlers import DocSessionHandler, YDocWebSocketHandler
from .loaders import FileLoaderMapping
from .stores import SQLiteYStore
from .stores import BaseYStore, SQLiteYStore
from .utils import EVENTS_SCHEMA_PATH
from .websocketserver import JupyterWebsocketServer

Expand Down Expand Up @@ -124,6 +123,3 @@ async def stop_extension(self):
],
timeout=3,
)

if self._store is not None and self._store.started.is_set():
self._store.stop()
5 changes: 2 additions & 3 deletions jupyter_collaboration/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
from jupyter_ydoc import ydocs as YDOCS
from tornado import web
from tornado.websocket import WebSocketHandler
from ypy_websocket.stores import BaseYStore
from ypy_websocket.websocket_server import YRoom
from ypy_websocket.yutils import YMessageType, write_var_uint

from .loaders import FileLoaderMapping
from .rooms import DocumentRoom, TransientRoom
from .stores import BaseYStore
from .utils import (
JUPYTER_COLLABORATION_EVENTS_URI,
LogLevel,
Expand Down Expand Up @@ -67,8 +67,7 @@ async def prepare(self):
# is done.
# We are temporarily initializing the store here because `start``
# is an async function
if self._store is not None and not self._store.started.is_set():
await self._store.start()
if self._store is not None and not self._store.initialized:
await self._store.initialize()

if not self._websocket_server.started.is_set():
Expand Down
3 changes: 3 additions & 0 deletions jupyter_collaboration/stores/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .base_store import BaseYStore # noqa
from .stores import SQLiteYStore, TempFileYStore # noqa
from .utils import YDocExists, YDocNotFound # noqa
154 changes: 154 additions & 0 deletions jupyter_collaboration/stores/base_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from inspect import isawaitable
from typing import AsyncIterator, Awaitable, Callable, cast

import y_py as Y
from anyio import Event


class BaseYStore(ABC):
"""
Base class for the stores.
"""

version = 3
metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None

_store_path: str
_initialized: Event | None = None

@abstractmethod
def __init__(
self, path: str, metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None
):
"""
Initialize the object.
Arguments:
path: The path where the store will be located.
metadata_callback: An optional callback to call to get the metadata.
log: An optional logger.
"""
...

@abstractmethod
async def initialize(self) -> None:
"""
Initializes the store.
"""
...

@abstractmethod
async def exists(self, path: str) -> bool:
"""
Returns True if the document exists, else returns False.
Arguments:
path: The document name/path.
"""
...

@abstractmethod
async def list(self) -> AsyncIterator[str]:
"""
Returns a list with the name/path of the documents stored.
"""
...

@abstractmethod
async def get(self, path: str, updates: bool = False) -> dict | None:
"""
Returns the document's metadata or None if the document does't exist.
Arguments:
path: The document name/path.
updates: Whether to return document's content or only the metadata.
"""
...

@abstractmethod
async def create(self, path: str, version: int) -> None:
"""
Creates a new document.
Arguments:
path: The document name/path.
version: Document version.
"""
...

@abstractmethod
async def remove(self, path: str) -> dict | None:
"""
Removes a document.
Arguments:
path: The document name/path.
"""
...

@abstractmethod
async def write(self, path: str, data: bytes) -> None:
"""
Store a document update.
Arguments:
path: The document name/path.
data: The update to store.
"""
...

@abstractmethod
async def read(self, path: str) -> AsyncIterator[tuple[bytes, bytes]]:
"""
Async iterator for reading document's updates.
Arguments:
path: The document name/path.
Returns:
A tuple of (update, metadata, timestamp) for each update.
"""
...

@property
def initialized(self) -> bool:
if self._initialized is not None:
return self._initialized.is_set()
return False

async def get_metadata(self) -> bytes:
"""
Returns:
The metadata.
"""
if self.metadata_callback is None:
return b""

metadata = self.metadata_callback()
if isawaitable(metadata):
metadata = await metadata
metadata = cast(bytes, metadata)
return metadata

async def encode_state_as_update(self, path: str, ydoc: Y.YDoc) -> None:
"""Store a YDoc state.
Arguments:
path: The document name/path.
ydoc: The YDoc from which to store the state.
"""
update = Y.encode_state_as_update(ydoc) # type: ignore
await self.write(path, update)

async def apply_updates(self, path: str, ydoc: Y.YDoc) -> None:
"""Apply all stored updates to the YDoc.
Arguments:
path: The document name/path.
ydoc: The YDoc on which to apply the updates.
"""
async for update, *rest in self.read(path): # type: ignore
Y.apply_update(ydoc, update) # type: ignore
Loading

0 comments on commit 9227b37

Please sign in to comment.