From 9227b3776d2c5d9a2d9c34cb42af25ccbf82d57e Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Tue, 10 Oct 2023 16:44:09 +0200 Subject: [PATCH] Move the stores temporarily --- jupyter_collaboration/app.py | 6 +- jupyter_collaboration/handlers.py | 5 +- jupyter_collaboration/stores/__init__.py | 3 + jupyter_collaboration/stores/base_store.py | 154 ++++++++++ jupyter_collaboration/stores/file_store.py | 305 +++++++++++++++++++ jupyter_collaboration/stores/sqlite_store.py | 288 +++++++++++++++++ jupyter_collaboration/{ => stores}/stores.py | 5 +- jupyter_collaboration/stores/utils.py | 6 + 8 files changed, 762 insertions(+), 10 deletions(-) create mode 100644 jupyter_collaboration/stores/__init__.py create mode 100644 jupyter_collaboration/stores/base_store.py create mode 100644 jupyter_collaboration/stores/file_store.py create mode 100644 jupyter_collaboration/stores/sqlite_store.py rename jupyter_collaboration/{ => stores}/stores.py (91%) create mode 100644 jupyter_collaboration/stores/utils.py diff --git a/jupyter_collaboration/app.py b/jupyter_collaboration/app.py index 62561201..37dd927e 100644 --- a/jupyter_collaboration/app.py +++ b/jupyter_collaboration/app.py @@ -6,11 +6,10 @@ from jupyter_server.extension.application import ExtensionApp from traitlets import Bool, Float, Type -from ypy_websocket.stores import BaseYStore from .handlers import DocSessionHandler, YDocWebSocketHandler from .loaders import FileLoaderMapping -from .stores import SQLiteYStore +from .stores import BaseYStore, SQLiteYStore from .utils import EVENTS_SCHEMA_PATH from .websocketserver import JupyterWebsocketServer @@ -124,6 +123,3 @@ async def stop_extension(self): ], timeout=3, ) - - if self._store is not None and self._store.started.is_set(): - self._store.stop() diff --git a/jupyter_collaboration/handlers.py b/jupyter_collaboration/handlers.py index afd5466c..0bbb985a 100644 --- a/jupyter_collaboration/handlers.py +++ b/jupyter_collaboration/handlers.py @@ -14,12 +14,12 @@ from jupyter_ydoc import ydocs as YDOCS from tornado import web from tornado.websocket import WebSocketHandler -from ypy_websocket.stores import BaseYStore from ypy_websocket.websocket_server import YRoom from ypy_websocket.yutils import YMessageType, write_var_uint from .loaders import FileLoaderMapping from .rooms import DocumentRoom, TransientRoom +from .stores import BaseYStore from .utils import ( JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, @@ -67,8 +67,7 @@ async def prepare(self): # is done. # We are temporarily initializing the store here because `start`` # is an async function - if self._store is not None and not self._store.started.is_set(): - await self._store.start() + if self._store is not None and not self._store.initialized: await self._store.initialize() if not self._websocket_server.started.is_set(): diff --git a/jupyter_collaboration/stores/__init__.py b/jupyter_collaboration/stores/__init__.py new file mode 100644 index 00000000..eac86205 --- /dev/null +++ b/jupyter_collaboration/stores/__init__.py @@ -0,0 +1,3 @@ +from .base_store import BaseYStore # noqa +from .stores import SQLiteYStore, TempFileYStore # noqa +from .utils import YDocExists, YDocNotFound # noqa diff --git a/jupyter_collaboration/stores/base_store.py b/jupyter_collaboration/stores/base_store.py new file mode 100644 index 00000000..79431565 --- /dev/null +++ b/jupyter_collaboration/stores/base_store.py @@ -0,0 +1,154 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from inspect import isawaitable +from typing import AsyncIterator, Awaitable, Callable, cast + +import y_py as Y +from anyio import Event + + +class BaseYStore(ABC): + """ + Base class for the stores. + """ + + version = 3 + metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None + + _store_path: str + _initialized: Event | None = None + + @abstractmethod + def __init__( + self, path: str, metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None + ): + """ + Initialize the object. + + Arguments: + path: The path where the store will be located. + metadata_callback: An optional callback to call to get the metadata. + log: An optional logger. + """ + ... + + @abstractmethod + async def initialize(self) -> None: + """ + Initializes the store. + """ + ... + + @abstractmethod + async def exists(self, path: str) -> bool: + """ + Returns True if the document exists, else returns False. + + Arguments: + path: The document name/path. + """ + ... + + @abstractmethod + async def list(self) -> AsyncIterator[str]: + """ + Returns a list with the name/path of the documents stored. + """ + ... + + @abstractmethod + async def get(self, path: str, updates: bool = False) -> dict | None: + """ + Returns the document's metadata or None if the document does't exist. + + Arguments: + path: The document name/path. + updates: Whether to return document's content or only the metadata. + """ + ... + + @abstractmethod + async def create(self, path: str, version: int) -> None: + """ + Creates a new document. + + Arguments: + path: The document name/path. + version: Document version. + """ + ... + + @abstractmethod + async def remove(self, path: str) -> dict | None: + """ + Removes a document. + + Arguments: + path: The document name/path. + """ + ... + + @abstractmethod + async def write(self, path: str, data: bytes) -> None: + """ + Store a document update. + + Arguments: + path: The document name/path. + data: The update to store. + """ + ... + + @abstractmethod + async def read(self, path: str) -> AsyncIterator[tuple[bytes, bytes]]: + """ + Async iterator for reading document's updates. + + Arguments: + path: The document name/path. + + Returns: + A tuple of (update, metadata, timestamp) for each update. + """ + ... + + @property + def initialized(self) -> bool: + if self._initialized is not None: + return self._initialized.is_set() + return False + + async def get_metadata(self) -> bytes: + """ + Returns: + The metadata. + """ + if self.metadata_callback is None: + return b"" + + metadata = self.metadata_callback() + if isawaitable(metadata): + metadata = await metadata + metadata = cast(bytes, metadata) + return metadata + + async def encode_state_as_update(self, path: str, ydoc: Y.YDoc) -> None: + """Store a YDoc state. + + Arguments: + path: The document name/path. + ydoc: The YDoc from which to store the state. + """ + update = Y.encode_state_as_update(ydoc) # type: ignore + await self.write(path, update) + + async def apply_updates(self, path: str, ydoc: Y.YDoc) -> None: + """Apply all stored updates to the YDoc. + + Arguments: + path: The document name/path. + ydoc: The YDoc on which to apply the updates. + """ + async for update, *rest in self.read(path): # type: ignore + Y.apply_update(ydoc, update) # type: ignore diff --git a/jupyter_collaboration/stores/file_store.py b/jupyter_collaboration/stores/file_store.py new file mode 100644 index 00000000..e7beee00 --- /dev/null +++ b/jupyter_collaboration/stores/file_store.py @@ -0,0 +1,305 @@ +from __future__ import annotations + +import struct +import tempfile +import time +from logging import Logger, getLogger +from pathlib import Path +from typing import AsyncIterator, Awaitable, Callable + +import anyio +from anyio import Event, Lock +from deprecated import deprecated +from ypy_websocket.yutils import Decoder, get_new_path, write_var_uint + +from .base_store import BaseYStore +from .utils import YDocExists, YDocNotFound + + +class FileYStore(BaseYStore): + """A YStore which uses one file per document.""" + + _lock: Lock + metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None + + def __init__( + self, + path: str = "./ystore", + metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None, + log: Logger | None = None, + ) -> None: + """Initialize the object. + + Arguments: + path: The file path used to store the updates. + metadata_callback: An optional callback to call to get the metadata. + log: An optional logger. + """ + self._lock = Lock() + self._store_path = path + self.metadata_callback = metadata_callback + self.log = log or getLogger(__name__) + + async def initialize(self) -> None: + """ + Initializes the store. + """ + if self.initialized or self._initialized is not None: + return + self._initialized = Event() + + version_path = Path(self._store_path, "__version__") + if not await anyio.Path(self._store_path).exists(): + await anyio.Path(self._store_path).mkdir(parents=True, exist_ok=True) + + version = -1 + create_version = False + if await anyio.Path(version_path).exists(): + async with await anyio.open_file(version_path, "rb") as f: + version = int(await f.readline()) + + # Store version mismatch. Move store and create a new one. + if self.version != version: + create_version = True + + if create_version: + new_path = await get_new_path(self._store_path) + self.log.warning( + f"YStore version mismatch, moving {self._store_path} to {new_path}" + ) + await anyio.Path(self._store_path).rename(new_path) + await anyio.Path(self._store_path).mkdir(parents=True, exist_ok=True) + + else: + create_version = True + + if create_version: + async with await anyio.open_file(version_path, "wb") as f: + version_bytes = str(self.version).encode() + await f.write(version_bytes) + + self._initialized.set() + + async def exists(self, path: str) -> bool: + """ + Returns True if the document exists, else returns False. + + Arguments: + path: The document name/path. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + return await anyio.Path(self._get_document_path(path)).exists() + + async def list(self) -> AsyncIterator[str]: # type: ignore[override] + """ + Returns a list with the name/path of the documents stored. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async for child in anyio.Path(self._store_path).glob("**/*.y"): + yield child.relative_to(self._store_path).with_suffix("").as_posix() + + async def get(self, path: str, updates: bool = False) -> dict | None: + """ + Returns the document's metadata and updates or None if the document does't exist. + + Arguments: + path: The document name/path. + updates: Whether to return document's content or only the metadata. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + file_path = self._get_document_path(path) + if not await anyio.Path(file_path).exists(): + return None + else: + version = None + async with await anyio.open_file(file_path, "rb") as f: + header = await f.read(8) + if header == b"VERSION:": + version = int(await f.readline()) + + list_updates: list[tuple[bytes, bytes, float]] = [] + if updates: + data = await f.read() + async for update, metadata, timestamp in self._decode_data(data): + list_updates.append((update, metadata, timestamp)) + + return dict(path=path, version=version, updates=list_updates) + + async def create(self, path: str, version: int) -> None: + """ + Creates a new document. + + Arguments: + path: The document name/path. + version: Document version. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + file_path = self._get_document_path(path) + if await anyio.Path(file_path).exists(): + raise YDocExists(f"The document {path} already exists.") + + else: + await anyio.Path(file_path.parent).mkdir(parents=True, exist_ok=True) + async with await anyio.open_file(file_path, "wb") as f: + version_bytes = f"VERSION:{version}\n".encode() + await f.write(version_bytes) + + async def remove(self, path: str) -> None: + """ + Removes a document. + + Arguments: + path: The document name/path. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + file_path = self._get_document_path(path) + if await anyio.Path(file_path).exists(): + await anyio.Path(file_path).unlink(missing_ok=False) + else: + raise YDocNotFound(f"The document {path} doesn't exists.") + + async def read(self, path: str) -> AsyncIterator[tuple[bytes, bytes, float]]: # type: ignore + """Async iterator for reading the store content. + + Returns: + A tuple of (update, metadata, timestamp) for each update. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + file_path = self._get_document_path(path) + if not await anyio.Path(file_path).exists(): + raise YDocNotFound + + offset = await self._get_data_offset(file_path) + async with await anyio.open_file(file_path, "rb") as f: + await f.seek(offset) + data = await f.read() + if not data: + raise YDocNotFound + + async for res in self._decode_data(data): + yield res + + async def write(self, path: str, data: bytes) -> None: + """Store an update. + + Arguments: + data: The update to store. + """ + async with self._lock: + file_path = self._get_document_path(path) + if not await anyio.Path(file_path).exists(): + raise YDocNotFound + + async with await anyio.open_file(file_path, "ab") as f: + data_len = write_var_uint(len(data)) + await f.write(data_len + data) + metadata = await self.get_metadata() + metadata_len = write_var_uint(len(metadata)) + await f.write(metadata_len + metadata) + timestamp = struct.pack(" int: + try: + async with await anyio.open_file(path, "rb") as f: + header = await f.read(8) + if header == b"VERSION:": + await f.readline() + return await f.tell() + else: + raise Exception + + except Exception: + raise YDocNotFound(f"File {str(path)} not found.") + + async def _decode_data(self, data) -> AsyncIterator[tuple[bytes, bytes, float]]: + i = 0 + for d in Decoder(data).read_messages(): + if i == 0: + update = d + elif i == 1: + metadata = d + else: + timestamp = struct.unpack(" Path: + return Path(self._store_path, path + ".y") + + +@deprecated(reason="Use FileYStore instead") +class TempFileYStore(FileYStore): + """ + A YStore which uses the system's temporary directory. + Files are writen under a common directory. + To prefix the directory name (e.g. /tmp/my_prefix_b4whmm7y/): + + ```py + class PrefixTempFileYStore(TempFileYStore): + prefix_dir = "my_prefix_" + ``` + + ## Note: + This class is deprecated. Use FileYStore and pass the tmp folder + as path argument. For example: + + ```py + tmp_dir = tempfile.mkdtemp(prefix="prefix/directory/") + store = FileYStore(tmp_dir) + ``` + """ + + prefix_dir: str | None = None + base_dir: str | None = None + + def __init__( + self, + path: str, + metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None, + log: Logger | None = None, + ): + """Initialize the object. + + Arguments: + path: The file path used to store the updates. + metadata_callback: An optional callback to call to get the metadata. + log: An optional logger. + """ + full_path = str(Path(self.get_base_dir()) / path) + super().__init__(full_path, metadata_callback=metadata_callback, log=log) + + def get_base_dir(self) -> str: + """Get the base directory where the update file is written. + + Returns: + The base directory path. + """ + if self.base_dir is None: + self.make_directory() + assert self.base_dir is not None + return self.base_dir + + def make_directory(self): + """Create the base directory where the update file is written.""" + type(self).base_dir = tempfile.mkdtemp(prefix=self.prefix_dir) diff --git a/jupyter_collaboration/stores/sqlite_store.py b/jupyter_collaboration/stores/sqlite_store.py new file mode 100644 index 00000000..196d8c88 --- /dev/null +++ b/jupyter_collaboration/stores/sqlite_store.py @@ -0,0 +1,288 @@ +from __future__ import annotations + +import time +from logging import Logger, getLogger +from typing import Any, AsyncIterator, Awaitable, Callable, Iterable + +import aiosqlite +import anyio +import y_py as Y +from anyio import Event, Lock +from ypy_websocket.yutils import get_new_path + +from .base_store import BaseYStore +from .utils import YDocExists, YDocNotFound + + +class SQLiteYStore(BaseYStore): + """A YStore which uses an SQLite database. + Unlike file-based YStores, the Y updates of all documents are stored in the same database. + + Subclass to point to your database file: + + ```py + class MySQLiteYStore(SQLiteYStore): + _store_path = "path/to/my_ystore.db" + ``` + """ + + _lock: Lock + # Determines the "time to live" for all documents, i.e. how recent the + # latest update of a document must be before purging document history. + # Defaults to never purging document history (None). + document_ttl: int | None = None + + def __init__( + self, + path: str = "./ystore.db", + metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None, + log: Logger | None = None, + ) -> None: + """Initialize the object. + + Arguments: + path: The database path used to store the updates. + metadata_callback: An optional callback to call to get the metadata. + log: An optional logger. + """ + self._lock = Lock() + self._store_path = path + self.metadata_callback = metadata_callback + self.log = log or getLogger(__name__) + + async def initialize(self) -> None: + """ + Initializes the store. + """ + if self.initialized or self._initialized is not None: + return + self._initialized = Event() + + async with self._lock: + if await anyio.Path(self._store_path).exists(): + version = -1 + async with aiosqlite.connect(self._store_path) as db: + cursor = await db.execute("pragma user_version") + row = await cursor.fetchone() + if row is not None: + version = row[0] + + # The DB has an old version. Move the database. + if self.version != version: + new_path = await get_new_path(self._store_path) + self.log.warning( + f"YStore version mismatch, moving {self._store_path} to {new_path}" + ) + await anyio.Path(self._store_path).rename(new_path) + + # Make sure every table exists. + async with aiosqlite.connect(self._store_path) as db: + await db.execute( + "CREATE TABLE IF NOT EXISTS documents (path TEXT PRIMARY KEY, version INTEGER NOT NULL)" + ) + await db.execute( + "CREATE TABLE IF NOT EXISTS yupdates (path TEXT NOT NULL, yupdate BLOB, metadata BLOB, timestamp REAL NOT NULL)" + ) + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_yupdates_path_timestamp ON yupdates (path, timestamp)" + ) + await db.execute(f"PRAGMA user_version = {self.version}") + await db.commit() + + self._initialized.set() + + async def exists(self, path: str) -> bool: + """ + Returns True if the document exists, else returns False. + + Arguments: + path: The document name/path. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + async with aiosqlite.connect(self._store_path) as db: + cursor = await db.execute( + "SELECT path, version FROM documents WHERE path = ?", + (path,), + ) + return (await cursor.fetchone()) is not None + + async def list(self) -> AsyncIterator[str]: # type: ignore[override] + """ + Returns a list with the name/path of the documents stored. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + async with aiosqlite.connect(self._store_path) as db: + async with db.execute("SELECT path FROM documents") as cursor: + async for path in cursor: + yield path[0] + + async def get(self, path: str, updates: bool = False) -> dict | None: + """ + Returns the document's metadata and updates or None if the document does't exist. + + Arguments: + path: The document name/path. + updates: Whether to return document's content or only the metadata. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + async with aiosqlite.connect(self._store_path) as db: + cursor = await db.execute( + "SELECT path, version FROM documents WHERE path = ?", + (path,), + ) + doc = await cursor.fetchone() + + if doc is None: + return None + + list_updates: Iterable[Any] = [] + if updates: + cursor = await db.execute( + "SELECT yupdate, metadata, timestamp FROM yupdates WHERE path = ?", + (path,), + ) + list_updates = await cursor.fetchall() + + return dict(path=doc[0], version=doc[1], updates=list_updates) + + async def create(self, path: str, version: int) -> None: + """ + Creates a new document. + + Arguments: + path: The document name/path. + version: Document version. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + try: + async with aiosqlite.connect(self._store_path) as db: + await db.execute( + "INSERT INTO documents VALUES (?, ?)", + (path, version), + ) + await db.commit() + except aiosqlite.IntegrityError: + raise YDocExists(f"The document {path} already exists.") + + async def remove(self, path: str) -> None: + """ + Removes a document. + + Arguments: + path: The document name/path. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + async with aiosqlite.connect(self._store_path) as db: + cursor = await db.execute( + "SELECT path, version FROM documents WHERE path = ?", + (path,), + ) + if (await cursor.fetchone()) is None: + raise YDocNotFound(f"The document {path} doesn't exists.") + + await db.execute( + "DELETE FROM documents WHERE path = ?", + (path,), + ) + await db.execute( + "DELETE FROM yupdates WHERE path = ?", + (path,), + ) + await db.commit() + + async def read(self, path: str) -> AsyncIterator[tuple[bytes, bytes, float]]: # type: ignore + """Async iterator for reading the store content. + + Arguments: + path: The document name/path. + + Returns: + A tuple of (update, metadata, timestamp) for each update. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + try: + async with self._lock: + async with aiosqlite.connect(self._store_path) as db: + async with db.execute( + "SELECT yupdate, metadata, timestamp FROM yupdates WHERE path = ?", + (path,), + ) as cursor: + found = False + async for update, metadata, timestamp in cursor: + found = True + yield update, metadata, timestamp + if not found: + raise YDocNotFound + except Exception: + raise YDocNotFound + + async def write(self, path: str, data: bytes) -> None: + """ + Store an update. + + Arguments: + path: The document name/path. + data: The update to store. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + async with aiosqlite.connect(self._store_path) as db: + # first, determine time elapsed since last update + cursor = await db.execute( + "SELECT timestamp FROM yupdates WHERE path = ? ORDER BY timestamp DESC LIMIT 1", + (path,), + ) + row = await cursor.fetchone() + diff = (time.time() - row[0]) if row else 0 + + if self.document_ttl is not None and diff > self.document_ttl: + # squash updates + ydoc = Y.YDoc() + async with db.execute( + "SELECT yupdate FROM yupdates WHERE path = ?", (path,) + ) as cursor: + async for update, in cursor: + Y.apply_update(ydoc, update) + # delete history + await db.execute("DELETE FROM yupdates WHERE path = ?", (path,)) + # insert squashed updates + squashed_update = Y.encode_state_as_update(ydoc) + metadata = await self.get_metadata() + await db.execute( + "INSERT INTO yupdates VALUES (?, ?, ?, ?)", + (path, squashed_update, metadata, time.time()), + ) + + # finally, write this update to the DB + metadata = await self.get_metadata() + await db.execute( + "INSERT INTO yupdates VALUES (?, ?, ?, ?)", + (path, data, metadata, time.time()), + ) + await db.commit() diff --git a/jupyter_collaboration/stores.py b/jupyter_collaboration/stores/stores.py similarity index 91% rename from jupyter_collaboration/stores.py rename to jupyter_collaboration/stores/stores.py index aa2de45f..66ad5927 100644 --- a/jupyter_collaboration/stores.py +++ b/jupyter_collaboration/stores/stores.py @@ -7,8 +7,9 @@ from traitlets import Int, Unicode from traitlets.config import LoggingConfigurable -from ypy_websocket.stores import FileYStore -from ypy_websocket.stores import SQLiteYStore as _SQLiteYStore + +from .file_store import FileYStore +from .sqlite_store import SQLiteYStore as _SQLiteYStore class TempFileYStore(FileYStore): diff --git a/jupyter_collaboration/stores/utils.py b/jupyter_collaboration/stores/utils.py new file mode 100644 index 00000000..6c5bf76b --- /dev/null +++ b/jupyter_collaboration/stores/utils.py @@ -0,0 +1,6 @@ +class YDocNotFound(Exception): + pass + + +class YDocExists(Exception): + pass