From 8bc7d9cac07f2a710fa24d74f7cc68316231b5db Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Fri, 22 Sep 2023 11:31:13 +0200 Subject: [PATCH 01/17] Split stores into different files --- ypy_websocket/stores/__init__.py | 4 + ypy_websocket/stores/base_store.py | 118 ++++++++++++++++++ ypy_websocket/stores/file_store.py | 169 +++++++++++++++++++++++++ ypy_websocket/stores/sqlite_store.py | 180 +++++++++++++++++++++++++++ ypy_websocket/stores/utils.py | 3 + 5 files changed, 474 insertions(+) create mode 100644 ypy_websocket/stores/__init__.py create mode 100644 ypy_websocket/stores/base_store.py create mode 100644 ypy_websocket/stores/file_store.py create mode 100644 ypy_websocket/stores/sqlite_store.py create mode 100644 ypy_websocket/stores/utils.py diff --git a/ypy_websocket/stores/__init__.py b/ypy_websocket/stores/__init__.py new file mode 100644 index 0000000..e945cf9 --- /dev/null +++ b/ypy_websocket/stores/__init__.py @@ -0,0 +1,4 @@ +from .base_store import BaseYStore # noqa +from .file_store import FileYStore, TempFileYStore # noqa +from .sqlite_store import SQLiteYStore # noqa +from .utils import YDocNotFound # noqa diff --git a/ypy_websocket/stores/base_store.py b/ypy_websocket/stores/base_store.py new file mode 100644 index 0000000..a363903 --- /dev/null +++ b/ypy_websocket/stores/base_store.py @@ -0,0 +1,118 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from contextlib import AsyncExitStack +from inspect import isawaitable +from typing import AsyncIterator, Awaitable, Callable, cast + +from anyio import TASK_STATUS_IGNORED, Event, create_task_group +from anyio.abc import TaskGroup, TaskStatus + +import y_py as Y + + +class BaseYStore(ABC): + + metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None + version = 2 + _started: Event | None = None + _starting: bool = False + _task_group: TaskGroup | None = None + + @abstractmethod + def __init__( + self, path: str, metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None + ): + ... + + @abstractmethod + async def write(self, data: bytes) -> None: + ... + + @abstractmethod + async def read(self) -> AsyncIterator[tuple[bytes, bytes]]: + ... + + @property + def started(self) -> Event: + if self._started is None: + self._started = Event() + return self._started + + async def __aenter__(self) -> BaseYStore: + if self._task_group is not None: + raise RuntimeError("YStore already running") + + async with AsyncExitStack() as exit_stack: + tg = create_task_group() + self._task_group = await exit_stack.enter_async_context(tg) + self._exit_stack = exit_stack.pop_all() + tg.start_soon(self.start) + + return self + + async def __aexit__(self, exc_type, exc_value, exc_tb): + if self._task_group is None: + raise RuntimeError("YStore not running") + + self._task_group.cancel_scope.cancel() + self._task_group = None + return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb) + + async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED): + """Start the store. + + Arguments: + task_status: The status to set when the task has started. + """ + if self._starting: + return + else: + self._staring = True + + if self._task_group is not None: + raise RuntimeError("YStore already running") + + self.started.set() + self._starting = False + task_status.started() + + def stop(self) -> None: + """Stop the store.""" + if self._task_group is None: + raise RuntimeError("YStore not running") + + self._task_group.cancel_scope.cancel() + self._task_group = None + + async def get_metadata(self) -> bytes: + """ + Returns: + The metadata. + """ + if self.metadata_callback is None: + return b"" + + metadata = self.metadata_callback() + if isawaitable(metadata): + metadata = await metadata + metadata = cast(bytes, metadata) + return metadata + + async def encode_state_as_update(self, ydoc: Y.YDoc) -> None: + """Store a YDoc state. + + Arguments: + ydoc: The YDoc from which to store the state. + """ + update = Y.encode_state_as_update(ydoc) # type: ignore + await self.write(update) + + async def apply_updates(self, ydoc: Y.YDoc) -> None: + """Apply all stored updates to the YDoc. + + Arguments: + ydoc: The YDoc on which to apply the updates. + """ + async for update, *rest in self.read(): # type: ignore + Y.apply_update(ydoc, update) # type: ignore diff --git a/ypy_websocket/stores/file_store.py b/ypy_websocket/stores/file_store.py new file mode 100644 index 0000000..6572944 --- /dev/null +++ b/ypy_websocket/stores/file_store.py @@ -0,0 +1,169 @@ +from __future__ import annotations + +import time +import struct +import tempfile +from logging import Logger, getLogger +from pathlib import Path +from typing import AsyncIterator, Awaitable, Callable + +import anyio +from anyio import Lock + +import y_py as Y + +from .yutils import Decoder, get_new_path, write_var_uint +from .base_store import BaseYStore +from .utils import YDocNotFound + +class FileYStore(BaseYStore): + """A YStore which uses one file per document.""" + + path: str + metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None + lock: Lock + + def __init__( + self, + path: str, + metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None, + log: Logger | None = None, + ) -> None: + """Initialize the object. + + Arguments: + path: The file path used to store the updates. + metadata_callback: An optional callback to call to get the metadata. + log: An optional logger. + """ + self.path = path + self.metadata_callback = metadata_callback + self.log = log or getLogger(__name__) + self.lock = Lock() + + async def check_version(self) -> int: + """Check the version of the store format. + + Returns: + The offset where the data is located in the file. + """ + if not await anyio.Path(self.path).exists(): + version_mismatch = True + else: + version_mismatch = False + move_file = False + async with await anyio.open_file(self.path, "rb") as f: + header = await f.read(8) + if header == b"VERSION:": + version = int(await f.readline()) + if version == self.version: + offset = await f.tell() + else: + version_mismatch = True + else: + version_mismatch = True + if version_mismatch: + move_file = True + if move_file: + new_path = await get_new_path(self.path) + self.log.warning(f"YStore version mismatch, moving {self.path} to {new_path}") + await anyio.Path(self.path).rename(new_path) + if version_mismatch: + async with await anyio.open_file(self.path, "wb") as f: + version_bytes = f"VERSION:{self.version}\n".encode() + await f.write(version_bytes) + offset = len(version_bytes) + return offset + + async def read(self) -> AsyncIterator[tuple[bytes, bytes, float]]: # type: ignore + """Async iterator for reading the store content. + + Returns: + A tuple of (update, metadata, timestamp) for each update. + """ + async with self.lock: + if not await anyio.Path(self.path).exists(): + raise YDocNotFound + offset = await self.check_version() + async with await anyio.open_file(self.path, "rb") as f: + await f.seek(offset) + data = await f.read() + if not data: + raise YDocNotFound + i = 0 + for d in Decoder(data).read_messages(): + if i == 0: + update = d + elif i == 1: + metadata = d + else: + timestamp = struct.unpack(" None: + """Store an update. + + Arguments: + data: The update to store. + """ + parent = Path(self.path).parent + async with self.lock: + await anyio.Path(parent).mkdir(parents=True, exist_ok=True) + await self.check_version() + async with await anyio.open_file(self.path, "ab") as f: + data_len = write_var_uint(len(data)) + await f.write(data_len + data) + metadata = await self.get_metadata() + metadata_len = write_var_uint(len(metadata)) + await f.write(metadata_len + metadata) + timestamp = struct.pack(" str: + """Get the base directory where the update file is written. + + Returns: + The base directory path. + """ + if self.base_dir is None: + self.make_directory() + assert self.base_dir is not None + return self.base_dir + + def make_directory(self): + """Create the base directory where the update file is written.""" + type(self).base_dir = tempfile.mkdtemp(prefix=self.prefix_dir) diff --git a/ypy_websocket/stores/sqlite_store.py b/ypy_websocket/stores/sqlite_store.py new file mode 100644 index 0000000..3bc362a --- /dev/null +++ b/ypy_websocket/stores/sqlite_store.py @@ -0,0 +1,180 @@ +from __future__ import annotations + +import time +import aiosqlite +from logging import Logger, getLogger +from typing import AsyncIterator, Awaitable, Callable + + +import anyio +from anyio import TASK_STATUS_IGNORED, Event, Lock, create_task_group +from anyio.abc import TaskStatus + +import y_py as Y + +from .yutils import get_new_path +from .base_store import BaseYStore +from .utils import YDocNotFound + +class SQLiteYStore(BaseYStore): + """A YStore which uses an SQLite database. + Unlike file-based YStores, the Y updates of all documents are stored in the same database. + + Subclass to point to your database file: + + ```py + class MySQLiteYStore(SQLiteYStore): + db_path = "path/to/my_ystore.db" + ``` + """ + + db_path: str = "ystore.db" + # Determines the "time to live" for all documents, i.e. how recent the + # latest update of a document must be before purging document history. + # Defaults to never purging document history (None). + document_ttl: int | None = None + path: str + lock: Lock + db_initialized: Event + + def __init__( + self, + path: str, + metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None, + log: Logger | None = None, + ) -> None: + """Initialize the object. + + Arguments: + path: The file path used to store the updates. + metadata_callback: An optional callback to call to get the metadata. + log: An optional logger. + """ + self.path = path + self.metadata_callback = metadata_callback + self.log = log or getLogger(__name__) + self.lock = Lock() + self.db_initialized = Event() + + async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED): + """Start the SQLiteYStore. + + Arguments: + task_status: The status to set when the task has started. + """ + if self._starting: + return + else: + self._starting = True + + if self._task_group is not None: + raise RuntimeError("YStore already running") + + async with create_task_group() as self._task_group: + self._task_group.start_soon(self._init_db) + self.started.set() + self._starting = False + task_status.started() + + async def _init_db(self): + create_db = False + move_db = False + if not await anyio.Path(self.db_path).exists(): + create_db = True + else: + async with self.lock: + async with aiosqlite.connect(self.db_path) as db: + cursor = await db.execute( + "SELECT count(name) FROM sqlite_master WHERE type='table' and name='yupdates'" + ) + table_exists = (await cursor.fetchone())[0] + if table_exists: + cursor = await db.execute("pragma user_version") + version = (await cursor.fetchone())[0] + if version != self.version: + move_db = True + create_db = True + else: + create_db = True + if move_db: + new_path = await get_new_path(self.db_path) + self.log.warning(f"YStore version mismatch, moving {self.db_path} to {new_path}") + await anyio.Path(self.db_path).rename(new_path) + if create_db: + async with self.lock: + async with aiosqlite.connect(self.db_path) as db: + await db.execute( + "CREATE TABLE yupdates (path TEXT NOT NULL, yupdate BLOB, metadata BLOB, timestamp REAL NOT NULL)" + ) + await db.execute( + "CREATE INDEX idx_yupdates_path_timestamp ON yupdates (path, timestamp)" + ) + await db.execute(f"PRAGMA user_version = {self.version}") + await db.commit() + self.db_initialized.set() + + async def read(self) -> AsyncIterator[tuple[bytes, bytes, float]]: # type: ignore + """Async iterator for reading the store content. + + Returns: + A tuple of (update, metadata, timestamp) for each update. + """ + await self.db_initialized.wait() + try: + async with self.lock: + async with aiosqlite.connect(self.db_path) as db: + async with db.execute( + "SELECT yupdate, metadata, timestamp FROM yupdates WHERE path = ?", + (self.path,), + ) as cursor: + found = False + async for update, metadata, timestamp in cursor: + found = True + yield update, metadata, timestamp + if not found: + raise YDocNotFound + except Exception: + raise YDocNotFound + + async def write(self, data: bytes) -> None: + """Store an update. + + Arguments: + data: The update to store. + """ + await self.db_initialized.wait() + async with self.lock: + async with aiosqlite.connect(self.db_path) as db: + # first, determine time elapsed since last update + cursor = await db.execute( + "SELECT timestamp FROM yupdates WHERE path = ? ORDER BY timestamp DESC LIMIT 1", + (self.path,), + ) + row = await cursor.fetchone() + diff = (time.time() - row[0]) if row else 0 + + if self.document_ttl is not None and diff > self.document_ttl: + # squash updates + ydoc = Y.YDoc() + async with db.execute( + "SELECT yupdate FROM yupdates WHERE path = ?", (self.path,) + ) as cursor: + async for update, in cursor: + Y.apply_update(ydoc, update) + # delete history + await db.execute("DELETE FROM yupdates WHERE path = ?", (self.path,)) + # insert squashed updates + squashed_update = Y.encode_state_as_update(ydoc) + metadata = await self.get_metadata() + await db.execute( + "INSERT INTO yupdates VALUES (?, ?, ?, ?)", + (self.path, squashed_update, metadata, time.time()), + ) + + # finally, write this update to the DB + metadata = await self.get_metadata() + await db.execute( + "INSERT INTO yupdates VALUES (?, ?, ?, ?)", + (self.path, data, metadata, time.time()), + ) + await db.commit() diff --git a/ypy_websocket/stores/utils.py b/ypy_websocket/stores/utils.py new file mode 100644 index 0000000..58485f7 --- /dev/null +++ b/ypy_websocket/stores/utils.py @@ -0,0 +1,3 @@ + +class YDocNotFound(Exception): + pass From 64c4ad3f6862d4550f1f9f6cdf6088ed7b8ab361 Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Wed, 27 Sep 2023 11:12:07 +0200 Subject: [PATCH 02/17] Removes the old stores --- ypy_websocket/ystore.py | 446 ---------------------------------------- 1 file changed, 446 deletions(-) delete mode 100644 ypy_websocket/ystore.py diff --git a/ypy_websocket/ystore.py b/ypy_websocket/ystore.py deleted file mode 100644 index f4ad417..0000000 --- a/ypy_websocket/ystore.py +++ /dev/null @@ -1,446 +0,0 @@ -from __future__ import annotations - -import struct -import tempfile -import time -from abc import ABC, abstractmethod -from contextlib import AsyncExitStack -from inspect import isawaitable -from logging import Logger, getLogger -from pathlib import Path -from typing import AsyncIterator, Awaitable, Callable, cast - -import aiosqlite -import anyio -import y_py as Y -from anyio import TASK_STATUS_IGNORED, Event, Lock, create_task_group -from anyio.abc import TaskGroup, TaskStatus - -from .yutils import Decoder, get_new_path, write_var_uint - - -class YDocNotFound(Exception): - pass - - -class BaseYStore(ABC): - - metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None - version = 2 - _started: Event | None = None - _starting: bool = False - _task_group: TaskGroup | None = None - - @abstractmethod - def __init__( - self, path: str, metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None - ): - ... - - @abstractmethod - async def write(self, data: bytes) -> None: - ... - - @abstractmethod - async def read(self) -> AsyncIterator[tuple[bytes, bytes]]: - ... - - @property - def started(self) -> Event: - if self._started is None: - self._started = Event() - return self._started - - async def __aenter__(self) -> BaseYStore: - if self._task_group is not None: - raise RuntimeError("YStore already running") - - async with AsyncExitStack() as exit_stack: - tg = create_task_group() - self._task_group = await exit_stack.enter_async_context(tg) - self._exit_stack = exit_stack.pop_all() - tg.start_soon(self.start) - - return self - - async def __aexit__(self, exc_type, exc_value, exc_tb): - if self._task_group is None: - raise RuntimeError("YStore not running") - - self._task_group.cancel_scope.cancel() - self._task_group = None - return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb) - - async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED): - """Start the store. - - Arguments: - task_status: The status to set when the task has started. - """ - if self._starting: - return - else: - self._starting = True - - if self._task_group is not None: - raise RuntimeError("YStore already running") - - self.started.set() - self._starting = False - task_status.started() - - def stop(self) -> None: - """Stop the store.""" - if self._task_group is None: - raise RuntimeError("YStore not running") - - self._task_group.cancel_scope.cancel() - self._task_group = None - - async def get_metadata(self) -> bytes: - """ - Returns: - The metadata. - """ - if self.metadata_callback is None: - return b"" - - metadata = self.metadata_callback() - if isawaitable(metadata): - metadata = await metadata - metadata = cast(bytes, metadata) - return metadata - - async def encode_state_as_update(self, ydoc: Y.YDoc) -> None: - """Store a YDoc state. - - Arguments: - ydoc: The YDoc from which to store the state. - """ - update = Y.encode_state_as_update(ydoc) # type: ignore - await self.write(update) - - async def apply_updates(self, ydoc: Y.YDoc) -> None: - """Apply all stored updates to the YDoc. - - Arguments: - ydoc: The YDoc on which to apply the updates. - """ - async for update, *rest in self.read(): # type: ignore - Y.apply_update(ydoc, update) # type: ignore - - -class FileYStore(BaseYStore): - """A YStore which uses one file per document.""" - - path: str - metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None - lock: Lock - - def __init__( - self, - path: str, - metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None, - log: Logger | None = None, - ) -> None: - """Initialize the object. - - Arguments: - path: The file path used to store the updates. - metadata_callback: An optional callback to call to get the metadata. - log: An optional logger. - """ - self.path = path - self.metadata_callback = metadata_callback - self.log = log or getLogger(__name__) - self.lock = Lock() - - async def check_version(self) -> int: - """Check the version of the store format. - - Returns: - The offset where the data is located in the file. - """ - if not await anyio.Path(self.path).exists(): - version_mismatch = True - else: - version_mismatch = False - move_file = False - async with await anyio.open_file(self.path, "rb") as f: - header = await f.read(8) - if header == b"VERSION:": - version = int(await f.readline()) - if version == self.version: - offset = await f.tell() - else: - version_mismatch = True - else: - version_mismatch = True - if version_mismatch: - move_file = True - if move_file: - new_path = await get_new_path(self.path) - self.log.warning(f"YStore version mismatch, moving {self.path} to {new_path}") - await anyio.Path(self.path).rename(new_path) - if version_mismatch: - async with await anyio.open_file(self.path, "wb") as f: - version_bytes = f"VERSION:{self.version}\n".encode() - await f.write(version_bytes) - offset = len(version_bytes) - return offset - - async def read(self) -> AsyncIterator[tuple[bytes, bytes, float]]: # type: ignore - """Async iterator for reading the store content. - - Returns: - A tuple of (update, metadata, timestamp) for each update. - """ - async with self.lock: - if not await anyio.Path(self.path).exists(): - raise YDocNotFound - offset = await self.check_version() - async with await anyio.open_file(self.path, "rb") as f: - await f.seek(offset) - data = await f.read() - if not data: - raise YDocNotFound - i = 0 - for d in Decoder(data).read_messages(): - if i == 0: - update = d - elif i == 1: - metadata = d - else: - timestamp = struct.unpack(" None: - """Store an update. - - Arguments: - data: The update to store. - """ - parent = Path(self.path).parent - async with self.lock: - await anyio.Path(parent).mkdir(parents=True, exist_ok=True) - await self.check_version() - async with await anyio.open_file(self.path, "ab") as f: - data_len = write_var_uint(len(data)) - await f.write(data_len + data) - metadata = await self.get_metadata() - metadata_len = write_var_uint(len(metadata)) - await f.write(metadata_len + metadata) - timestamp = struct.pack(" str: - """Get the base directory where the update file is written. - - Returns: - The base directory path. - """ - if self.base_dir is None: - self.make_directory() - assert self.base_dir is not None - return self.base_dir - - def make_directory(self): - """Create the base directory where the update file is written.""" - type(self).base_dir = tempfile.mkdtemp(prefix=self.prefix_dir) - - -class SQLiteYStore(BaseYStore): - """A YStore which uses an SQLite database. - Unlike file-based YStores, the Y updates of all documents are stored in the same database. - - Subclass to point to your database file: - - ```py - class MySQLiteYStore(SQLiteYStore): - db_path = "path/to/my_ystore.db" - ``` - """ - - db_path: str = "ystore.db" - # Determines the "time to live" for all documents, i.e. how recent the - # latest update of a document must be before purging document history. - # Defaults to never purging document history (None). - document_ttl: int | None = None - path: str - lock: Lock - db_initialized: Event - - def __init__( - self, - path: str, - metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None, - log: Logger | None = None, - ) -> None: - """Initialize the object. - - Arguments: - path: The file path used to store the updates. - metadata_callback: An optional callback to call to get the metadata. - log: An optional logger. - """ - self.path = path - self.metadata_callback = metadata_callback - self.log = log or getLogger(__name__) - self.lock = Lock() - self.db_initialized = Event() - - async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED): - """Start the SQLiteYStore. - - Arguments: - task_status: The status to set when the task has started. - """ - if self._starting: - return - else: - self._starting = True - - if self._task_group is not None: - raise RuntimeError("YStore already running") - - async with create_task_group() as self._task_group: - self._task_group.start_soon(self._init_db) - self.started.set() - self._starting = False - task_status.started() - - async def _init_db(self): - create_db = False - move_db = False - if not await anyio.Path(self.db_path).exists(): - create_db = True - else: - async with self.lock: - async with aiosqlite.connect(self.db_path) as db: - cursor = await db.execute( - "SELECT count(name) FROM sqlite_master WHERE type='table' and name='yupdates'" - ) - table_exists = (await cursor.fetchone())[0] - if table_exists: - cursor = await db.execute("pragma user_version") - version = (await cursor.fetchone())[0] - if version != self.version: - move_db = True - create_db = True - else: - create_db = True - if move_db: - new_path = await get_new_path(self.db_path) - self.log.warning(f"YStore version mismatch, moving {self.db_path} to {new_path}") - await anyio.Path(self.db_path).rename(new_path) - if create_db: - async with self.lock: - async with aiosqlite.connect(self.db_path) as db: - await db.execute( - "CREATE TABLE yupdates (path TEXT NOT NULL, yupdate BLOB, metadata BLOB, timestamp REAL NOT NULL)" - ) - await db.execute( - "CREATE INDEX idx_yupdates_path_timestamp ON yupdates (path, timestamp)" - ) - await db.execute(f"PRAGMA user_version = {self.version}") - await db.commit() - self.db_initialized.set() - - async def read(self) -> AsyncIterator[tuple[bytes, bytes, float]]: # type: ignore - """Async iterator for reading the store content. - - Returns: - A tuple of (update, metadata, timestamp) for each update. - """ - await self.db_initialized.wait() - try: - async with self.lock: - async with aiosqlite.connect(self.db_path) as db: - async with db.execute( - "SELECT yupdate, metadata, timestamp FROM yupdates WHERE path = ?", - (self.path,), - ) as cursor: - found = False - async for update, metadata, timestamp in cursor: - found = True - yield update, metadata, timestamp - if not found: - raise YDocNotFound - except Exception: - raise YDocNotFound - - async def write(self, data: bytes) -> None: - """Store an update. - - Arguments: - data: The update to store. - """ - await self.db_initialized.wait() - async with self.lock: - async with aiosqlite.connect(self.db_path) as db: - # first, determine time elapsed since last update - cursor = await db.execute( - "SELECT timestamp FROM yupdates WHERE path = ? ORDER BY timestamp DESC LIMIT 1", - (self.path,), - ) - row = await cursor.fetchone() - diff = (time.time() - row[0]) if row else 0 - - if self.document_ttl is not None and diff > self.document_ttl: - # squash updates - ydoc = Y.YDoc() - async with db.execute( - "SELECT yupdate FROM yupdates WHERE path = ?", (self.path,) - ) as cursor: - async for update, in cursor: - Y.apply_update(ydoc, update) - # delete history - await db.execute("DELETE FROM yupdates WHERE path = ?", (self.path,)) - # insert squashed updates - squashed_update = Y.encode_state_as_update(ydoc) - metadata = await self.get_metadata() - await db.execute( - "INSERT INTO yupdates VALUES (?, ?, ?, ?)", - (self.path, squashed_update, metadata, time.time()), - ) - - # finally, write this update to the DB - metadata = await self.get_metadata() - await db.execute( - "INSERT INTO yupdates VALUES (?, ?, ?, ?)", - (self.path, data, metadata, time.time()), - ) - await db.commit() From a87c0bb9c5c58b3c247148eabeab2531f81af4e1 Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Thu, 28 Sep 2023 15:27:06 +0200 Subject: [PATCH 03/17] Creates a global store and improves the API --- ypy_websocket/stores/__init__.py | 2 +- ypy_websocket/stores/base_store.py | 108 +++++++++++++++++++++++++++-- ypy_websocket/stores/utils.py | 3 + ypy_websocket/yroom.py | 7 +- 4 files changed, 107 insertions(+), 13 deletions(-) diff --git a/ypy_websocket/stores/__init__.py b/ypy_websocket/stores/__init__.py index e945cf9..0541cf0 100644 --- a/ypy_websocket/stores/__init__.py +++ b/ypy_websocket/stores/__init__.py @@ -1,4 +1,4 @@ from .base_store import BaseYStore # noqa from .file_store import FileYStore, TempFileYStore # noqa from .sqlite_store import SQLiteYStore # noqa -from .utils import YDocNotFound # noqa +from .utils import YDocNotFound, DocExists # noqa diff --git a/ypy_websocket/stores/base_store.py b/ypy_websocket/stores/base_store.py index a363903..25a6c29 100644 --- a/ypy_websocket/stores/base_store.py +++ b/ypy_websocket/stores/base_store.py @@ -12,9 +12,15 @@ class BaseYStore(ABC): + """ + Base class for the stores. + """ - metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None version = 2 + metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None + + _store_path: str + _initialized: Event | None = None _started: Event | None = None _starting: bool = False _task_group: TaskGroup | None = None @@ -23,16 +29,102 @@ class BaseYStore(ABC): def __init__( self, path: str, metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None ): + """ + Initialize the object. + + Arguments: + path: The path where the store will be located or the prefix for file-based stores. + metadata_callback: An optional callback to call to get the metadata. + log: An optional logger. + """ + ... + + @abstractmethod + async def initialize(self) -> None: + """ + Initializes the store. + """ + ... + + @abstractmethod + async def exists(self, path: str) -> bool: + """ + Returns True if the document exists, else returns False. + + Arguments: + path: The document name/path. + """ + ... + + @abstractmethod + async def list(self) -> AsyncIterator[str]: + """ + Returns a list with the name/path of the documents stored. + """ + ... + + @abstractmethod + async def get(self, path: str) -> dict | None: + """ + Returns the document's metadata or None if the document does't exist. + + Arguments: + path: The document name/path. + """ ... + + @abstractmethod + async def create(self, path: str, version: int) -> None: + """ + Creates a new document. + Arguments: + path: The document name/path. + version: Document version. + """ + ... + @abstractmethod - async def write(self, data: bytes) -> None: + async def remove(self, path: str) -> dict | None: + """ + Removes a document. + + Arguments: + path: The document name/path. + """ + ... + + @abstractmethod + async def write(self, path: str, data: bytes) -> None: + """ + Store a document update. + + Arguments: + path: The document name/path. + data: The update to store. + """ ... @abstractmethod - async def read(self) -> AsyncIterator[tuple[bytes, bytes]]: + async def read(self, path: str) -> AsyncIterator[tuple[bytes, bytes]]: + """ + Async iterator for reading document's updates. + + Arguments: + path: The document name/path. + + Returns: + A tuple of (update, metadata, timestamp) for each update. + """ ... + @property + def initialized(self) -> bool: + if self._initialized is not None: + return self._initialized.is_set() + else : + return False + @property def started(self) -> Event: if self._started is None: @@ -99,20 +191,22 @@ async def get_metadata(self) -> bytes: metadata = cast(bytes, metadata) return metadata - async def encode_state_as_update(self, ydoc: Y.YDoc) -> None: + async def encode_state_as_update(self, path: str, ydoc: Y.YDoc) -> None: """Store a YDoc state. Arguments: + path: The document name/path. ydoc: The YDoc from which to store the state. """ update = Y.encode_state_as_update(ydoc) # type: ignore - await self.write(update) + await self.write(path, update) - async def apply_updates(self, ydoc: Y.YDoc) -> None: + async def apply_updates(self, path: str, ydoc: Y.YDoc) -> None: """Apply all stored updates to the YDoc. Arguments: + path: The document name/path. ydoc: The YDoc on which to apply the updates. """ - async for update, *rest in self.read(): # type: ignore + async for update, *rest in self.read(path): # type: ignore Y.apply_update(ydoc, update) # type: ignore diff --git a/ypy_websocket/stores/utils.py b/ypy_websocket/stores/utils.py index 58485f7..3415e55 100644 --- a/ypy_websocket/stores/utils.py +++ b/ypy_websocket/stores/utils.py @@ -1,3 +1,6 @@ class YDocNotFound(Exception): pass + +class DocExists(Exception): + pass diff --git a/ypy_websocket/yroom.py b/ypy_websocket/yroom.py index d602574..ad8deb7 100644 --- a/ypy_websocket/yroom.py +++ b/ypy_websocket/yroom.py @@ -18,7 +18,7 @@ from .awareness import Awareness from .websocket import Websocket -from .ystore import BaseYStore +from .stores import BaseYStore from .yutils import ( YMessageType, create_update_message, @@ -120,9 +120,6 @@ def on_message(self, value: Callable[[bytes], Awaitable[bool] | bool] | None): self._on_message = value async def _broadcast_updates(self): - if self.ystore is not None and not self.ystore.started.is_set(): - self._task_group.start_soon(self.ystore.start) - async with self._update_receive_stream: async for update in self._update_receive_stream: if self._task_group.cancel_scope.cancel_called: @@ -135,7 +132,7 @@ async def _broadcast_updates(self): self._task_group.start_soon(client.send, message) if self.ystore: self.log.debug("Writing Y update to YStore") - self._task_group.start_soon(self.ystore.write, update) + self._task_group.start_soon(self.ystore.write, client.path, update) async def __aenter__(self) -> YRoom: if self._task_group is not None: From bdd4f9efd23ea69e432545a614ea4048b2a1db9b Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Thu, 28 Sep 2023 15:28:09 +0200 Subject: [PATCH 04/17] Updates the SQLite store --- tests/test_sqlite_store.py | 309 +++++++++++++++++++++++++++ ypy_websocket/stores/sqlite_store.py | 224 +++++++++++++------ 2 files changed, 465 insertions(+), 68 deletions(-) create mode 100644 tests/test_sqlite_store.py diff --git a/tests/test_sqlite_store.py b/tests/test_sqlite_store.py new file mode 100644 index 0000000..cdec4be --- /dev/null +++ b/tests/test_sqlite_store.py @@ -0,0 +1,309 @@ +import pytest + +import time +import aiosqlite +import y_py as Y + +from ypy_websocket.stores import SQLiteYStore, DocExists + +@pytest.fixture +def create_database(): + async def _inner(path: str, version: int, tables: bool = True) -> None: + async with aiosqlite.connect(path) as db: + if tables: + await db.execute("CREATE TABLE IF NOT EXISTS documents (path TEXT PRIMARY KEY, version INTEGER NOT NULL)") + await db.execute("CREATE TABLE IF NOT EXISTS yupdates (path TEXT NOT NULL, yupdate BLOB, metadata BLOB, timestamp REAL NOT NULL)") + await db.execute("CREATE INDEX IF NOT EXISTS idx_yupdates_path_timestamp ON yupdates (path, timestamp)") + await db.execute(f"PRAGMA user_version = {version}") + await db.commit() + + return _inner + +@pytest.fixture +def add_document(): + async def _inner(path: str, doc_path: str, version: int, data: bytes = b"") -> None: + async with aiosqlite.connect(path) as db: + await db.execute("INSERT INTO documents VALUES (?, ?)", (doc_path, version),) + await db.execute("INSERT INTO yupdates VALUES (?, ?, ?, ?)", (doc_path, data, b"", time.time()),) + await db.commit() + + return _inner + +@pytest.mark.anyio +async def test_initialization(tmp_path): + path = tmp_path / "tmp.db" + store = SQLiteYStore(str(path)) + await store.start() + await store.initialize() + + assert store.initialized == True + + async with aiosqlite.connect(path) as db: + cursor = await db.execute("pragma user_version") + version = (await cursor.fetchone())[0] + assert store.version == version + + cursor = await db.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='documents'") + res = await cursor.fetchone() + assert res[0] == 1 + + cursor = await db.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='yupdates'") + res = await cursor.fetchone() + assert res[0] == 1 + +@pytest.mark.anyio +async def test_initialization_with_old_database(tmp_path, create_database): + path = tmp_path / "tmp.db" + + # Create a database with an old version + await create_database(path, 1) + + store = SQLiteYStore(str(path)) + await store.start() + await store.initialize() + + assert store.initialized == True + + async with aiosqlite.connect(path) as db: + cursor = await db.execute("pragma user_version") + version = (await cursor.fetchone())[0] + assert store.version == version + + cursor = await db.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='documents'") + res = await cursor.fetchone() + assert res[0] == 1 + + cursor = await db.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='yupdates'") + res = await cursor.fetchone() + assert res[0] == 1 + +@pytest.mark.anyio +async def test_initialization_with_empty_database(tmp_path, create_database): + path = tmp_path / "tmp.db" + + # Create a database with an old version + await create_database(path, SQLiteYStore.version, False) + + store = SQLiteYStore(str(path)) + await store.start() + await store.initialize() + + assert store.initialized == True + + async with aiosqlite.connect(path) as db: + cursor = await db.execute("pragma user_version") + version = (await cursor.fetchone())[0] + assert store.version == version + + cursor = await db.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='documents'") + res = await cursor.fetchone() + assert res[0] == 1 + + cursor = await db.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='yupdates'") + res = await cursor.fetchone() + assert res[0] == 1 + +@pytest.mark.anyio +async def test_initialization_with_existing_database(tmp_path, create_database, add_document): + path = tmp_path / "tmp.db" + doc_path = "test.txt" + + # Create a database with an old version + await create_database(path, SQLiteYStore.version) + await add_document(path, doc_path, 0) + + store = SQLiteYStore(str(path)) + await store.start() + await store.initialize() + + assert store.initialized == True + + async with aiosqlite.connect(path) as db: + cursor = await db.execute("pragma user_version") + version = (await cursor.fetchone())[0] + assert store.version == version + + cursor = await db.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='documents'") + res = await cursor.fetchone() + assert res[0] == 1 + + cursor = await db.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='yupdates'") + res = await cursor.fetchone() + assert res[0] == 1 + + cursor = await db.execute("SELECT path, version FROM documents WHERE path = ?", (doc_path,),) + res = await cursor.fetchone() + assert res[0] == doc_path + assert res[1] == 0 + +@pytest.mark.anyio +async def test_exists(tmp_path, create_database, add_document): + path = tmp_path / "tmp.db" + doc_path = "test.txt" + + # Create a database with an old version + await create_database(path, SQLiteYStore.version) + await add_document(path, doc_path, 0) + + store = SQLiteYStore(str(path)) + await store.start() + await store.initialize() + + assert store.initialized == True + + res = await store.exists(doc_path) + assert res == True + + res = await store.exists("random.path") + assert res == False + +@pytest.mark.anyio +async def test_list(tmp_path, create_database, add_document): + path = tmp_path / "tmp.db" + doc1 = "test_1.txt" + doc2 = "test_2.txt" + + # Create a database with an old version + await create_database(path, SQLiteYStore.version) + await add_document(path, doc1, 0) + await add_document(path, doc2, 0) + + store = SQLiteYStore(str(path)) + await store.start() + await store.initialize() + + assert store.initialized == True + + count = 0 + async for doc in store.list(): + count += 1 + assert doc in [doc1, doc2] + + assert count == 2 + +@pytest.mark.anyio +async def test_get(tmp_path, create_database, add_document): + path = tmp_path / "tmp.db" + doc_path = "test.txt" + + # Create a database with an old version + await create_database(path, SQLiteYStore.version) + await add_document(path, doc_path, 0) + + store = SQLiteYStore(str(path)) + await store.start() + await store.initialize() + + assert store.initialized == True + + res = await store.get(doc_path) + assert res["path"] == doc_path + assert res["version"] == 0 + + res = await store.get("random.doc") + assert res == None + +@pytest.mark.anyio +async def test_create(tmp_path, create_database, add_document): + path = tmp_path / "tmp.db" + doc_path = "test.txt" + + # Create a database with an old version + await create_database(path, SQLiteYStore.version) + await add_document(path, doc_path, 0) + + store = SQLiteYStore(str(path)) + await store.start() + await store.initialize() + + assert store.initialized == True + + new_doc = "new_doc.path" + await store.create(new_doc, 0) + async with aiosqlite.connect(path) as db: + cursor = await db.execute("SELECT path, version FROM documents WHERE path = ?", (new_doc,),) + res = await cursor.fetchone() + assert res[0] == new_doc + assert res[1] == 0 + + with pytest.raises(DocExists) as e: + await store.create(doc_path, 0) + assert str(e.value) == f"The document {doc_path} already exists." + +@pytest.mark.anyio +async def test_remove(tmp_path, create_database, add_document): + path = tmp_path / "tmp.db" + doc_path = "test.txt" + + # Create a database with an old version + await create_database(path, SQLiteYStore.version) + await add_document(path, doc_path, 0) + + store = SQLiteYStore(str(path)) + await store.start() + await store.initialize() + + assert store.initialized == True + + await store.remove(doc_path) + res = await store.exists(doc_path) + assert res == False + + new_doc = "new_doc.path" + res = await store.exists(new_doc) + assert res == False + + await store.remove(new_doc) + res = await store.exists(new_doc) + assert res == False + +@pytest.mark.anyio +async def test_read(tmp_path, create_database, add_document): + path = tmp_path / "tmp.db" + doc_path = "test.txt" + update = b"foo" + + # Create a database with an old version + await create_database(path, SQLiteYStore.version) + await add_document(path, doc_path, 0, update) + + store = SQLiteYStore(str(path)) + await store.start() + await store.initialize() + + assert store.initialized == True + + count = 0 + async for u,_,_ in store.read(doc_path): + count += 1 + assert update == u + + assert count == 1 + +@pytest.mark.anyio +async def test_write(tmp_path, create_database, add_document): + path = tmp_path / "tmp.db" + doc_path = "test.txt" + + # Create a database with an old version + await create_database(path, SQLiteYStore.version) + await add_document(path, doc_path, 0) + + store = SQLiteYStore(str(path)) + await store.start() + await store.initialize() + + assert store.initialized == True + + update = b"foo" + await store.write(doc_path, update) + + async with aiosqlite.connect(path) as db: + async with db.execute("SELECT yupdate FROM yupdates WHERE path = ?", (doc_path,)) as cursor: + count = 0 + async for u, in cursor: + count += 1 + # The fixture add_document inserts an empty update + assert u in [b"", update] + assert count == 2 + \ No newline at end of file diff --git a/ypy_websocket/stores/sqlite_store.py b/ypy_websocket/stores/sqlite_store.py index 3bc362a..06cad17 100644 --- a/ypy_websocket/stores/sqlite_store.py +++ b/ypy_websocket/stores/sqlite_store.py @@ -12,9 +12,9 @@ import y_py as Y -from .yutils import get_new_path +from ..yutils import get_new_path from .base_store import BaseYStore -from .utils import YDocNotFound +from .utils import YDocNotFound, DocExists class SQLiteYStore(BaseYStore): """A YStore which uses an SQLite database. @@ -24,37 +24,33 @@ class SQLiteYStore(BaseYStore): ```py class MySQLiteYStore(SQLiteYStore): - db_path = "path/to/my_ystore.db" + _store_path = "path/to/my_ystore.db" ``` """ - db_path: str = "ystore.db" + _lock: Lock # Determines the "time to live" for all documents, i.e. how recent the # latest update of a document must be before purging document history. # Defaults to never purging document history (None). document_ttl: int | None = None - path: str - lock: Lock - db_initialized: Event def __init__( self, - path: str, + path: str = "./ystore.db", metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None, log: Logger | None = None, ) -> None: """Initialize the object. Arguments: - path: The file path used to store the updates. + path: The database path used to store the updates. metadata_callback: An optional callback to call to get the metadata. log: An optional logger. """ - self.path = path + self._lock = Lock() + self._store_path = path self.metadata_callback = metadata_callback self.log = log or getLogger(__name__) - self.lock = Lock() - self.db_initialized = Event() async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED): """Start the SQLiteYStore. @@ -70,62 +66,149 @@ async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED): if self._task_group is not None: raise RuntimeError("YStore already running") - async with create_task_group() as self._task_group: - self._task_group.start_soon(self._init_db) - self.started.set() - self._starting = False - task_status.started() - - async def _init_db(self): - create_db = False - move_db = False - if not await anyio.Path(self.db_path).exists(): - create_db = True - else: - async with self.lock: - async with aiosqlite.connect(self.db_path) as db: - cursor = await db.execute( - "SELECT count(name) FROM sqlite_master WHERE type='table' and name='yupdates'" - ) - table_exists = (await cursor.fetchone())[0] - if table_exists: - cursor = await db.execute("pragma user_version") - version = (await cursor.fetchone())[0] - if version != self.version: - move_db = True - create_db = True - else: - create_db = True - if move_db: - new_path = await get_new_path(self.db_path) - self.log.warning(f"YStore version mismatch, moving {self.db_path} to {new_path}") - await anyio.Path(self.db_path).rename(new_path) - if create_db: - async with self.lock: - async with aiosqlite.connect(self.db_path) as db: - await db.execute( - "CREATE TABLE yupdates (path TEXT NOT NULL, yupdate BLOB, metadata BLOB, timestamp REAL NOT NULL)" - ) - await db.execute( - "CREATE INDEX idx_yupdates_path_timestamp ON yupdates (path, timestamp)" - ) - await db.execute(f"PRAGMA user_version = {self.version}") + self._task_group = create_task_group() + self.started.set() + self._starting = False + task_status.started() + + async def initialize(self) -> None: + """ + Initializes the store. + """ + if self.initialized or self._initialized is not None: + return + self._initialized = Event() + + async with self._lock: + if await anyio.Path(self._store_path).exists(): + version = -1 + async with aiosqlite.connect(self._store_path) as db: + cursor = await db.execute("pragma user_version") + version = (await cursor.fetchone())[0] + + # The DB has an old version. Move the database. + if self.version != version: + new_path = await get_new_path(self._store_path) + self.log.warning(f"YStore version mismatch, moving {self._store_path} to {new_path}") + await anyio.Path(self._store_path).rename(new_path) + + # Make sure every table exists. + async with aiosqlite.connect(self._store_path) as db: + await db.execute("CREATE TABLE IF NOT EXISTS documents (path TEXT PRIMARY KEY, version INTEGER NOT NULL)") + await db.execute("CREATE TABLE IF NOT EXISTS yupdates (path TEXT NOT NULL, yupdate BLOB, metadata BLOB, timestamp REAL NOT NULL)") + await db.execute("CREATE INDEX IF NOT EXISTS idx_yupdates_path_timestamp ON yupdates (path, timestamp)") + await db.execute(f"PRAGMA user_version = {self.version}") + await db.commit() + + self._initialized.set() + + async def exists(self, path: str) -> bool: + """ + Returns True if the document exists, else returns False. + + Arguments: + path: The document name/path. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + async with aiosqlite.connect(self._store_path) as db: + cursor = await db.execute("SELECT path, version FROM documents WHERE path = ?", (path,),) + return (await cursor.fetchone()) is not None + + async def list(self) -> AsyncIterator[str]: + """ + Returns a list with the name/path of the documents stored. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + async with aiosqlite.connect(self._store_path) as db: + async with db.execute("SELECT path FROM documents") as cursor: + async for path in cursor: + yield path[0] + + async def get(self, path: str) -> dict | None: + """ + Returns the document's metadata or None if the document does't exist. + + Arguments: + path: The document name/path. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + async with aiosqlite.connect(self._store_path) as db: + cursor = await db.execute("SELECT path, version FROM documents WHERE path = ?", (path,),) + doc = await cursor.fetchone() + + if doc is None: + return None + else : + return dict(path=doc[0], version=doc[1]) + + async def create(self, path: str, version: int) -> None: + """ + Creates a new document. + + Arguments: + path: The document name/path. + version: Document version. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + try: + async with aiosqlite.connect(self._store_path) as db: + await db.execute("INSERT INTO documents VALUES (?, ?)", (path, version),) await db.commit() - self.db_initialized.set() + except aiosqlite.IntegrityError: + raise DocExists(f"The document {path} already exists.") + + async def remove(self, path: str) -> None: + """ + Removes a document. - async def read(self) -> AsyncIterator[tuple[bytes, bytes, float]]: # type: ignore + Arguments: + path: The document name/path. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + async with aiosqlite.connect(self._store_path) as db: + await db.execute("DELETE FROM documents WHERE path = ?", (path,),) + await db.execute("DELETE FROM yupdates WHERE path = ?", (path,),) + await db.commit() + + async def read(self, path: str) -> AsyncIterator[tuple[bytes, bytes, float]]: # type: ignore """Async iterator for reading the store content. + Arguments: + path: The document name/path. + Returns: A tuple of (update, metadata, timestamp) for each update. """ - await self.db_initialized.wait() + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + try: - async with self.lock: - async with aiosqlite.connect(self.db_path) as db: + async with self._lock: + async with aiosqlite.connect(self._store_path) as db: async with db.execute( "SELECT yupdate, metadata, timestamp FROM yupdates WHERE path = ?", - (self.path,), + (path,), ) as cursor: found = False async for update, metadata, timestamp in cursor: @@ -136,19 +219,24 @@ async def read(self) -> AsyncIterator[tuple[bytes, bytes, float]]: # type: igno except Exception: raise YDocNotFound - async def write(self, data: bytes) -> None: - """Store an update. + async def write(self, path: str, data: bytes) -> None: + """ + Store an update. Arguments: + path: The document name/path. data: The update to store. """ - await self.db_initialized.wait() - async with self.lock: - async with aiosqlite.connect(self.db_path) as db: + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + async with aiosqlite.connect(self._store_path) as db: # first, determine time elapsed since last update cursor = await db.execute( "SELECT timestamp FROM yupdates WHERE path = ? ORDER BY timestamp DESC LIMIT 1", - (self.path,), + (path,), ) row = await cursor.fetchone() diff = (time.time() - row[0]) if row else 0 @@ -157,24 +245,24 @@ async def write(self, data: bytes) -> None: # squash updates ydoc = Y.YDoc() async with db.execute( - "SELECT yupdate FROM yupdates WHERE path = ?", (self.path,) + "SELECT yupdate FROM yupdates WHERE path = ?", (path,) ) as cursor: async for update, in cursor: Y.apply_update(ydoc, update) # delete history - await db.execute("DELETE FROM yupdates WHERE path = ?", (self.path,)) + await db.execute("DELETE FROM yupdates WHERE path = ?", (path,)) # insert squashed updates squashed_update = Y.encode_state_as_update(ydoc) metadata = await self.get_metadata() await db.execute( "INSERT INTO yupdates VALUES (?, ?, ?, ?)", - (self.path, squashed_update, metadata, time.time()), + (path, squashed_update, metadata, time.time()), ) # finally, write this update to the DB metadata = await self.get_metadata() await db.execute( "INSERT INTO yupdates VALUES (?, ?, ?, ?)", - (self.path, data, metadata, time.time()), + (path, data, metadata, time.time()), ) await db.commit() From 3fddf19f8ab8c3cb66f67257e89ff54ac85049bf Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Fri, 29 Sep 2023 14:18:52 +0200 Subject: [PATCH 05/17] Updates the file store --- pyproject.toml | 1 + tests/test_file_store.py | 288 +++++++++++++++++++++++++++ tests/test_sqlite_store.py | 142 ++++++++----- tests/test_ystore.py | 66 +++--- ypy_websocket/stores/__init__.py | 2 +- ypy_websocket/stores/base_store.py | 19 +- ypy_websocket/stores/file_store.py | 220 +++++++++++++++----- ypy_websocket/stores/sqlite_store.py | 68 ++++--- ypy_websocket/stores/utils.py | 2 +- ypy_websocket/yroom.py | 2 +- 10 files changed, 635 insertions(+), 175 deletions(-) create mode 100644 tests/test_file_store.py diff --git a/pyproject.toml b/pyproject.toml index 68e568f..f186505 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ classifiers = [ "Programming Language :: Python :: 3.11", ] dependencies = [ + "deprecated", "anyio >=3.6.2,<5", "aiosqlite >=0.18.0,<1", "y-py >=0.6.0,<0.7.0", diff --git a/tests/test_file_store.py b/tests/test_file_store.py new file mode 100644 index 0000000..475bcf9 --- /dev/null +++ b/tests/test_file_store.py @@ -0,0 +1,288 @@ +import struct +import time +from pathlib import Path + +import anyio +import pytest + +from ypy_websocket.stores import DocExists, FileYStore +from ypy_websocket.yutils import Decoder, write_var_uint + + +@pytest.fixture +def create_store(): + async def _inner(path: str, version: int) -> None: + await anyio.Path(path).mkdir(parents=True, exist_ok=True) + version_path = Path(path, "__version__") + async with await anyio.open_file(version_path, "wb") as f: + version_bytes = str(version).encode() + await f.write(version_bytes) + + return _inner + + +@pytest.fixture +def add_document(): + async def _inner(path: str, doc_path: str, version: int, data: bytes = b"") -> None: + file_path = Path(path / (doc_path + ".y")) + await anyio.Path(file_path.parent).mkdir(parents=True, exist_ok=True) + + async with await anyio.open_file(file_path, "ab") as f: + version_bytes = f"VERSION:{version}\n".encode() + await f.write(version_bytes) + data_len = write_var_uint(len(data)) + await f.write(data_len + data) + metadata = b"" + metadata_len = write_var_uint(len(metadata)) + await f.write(metadata_len + metadata) + timestamp = struct.pack(" None: async with aiosqlite.connect(path) as db: if tables: - await db.execute("CREATE TABLE IF NOT EXISTS documents (path TEXT PRIMARY KEY, version INTEGER NOT NULL)") - await db.execute("CREATE TABLE IF NOT EXISTS yupdates (path TEXT NOT NULL, yupdate BLOB, metadata BLOB, timestamp REAL NOT NULL)") - await db.execute("CREATE INDEX IF NOT EXISTS idx_yupdates_path_timestamp ON yupdates (path, timestamp)") + await db.execute( + "CREATE TABLE IF NOT EXISTS documents (path TEXT PRIMARY KEY, version INTEGER NOT NULL)" + ) + await db.execute( + "CREATE TABLE IF NOT EXISTS yupdates (path TEXT NOT NULL, yupdate BLOB, metadata BLOB, timestamp REAL NOT NULL)" + ) + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_yupdates_path_timestamp ON yupdates (path, timestamp)" + ) await db.execute(f"PRAGMA user_version = {version}") await db.commit() - + return _inner + @pytest.fixture def add_document(): async def _inner(path: str, doc_path: str, version: int, data: bytes = b"") -> None: async with aiosqlite.connect(path) as db: - await db.execute("INSERT INTO documents VALUES (?, ?)", (doc_path, version),) - await db.execute("INSERT INTO yupdates VALUES (?, ?, ?, ?)", (doc_path, data, b"", time.time()),) + await db.execute( + "INSERT INTO documents VALUES (?, ?)", + (doc_path, version), + ) + await db.execute( + "INSERT INTO yupdates VALUES (?, ?, ?, ?)", + (doc_path, data, b"", time.time()), + ) await db.commit() - + return _inner + @pytest.mark.anyio async def test_initialization(tmp_path): path = tmp_path / "tmp.db" @@ -36,21 +50,26 @@ async def test_initialization(tmp_path): await store.start() await store.initialize() - assert store.initialized == True + assert store.initialized async with aiosqlite.connect(path) as db: cursor = await db.execute("pragma user_version") version = (await cursor.fetchone())[0] assert store.version == version - cursor = await db.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='documents'") + cursor = await db.execute( + "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='documents'" + ) res = await cursor.fetchone() assert res[0] == 1 - cursor = await db.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='yupdates'") + cursor = await db.execute( + "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='yupdates'" + ) res = await cursor.fetchone() assert res[0] == 1 + @pytest.mark.anyio async def test_initialization_with_old_database(tmp_path, create_database): path = tmp_path / "tmp.db" @@ -62,21 +81,26 @@ async def test_initialization_with_old_database(tmp_path, create_database): await store.start() await store.initialize() - assert store.initialized == True + assert store.initialized async with aiosqlite.connect(path) as db: cursor = await db.execute("pragma user_version") version = (await cursor.fetchone())[0] assert store.version == version - cursor = await db.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='documents'") + cursor = await db.execute( + "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='documents'" + ) res = await cursor.fetchone() assert res[0] == 1 - cursor = await db.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='yupdates'") + cursor = await db.execute( + "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='yupdates'" + ) res = await cursor.fetchone() assert res[0] == 1 - + + @pytest.mark.anyio async def test_initialization_with_empty_database(tmp_path, create_database): path = tmp_path / "tmp.db" @@ -88,21 +112,26 @@ async def test_initialization_with_empty_database(tmp_path, create_database): await store.start() await store.initialize() - assert store.initialized == True + assert store.initialized async with aiosqlite.connect(path) as db: cursor = await db.execute("pragma user_version") version = (await cursor.fetchone())[0] assert store.version == version - cursor = await db.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='documents'") + cursor = await db.execute( + "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='documents'" + ) res = await cursor.fetchone() assert res[0] == 1 - cursor = await db.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='yupdates'") + cursor = await db.execute( + "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='yupdates'" + ) res = await cursor.fetchone() assert res[0] == 1 + @pytest.mark.anyio async def test_initialization_with_existing_database(tmp_path, create_database, add_document): path = tmp_path / "tmp.db" @@ -116,26 +145,34 @@ async def test_initialization_with_existing_database(tmp_path, create_database, await store.start() await store.initialize() - assert store.initialized == True + assert store.initialized async with aiosqlite.connect(path) as db: cursor = await db.execute("pragma user_version") version = (await cursor.fetchone())[0] assert store.version == version - cursor = await db.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='documents'") + cursor = await db.execute( + "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='documents'" + ) res = await cursor.fetchone() assert res[0] == 1 - cursor = await db.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='yupdates'") + cursor = await db.execute( + "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='yupdates'" + ) res = await cursor.fetchone() assert res[0] == 1 - cursor = await db.execute("SELECT path, version FROM documents WHERE path = ?", (doc_path,),) + cursor = await db.execute( + "SELECT path, version FROM documents WHERE path = ?", + (doc_path,), + ) res = await cursor.fetchone() assert res[0] == doc_path assert res[1] == 0 + @pytest.mark.anyio async def test_exists(tmp_path, create_database, add_document): path = tmp_path / "tmp.db" @@ -149,13 +186,12 @@ async def test_exists(tmp_path, create_database, add_document): await store.start() await store.initialize() - assert store.initialized == True + assert store.initialized + + assert await store.exists(doc_path) - res = await store.exists(doc_path) - assert res == True + assert not await store.exists("random.path") - res = await store.exists("random.path") - assert res == False @pytest.mark.anyio async def test_list(tmp_path, create_database, add_document): @@ -172,15 +208,16 @@ async def test_list(tmp_path, create_database, add_document): await store.start() await store.initialize() - assert store.initialized == True + assert store.initialized count = 0 async for doc in store.list(): count += 1 assert doc in [doc1, doc2] - + assert count == 2 + @pytest.mark.anyio async def test_get(tmp_path, create_database, add_document): path = tmp_path / "tmp.db" @@ -194,14 +231,15 @@ async def test_get(tmp_path, create_database, add_document): await store.start() await store.initialize() - assert store.initialized == True + assert store.initialized res = await store.get(doc_path) assert res["path"] == doc_path assert res["version"] == 0 res = await store.get("random.doc") - assert res == None + assert res is None + @pytest.mark.anyio async def test_create(tmp_path, create_database, add_document): @@ -216,20 +254,24 @@ async def test_create(tmp_path, create_database, add_document): await store.start() await store.initialize() - assert store.initialized == True + assert store.initialized new_doc = "new_doc.path" await store.create(new_doc, 0) async with aiosqlite.connect(path) as db: - cursor = await db.execute("SELECT path, version FROM documents WHERE path = ?", (new_doc,),) + cursor = await db.execute( + "SELECT path, version FROM documents WHERE path = ?", + (new_doc,), + ) res = await cursor.fetchone() assert res[0] == new_doc assert res[1] == 0 - with pytest.raises(DocExists) as e: + with pytest.raises(DocExists) as e: await store.create(doc_path, 0) assert str(e.value) == f"The document {doc_path} already exists." - + + @pytest.mark.anyio async def test_remove(tmp_path, create_database, add_document): path = tmp_path / "tmp.db" @@ -243,19 +285,17 @@ async def test_remove(tmp_path, create_database, add_document): await store.start() await store.initialize() - assert store.initialized == True + assert store.initialized await store.remove(doc_path) - res = await store.exists(doc_path) - assert res == False + assert not await store.exists(doc_path) new_doc = "new_doc.path" - res = await store.exists(new_doc) - assert res == False + assert not await store.exists(new_doc) await store.remove(new_doc) - res = await store.exists(new_doc) - assert res == False + assert not await store.exists(new_doc) + @pytest.mark.anyio async def test_read(tmp_path, create_database, add_document): @@ -271,15 +311,16 @@ async def test_read(tmp_path, create_database, add_document): await store.start() await store.initialize() - assert store.initialized == True + assert store.initialized count = 0 - async for u,_,_ in store.read(doc_path): + async for u, _, _ in store.read(doc_path): count += 1 assert update == u - + assert count == 1 + @pytest.mark.anyio async def test_write(tmp_path, create_database, add_document): path = tmp_path / "tmp.db" @@ -293,11 +334,11 @@ async def test_write(tmp_path, create_database, add_document): await store.start() await store.initialize() - assert store.initialized == True + assert store.initialized update = b"foo" await store.write(doc_path, update) - + async with aiosqlite.connect(path) as db: async with db.execute("SELECT yupdate FROM yupdates WHERE path = ?", (doc_path,)) as cursor: count = 0 @@ -306,4 +347,3 @@ async def test_write(tmp_path, create_database, add_document): # The fixture add_document inserts an empty update assert u in [b"", update] assert count == 2 - \ No newline at end of file diff --git a/tests/test_ystore.py b/tests/test_ystore.py index 39208bd..42cc207 100644 --- a/tests/test_ystore.py +++ b/tests/test_ystore.py @@ -1,5 +1,3 @@ -import os -import tempfile import time from pathlib import Path from unittest.mock import patch @@ -7,7 +5,7 @@ import aiosqlite import pytest -from ypy_websocket.ystore import SQLiteYStore, TempFileYStore +from ypy_websocket.stores import SQLiteYStore, TempFileYStore class MetadataCallback: @@ -24,35 +22,32 @@ class MyTempFileYStore(TempFileYStore): prefix_dir = "test_temp_" -MY_SQLITE_YSTORE_DB_PATH = str(Path(tempfile.mkdtemp(prefix="test_sql_")) / "ystore.db") - - class MySQLiteYStore(SQLiteYStore): - db_path = MY_SQLITE_YSTORE_DB_PATH document_ttl = 1000 - def __init__(self, *args, delete_db=False, **kwargs): - if delete_db: - os.remove(self.db_path) - super().__init__(*args, **kwargs) - @pytest.mark.anyio @pytest.mark.parametrize("YStore", (MyTempFileYStore, MySQLiteYStore)) -async def test_ystore(YStore): - store_name = "my_store" - ystore = YStore(store_name, metadata_callback=MetadataCallback()) +async def test_ystore(tmp_path, YStore): + store_path = tmp_path / "my_store" + doc_name = "my_doc.txt" + + ystore = YStore(str(store_path), metadata_callback=MetadataCallback()) await ystore.start() + await ystore.initialize() + + await ystore.create(doc_name, 0) + data = [b"foo", b"bar", b"baz"] for d in data: - await ystore.write(d) + await ystore.write(doc_name, d) if YStore == MyTempFileYStore: - assert (Path(MyTempFileYStore.base_dir) / store_name).exists() + assert (Path(store_path) / (doc_name + ".y")).exists() elif YStore == MySQLiteYStore: - assert Path(MySQLiteYStore.db_path).exists() + assert Path(store_path).exists() i = 0 - async for d, m, t in ystore.read(): + async for d, m, t in ystore.read(doc_name): assert d == data[i] # data assert m == str(i).encode() # metadata i += 1 @@ -61,18 +56,24 @@ async def test_ystore(YStore): @pytest.mark.anyio -async def test_document_ttl_sqlite_ystore(test_ydoc): - store_name = "my_store" - ystore = MySQLiteYStore(store_name, delete_db=True) +async def test_document_ttl_sqlite_ystore(tmp_path, test_ydoc): + store_path = tmp_path / "my_store.db" + doc_name = "my_doc.txt" + + ystore = MySQLiteYStore(str(store_path)) await ystore.start() + await ystore.initialize() + + await ystore.create(doc_name, 0) + now = time.time() for i in range(3): # assert that adding a record before document TTL doesn't delete document history with patch("time.time") as mock_time: mock_time.return_value = now - await ystore.write(test_ydoc.update()) - async with aiosqlite.connect(ystore.db_path) as db: + await ystore.write(doc_name, test_ydoc.update()) + async with aiosqlite.connect(store_path) as db: assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[ 0 ] == i + 1 @@ -80,20 +81,7 @@ async def test_document_ttl_sqlite_ystore(test_ydoc): # assert that adding a record after document TTL deletes previous document history with patch("time.time") as mock_time: mock_time.return_value = now + ystore.document_ttl + 1 - await ystore.write(test_ydoc.update()) - async with aiosqlite.connect(ystore.db_path) as db: + await ystore.write(doc_name, test_ydoc.update()) + async with aiosqlite.connect(store_path) as db: # two updates in DB: one squashed update and the new update assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 2 - - -@pytest.mark.anyio -@pytest.mark.parametrize("YStore", (MyTempFileYStore, MySQLiteYStore)) -async def test_version(YStore, caplog): - store_name = "my_store" - prev_version = YStore.version - YStore.version = -1 - ystore = YStore(store_name) - await ystore.start() - await ystore.write(b"foo") - YStore.version = prev_version - assert "YStore version mismatch" in caplog.text diff --git a/ypy_websocket/stores/__init__.py b/ypy_websocket/stores/__init__.py index 0541cf0..7666179 100644 --- a/ypy_websocket/stores/__init__.py +++ b/ypy_websocket/stores/__init__.py @@ -1,4 +1,4 @@ from .base_store import BaseYStore # noqa from .file_store import FileYStore, TempFileYStore # noqa from .sqlite_store import SQLiteYStore # noqa -from .utils import YDocNotFound, DocExists # noqa +from .utils import DocExists, YDocNotFound # noqa diff --git a/ypy_websocket/stores/base_store.py b/ypy_websocket/stores/base_store.py index 25a6c29..06c48ab 100644 --- a/ypy_websocket/stores/base_store.py +++ b/ypy_websocket/stores/base_store.py @@ -5,11 +5,10 @@ from inspect import isawaitable from typing import AsyncIterator, Awaitable, Callable, cast +import y_py as Y from anyio import TASK_STATUS_IGNORED, Event, create_task_group from anyio.abc import TaskGroup, TaskStatus -import y_py as Y - class BaseYStore(ABC): """ @@ -18,7 +17,7 @@ class BaseYStore(ABC): version = 2 metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None - + _store_path: str _initialized: Event | None = None _started: Event | None = None @@ -55,14 +54,14 @@ async def exists(self, path: str) -> bool: path: The document name/path. """ ... - + @abstractmethod async def list(self) -> AsyncIterator[str]: """ Returns a list with the name/path of the documents stored. """ ... - + @abstractmethod async def get(self, path: str) -> dict | None: """ @@ -72,7 +71,7 @@ async def get(self, path: str) -> dict | None: path: The document name/path. """ ... - + @abstractmethod async def create(self, path: str, version: int) -> None: """ @@ -83,7 +82,7 @@ async def create(self, path: str, version: int) -> None: version: Document version. """ ... - + @abstractmethod async def remove(self, path: str) -> dict | None: """ @@ -93,7 +92,7 @@ async def remove(self, path: str) -> dict | None: path: The document name/path. """ ... - + @abstractmethod async def write(self, path: str, data: bytes) -> None: """ @@ -122,9 +121,9 @@ async def read(self, path: str) -> AsyncIterator[tuple[bytes, bytes]]: def initialized(self) -> bool: if self._initialized is not None: return self._initialized.is_set() - else : + else: return False - + @property def started(self) -> Event: if self._started is None: diff --git a/ypy_websocket/stores/file_store.py b/ypy_websocket/stores/file_store.py index 6572944..84a651b 100644 --- a/ypy_websocket/stores/file_store.py +++ b/ypy_websocket/stores/file_store.py @@ -1,31 +1,30 @@ from __future__ import annotations -import time import struct import tempfile +import time from logging import Logger, getLogger from pathlib import Path from typing import AsyncIterator, Awaitable, Callable import anyio -from anyio import Lock +from anyio import Event, Lock +from deprecated import deprecated -import y_py as Y - -from .yutils import Decoder, get_new_path, write_var_uint +from ..yutils import Decoder, get_new_path, write_var_uint from .base_store import BaseYStore -from .utils import YDocNotFound +from .utils import DocExists, YDocNotFound + class FileYStore(BaseYStore): """A YStore which uses one file per document.""" - path: str + _lock: Lock metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None - lock: Lock def __init__( self, - path: str, + path: str = "./ystore", metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None, log: Logger | None = None, ) -> None: @@ -36,56 +35,152 @@ def __init__( metadata_callback: An optional callback to call to get the metadata. log: An optional logger. """ - self.path = path + self._lock = Lock() + self._store_path = path self.metadata_callback = metadata_callback self.log = log or getLogger(__name__) - self.lock = Lock() - async def check_version(self) -> int: - """Check the version of the store format. + async def initialize(self) -> None: + """ + Initializes the store. + """ + if self.initialized or self._initialized is not None: + return + self._initialized = Event() - Returns: - The offset where the data is located in the file. + version_path = Path(self._store_path, "__version__") + if not await anyio.Path(self._store_path).exists(): + await anyio.Path(self._store_path).mkdir(parents=True, exist_ok=True) + + version = -1 + create_version = False + if await anyio.Path(version_path).exists(): + async with await anyio.open_file(version_path, "rb") as f: + version = int(await f.readline()) + + # Store version mismatch. Move store and create a new one. + if self.version != version: + create_version = True + + if create_version: + new_path = await get_new_path(self._store_path) + self.log.warning( + f"YStore version mismatch, moving {self._store_path} to {new_path}" + ) + await anyio.Path(self._store_path).rename(new_path) + await anyio.Path(self._store_path).mkdir(parents=True, exist_ok=True) + + else: + create_version = True + + if create_version: + async with await anyio.open_file(version_path, "wb") as f: + version_bytes = str(self.version).encode() + await f.write(version_bytes) + + self._initialized.set() + + async def exists(self, path: str) -> bool: + """ + Returns True if the document exists, else returns False. + + Arguments: + path: The document name/path. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + return await anyio.Path(self._get_document_path(path)).exists() + + async def list(self) -> AsyncIterator[str]: + """ + Returns a list with the name/path of the documents stored. """ - if not await anyio.Path(self.path).exists(): - version_mismatch = True + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async for child in anyio.Path(self._store_path).glob("**/*.y"): + yield str(child.relative_to(self._store_path)) + + async def get(self, path: str) -> dict | None: + """ + Returns the document's metadata or None if the document does't exist. + + Arguments: + path: The document name/path. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + file_path = self._get_document_path(path) + if not await anyio.Path(file_path).exists(): + return None else: - version_mismatch = False - move_file = False - async with await anyio.open_file(self.path, "rb") as f: + version = None + async with await anyio.open_file(file_path, "rb") as f: header = await f.read(8) if header == b"VERSION:": version = int(await f.readline()) - if version == self.version: - offset = await f.tell() - else: - version_mismatch = True - else: - version_mismatch = True - if version_mismatch: - move_file = True - if move_file: - new_path = await get_new_path(self.path) - self.log.warning(f"YStore version mismatch, moving {self.path} to {new_path}") - await anyio.Path(self.path).rename(new_path) - if version_mismatch: - async with await anyio.open_file(self.path, "wb") as f: - version_bytes = f"VERSION:{self.version}\n".encode() + + return dict(path=path, version=version) + + async def create(self, path: str, version: int) -> None: + """ + Creates a new document. + + Arguments: + path: The document name/path. + version: Document version. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + file_path = self._get_document_path(path) + if await anyio.Path(file_path).exists(): + raise DocExists(f"The document {path} already exists.") + + else: + await anyio.Path(file_path.parent).mkdir(parents=True, exist_ok=True) + async with await anyio.open_file(file_path, "wb") as f: + version_bytes = f"VERSION:{version}\n".encode() await f.write(version_bytes) - offset = len(version_bytes) - return offset - async def read(self) -> AsyncIterator[tuple[bytes, bytes, float]]: # type: ignore + async def remove(self, path: str) -> None: + """ + Removes a document. + + Arguments: + path: The document name/path. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + file_path = self._get_document_path(path) + if await anyio.Path(file_path).exists(): + await anyio.Path(file_path).unlink(missing_ok=False) + + async def read(self, path: str) -> AsyncIterator[tuple[bytes, bytes, float]]: # type: ignore """Async iterator for reading the store content. Returns: A tuple of (update, metadata, timestamp) for each update. """ - async with self.lock: - if not await anyio.Path(self.path).exists(): + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + file_path = self._get_document_path(path) + if not await anyio.Path(file_path).exists(): raise YDocNotFound - offset = await self.check_version() - async with await anyio.open_file(self.path, "rb") as f: + + offset = await self._get_data_offset(file_path) + async with await anyio.open_file(file_path, "rb") as f: await f.seek(offset) data = await f.read() if not data: @@ -101,17 +196,18 @@ async def read(self) -> AsyncIterator[tuple[bytes, bytes, float]]: # type: igno yield update, metadata, timestamp i = (i + 1) % 3 - async def write(self, data: bytes) -> None: + async def write(self, path: str, data: bytes) -> None: """Store an update. Arguments: data: The update to store. """ - parent = Path(self.path).parent - async with self.lock: - await anyio.Path(parent).mkdir(parents=True, exist_ok=True) - await self.check_version() - async with await anyio.open_file(self.path, "ab") as f: + async with self._lock: + file_path = self._get_document_path(path) + if not await anyio.Path(file_path).exists(): + raise YDocNotFound + + async with await anyio.open_file(file_path, "ab") as f: data_len = write_var_uint(len(data)) await f.write(data_len + data) metadata = await self.get_metadata() @@ -121,7 +217,24 @@ async def write(self, data: bytes) -> None: timestamp_len = write_var_uint(len(timestamp)) await f.write(timestamp_len + timestamp) + async def _get_data_offset(self, path: Path) -> int: + try: + async with await anyio.open_file(path, "rb") as f: + header = await f.read(8) + if header == b"VERSION:": + await f.readline() + return await f.tell() + else: + raise Exception + + except Exception: + raise YDocNotFound(f"File {str(path)} not found.") + def _get_document_path(self, path: str) -> Path: + return Path(self._store_path, path + ".y") + + +@deprecated class TempFileYStore(FileYStore): """ A YStore which uses the system's temporary directory. @@ -132,8 +245,17 @@ class TempFileYStore(FileYStore): class PrefixTempFileYStore(TempFileYStore): prefix_dir = "my_prefix_" ``` + + ## Note: + This class is deprecated. Use FileYStore and pass the tmp folder + as path argument. For example: + + ```py + tmp_dir = tempfile.mkdtemp(prefix="prefix/directory/") + store = FileYStore(tmp_dir) + ``` """ - + prefix_dir: str | None = None base_dir: str | None = None diff --git a/ypy_websocket/stores/sqlite_store.py b/ypy_websocket/stores/sqlite_store.py index 06cad17..73e098a 100644 --- a/ypy_websocket/stores/sqlite_store.py +++ b/ypy_websocket/stores/sqlite_store.py @@ -1,20 +1,19 @@ from __future__ import annotations import time -import aiosqlite from logging import Logger, getLogger from typing import AsyncIterator, Awaitable, Callable - +import aiosqlite import anyio +import y_py as Y from anyio import TASK_STATUS_IGNORED, Event, Lock, create_task_group from anyio.abc import TaskStatus -import y_py as Y - from ..yutils import get_new_path from .base_store import BaseYStore -from .utils import YDocNotFound, DocExists +from .utils import DocExists, YDocNotFound + class SQLiteYStore(BaseYStore): """A YStore which uses an SQLite database. @@ -77,7 +76,7 @@ async def initialize(self) -> None: """ if self.initialized or self._initialized is not None: return - self._initialized = Event() + self._initialized = Event() async with self._lock: if await anyio.Path(self._store_path).exists(): @@ -85,23 +84,31 @@ async def initialize(self) -> None: async with aiosqlite.connect(self._store_path) as db: cursor = await db.execute("pragma user_version") version = (await cursor.fetchone())[0] - + # The DB has an old version. Move the database. if self.version != version: new_path = await get_new_path(self._store_path) - self.log.warning(f"YStore version mismatch, moving {self._store_path} to {new_path}") + self.log.warning( + f"YStore version mismatch, moving {self._store_path} to {new_path}" + ) await anyio.Path(self._store_path).rename(new_path) # Make sure every table exists. async with aiosqlite.connect(self._store_path) as db: - await db.execute("CREATE TABLE IF NOT EXISTS documents (path TEXT PRIMARY KEY, version INTEGER NOT NULL)") - await db.execute("CREATE TABLE IF NOT EXISTS yupdates (path TEXT NOT NULL, yupdate BLOB, metadata BLOB, timestamp REAL NOT NULL)") - await db.execute("CREATE INDEX IF NOT EXISTS idx_yupdates_path_timestamp ON yupdates (path, timestamp)") + await db.execute( + "CREATE TABLE IF NOT EXISTS documents (path TEXT PRIMARY KEY, version INTEGER NOT NULL)" + ) + await db.execute( + "CREATE TABLE IF NOT EXISTS yupdates (path TEXT NOT NULL, yupdate BLOB, metadata BLOB, timestamp REAL NOT NULL)" + ) + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_yupdates_path_timestamp ON yupdates (path, timestamp)" + ) await db.execute(f"PRAGMA user_version = {self.version}") await db.commit() - + self._initialized.set() - + async def exists(self, path: str) -> bool: """ Returns True if the document exists, else returns False. @@ -115,9 +122,12 @@ async def exists(self, path: str) -> bool: async with self._lock: async with aiosqlite.connect(self._store_path) as db: - cursor = await db.execute("SELECT path, version FROM documents WHERE path = ?", (path,),) + cursor = await db.execute( + "SELECT path, version FROM documents WHERE path = ?", + (path,), + ) return (await cursor.fetchone()) is not None - + async def list(self) -> AsyncIterator[str]: """ Returns a list with the name/path of the documents stored. @@ -131,7 +141,7 @@ async def list(self) -> AsyncIterator[str]: async with db.execute("SELECT path FROM documents") as cursor: async for path in cursor: yield path[0] - + async def get(self, path: str) -> dict | None: """ Returns the document's metadata or None if the document does't exist. @@ -145,14 +155,17 @@ async def get(self, path: str) -> dict | None: async with self._lock: async with aiosqlite.connect(self._store_path) as db: - cursor = await db.execute("SELECT path, version FROM documents WHERE path = ?", (path,),) + cursor = await db.execute( + "SELECT path, version FROM documents WHERE path = ?", + (path,), + ) doc = await cursor.fetchone() if doc is None: return None - else : + else: return dict(path=doc[0], version=doc[1]) - + async def create(self, path: str, version: int) -> None: """ Creates a new document. @@ -168,11 +181,14 @@ async def create(self, path: str, version: int) -> None: async with self._lock: try: async with aiosqlite.connect(self._store_path) as db: - await db.execute("INSERT INTO documents VALUES (?, ?)", (path, version),) + await db.execute( + "INSERT INTO documents VALUES (?, ?)", + (path, version), + ) await db.commit() except aiosqlite.IntegrityError: raise DocExists(f"The document {path} already exists.") - + async def remove(self, path: str) -> None: """ Removes a document. @@ -186,8 +202,14 @@ async def remove(self, path: str) -> None: async with self._lock: async with aiosqlite.connect(self._store_path) as db: - await db.execute("DELETE FROM documents WHERE path = ?", (path,),) - await db.execute("DELETE FROM yupdates WHERE path = ?", (path,),) + await db.execute( + "DELETE FROM documents WHERE path = ?", + (path,), + ) + await db.execute( + "DELETE FROM yupdates WHERE path = ?", + (path,), + ) await db.commit() async def read(self, path: str) -> AsyncIterator[tuple[bytes, bytes, float]]: # type: ignore diff --git a/ypy_websocket/stores/utils.py b/ypy_websocket/stores/utils.py index 3415e55..a73240f 100644 --- a/ypy_websocket/stores/utils.py +++ b/ypy_websocket/stores/utils.py @@ -1,6 +1,6 @@ - class YDocNotFound(Exception): pass + class DocExists(Exception): pass diff --git a/ypy_websocket/yroom.py b/ypy_websocket/yroom.py index ad8deb7..a406ce3 100644 --- a/ypy_websocket/yroom.py +++ b/ypy_websocket/yroom.py @@ -17,8 +17,8 @@ from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from .awareness import Awareness -from .websocket import Websocket from .stores import BaseYStore +from .websocket import Websocket from .yutils import ( YMessageType, create_update_message, From 57b2d70fd0c580da9a1c71a836b5d474ffc195ad Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Wed, 4 Oct 2023 15:44:15 +0200 Subject: [PATCH 06/17] Include updates when requesting a document --- ypy_websocket/stores/file_store.py | 36 ++++++++++++++++++---------- ypy_websocket/stores/sqlite_store.py | 7 ++++-- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/ypy_websocket/stores/file_store.py b/ypy_websocket/stores/file_store.py index 84a651b..4f36f61 100644 --- a/ypy_websocket/stores/file_store.py +++ b/ypy_websocket/stores/file_store.py @@ -106,7 +106,7 @@ async def list(self) -> AsyncIterator[str]: async def get(self, path: str) -> dict | None: """ - Returns the document's metadata or None if the document does't exist. + Returns the document's metadata and updates or None if the document does't exist. Arguments: path: The document name/path. @@ -124,8 +124,13 @@ async def get(self, path: str) -> dict | None: header = await f.read(8) if header == b"VERSION:": version = int(await f.readline()) - - return dict(path=path, version=version) + + data = await f.read() + updates = [] + async for update, metadata, timestamp in self._decode_data(data): + updates.append((update, metadata, timestamp)) + + return dict(path=path, version=version, updates=updates) async def create(self, path: str, version: int) -> None: """ @@ -185,16 +190,9 @@ async def read(self, path: str) -> AsyncIterator[tuple[bytes, bytes, float]]: # data = await f.read() if not data: raise YDocNotFound - i = 0 - for d in Decoder(data).read_messages(): - if i == 0: - update = d - elif i == 1: - metadata = d - else: - timestamp = struct.unpack(" None: """Store an update. @@ -229,6 +227,18 @@ async def _get_data_offset(self, path: Path) -> int: except Exception: raise YDocNotFound(f"File {str(path)} not found.") + + async def _decode_data(self, data) -> AsyncIterator[tuple[bytes, bytes, float]]: + i = 0 + for d in Decoder(data).read_messages(): + if i == 0: + update = d + elif i == 1: + metadata = d + else: + timestamp = struct.unpack(" Path: return Path(self._store_path, path + ".y") diff --git a/ypy_websocket/stores/sqlite_store.py b/ypy_websocket/stores/sqlite_store.py index 73e098a..ae30d73 100644 --- a/ypy_websocket/stores/sqlite_store.py +++ b/ypy_websocket/stores/sqlite_store.py @@ -144,7 +144,7 @@ async def list(self) -> AsyncIterator[str]: async def get(self, path: str) -> dict | None: """ - Returns the document's metadata or None if the document does't exist. + Returns the document's metadata and updates or None if the document does't exist. Arguments: path: The document name/path. @@ -163,8 +163,11 @@ async def get(self, path: str) -> dict | None: if doc is None: return None + else: - return dict(path=doc[0], version=doc[1]) + cursor = await db.execute("SELECT yupdate, metadata, timestamp FROM yupdates WHERE path = ?", (path,),) + updates = await cursor.fetchall() + return dict(path=doc[0], version=doc[1], updates=updates) async def create(self, path: str, version: int) -> None: """ From 2eb2f177fab9f9304e5def1ef42f28a2af02c1f4 Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Wed, 4 Oct 2023 15:59:26 +0200 Subject: [PATCH 07/17] Fixes type errors --- pyproject.toml | 1 + tests/test_file_store.py | 2 +- ypy_websocket/stores/sqlite_store.py | 4 +++- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f186505..4afd895 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,7 @@ test = [ "pytest-asyncio", "websockets >=10.0", "uvicorn", + "types-Deprecated" ] docs = [ "mkdocs", diff --git a/tests/test_file_store.py b/tests/test_file_store.py index 475bcf9..1f4709e 100644 --- a/tests/test_file_store.py +++ b/tests/test_file_store.py @@ -24,7 +24,7 @@ async def _inner(path: str, version: int) -> None: @pytest.fixture def add_document(): async def _inner(path: str, doc_path: str, version: int, data: bytes = b"") -> None: - file_path = Path(path / (doc_path + ".y")) + file_path = Path(path, (doc_path + ".y")) await anyio.Path(file_path.parent).mkdir(parents=True, exist_ok=True) async with await anyio.open_file(file_path, "ab") as f: diff --git a/ypy_websocket/stores/sqlite_store.py b/ypy_websocket/stores/sqlite_store.py index ae30d73..93ea254 100644 --- a/ypy_websocket/stores/sqlite_store.py +++ b/ypy_websocket/stores/sqlite_store.py @@ -83,7 +83,9 @@ async def initialize(self) -> None: version = -1 async with aiosqlite.connect(self._store_path) as db: cursor = await db.execute("pragma user_version") - version = (await cursor.fetchone())[0] + row = await cursor.fetchone() + if row is not None: + version = row[0] # The DB has an old version. Move the database. if self.version != version: From ecb8680c53ff3ec283ccd9f5875df2e2ecf89726 Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Thu, 5 Oct 2023 15:52:44 +0200 Subject: [PATCH 08/17] Adds a flag to include the updates when retrieving a doc --- ypy_websocket/stores/base_store.py | 2 +- ypy_websocket/stores/file_store.py | 21 +++++++++++---------- ypy_websocket/stores/sqlite_store.py | 17 +++++++++++------ 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/ypy_websocket/stores/base_store.py b/ypy_websocket/stores/base_store.py index 06c48ab..b447870 100644 --- a/ypy_websocket/stores/base_store.py +++ b/ypy_websocket/stores/base_store.py @@ -63,7 +63,7 @@ async def list(self) -> AsyncIterator[str]: ... @abstractmethod - async def get(self, path: str) -> dict | None: + async def get(self, path: str, updates: bool = False) -> dict | None: """ Returns the document's metadata or None if the document does't exist. diff --git a/ypy_websocket/stores/file_store.py b/ypy_websocket/stores/file_store.py index 4f36f61..4667197 100644 --- a/ypy_websocket/stores/file_store.py +++ b/ypy_websocket/stores/file_store.py @@ -104,7 +104,7 @@ async def list(self) -> AsyncIterator[str]: async for child in anyio.Path(self._store_path).glob("**/*.y"): yield str(child.relative_to(self._store_path)) - async def get(self, path: str) -> dict | None: + async def get(self, path: str, updates: bool = False) -> dict | None: """ Returns the document's metadata and updates or None if the document does't exist. @@ -124,13 +124,14 @@ async def get(self, path: str) -> dict | None: header = await f.read(8) if header == b"VERSION:": version = int(await f.readline()) - - data = await f.read() - updates = [] - async for update, metadata, timestamp in self._decode_data(data): - updates.append((update, metadata, timestamp)) - - return dict(path=path, version=version, updates=updates) + + list_updates = [] + if updates: + data = await f.read() + async for update, metadata, timestamp in self._decode_data(data): + list_updates.append((update, metadata, timestamp)) + + return dict(path=path, version=version, updates=list_updates) async def create(self, path: str, version: int) -> None: """ @@ -190,7 +191,7 @@ async def read(self, path: str) -> AsyncIterator[tuple[bytes, bytes, float]]: # data = await f.read() if not data: raise YDocNotFound - + async for res in self._decode_data(data): yield res @@ -227,7 +228,7 @@ async def _get_data_offset(self, path: Path) -> int: except Exception: raise YDocNotFound(f"File {str(path)} not found.") - + async def _decode_data(self, data) -> AsyncIterator[tuple[bytes, bytes, float]]: i = 0 for d in Decoder(data).read_messages(): diff --git a/ypy_websocket/stores/sqlite_store.py b/ypy_websocket/stores/sqlite_store.py index 93ea254..8482083 100644 --- a/ypy_websocket/stores/sqlite_store.py +++ b/ypy_websocket/stores/sqlite_store.py @@ -144,7 +144,7 @@ async def list(self) -> AsyncIterator[str]: async for path in cursor: yield path[0] - async def get(self, path: str) -> dict | None: + async def get(self, path: str, updates: bool = False) -> dict | None: """ Returns the document's metadata and updates or None if the document does't exist. @@ -165,11 +165,16 @@ async def get(self, path: str) -> dict | None: if doc is None: return None - - else: - cursor = await db.execute("SELECT yupdate, metadata, timestamp FROM yupdates WHERE path = ?", (path,),) - updates = await cursor.fetchall() - return dict(path=doc[0], version=doc[1], updates=updates) + + list_updates = [] + if updates: + cursor = await db.execute( + "SELECT yupdate, metadata, timestamp FROM yupdates WHERE path = ?", + (path,), + ) + list_updates = await cursor.fetchall() + + return dict(path=doc[0], version=doc[1], updates=list_updates) async def create(self, path: str, version: int) -> None: """ From 6f84e6ff0aeccc762724bf373ddfe8bedb6869cb Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Thu, 5 Oct 2023 15:59:26 +0200 Subject: [PATCH 09/17] Update version --- ypy_websocket/stores/base_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ypy_websocket/stores/base_store.py b/ypy_websocket/stores/base_store.py index b447870..4c6449d 100644 --- a/ypy_websocket/stores/base_store.py +++ b/ypy_websocket/stores/base_store.py @@ -15,7 +15,7 @@ class BaseYStore(ABC): Base class for the stores. """ - version = 2 + version = 3 metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None _store_path: str From 32c90633f09b486b110e50dcf27a5c70bdbb2cca Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Thu, 5 Oct 2023 16:21:21 +0200 Subject: [PATCH 10/17] Ignore mypy error --- ypy_websocket/stores/file_store.py | 2 +- ypy_websocket/stores/sqlite_store.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ypy_websocket/stores/file_store.py b/ypy_websocket/stores/file_store.py index 4667197..e2b8e06 100644 --- a/ypy_websocket/stores/file_store.py +++ b/ypy_websocket/stores/file_store.py @@ -93,7 +93,7 @@ async def exists(self, path: str) -> bool: return await anyio.Path(self._get_document_path(path)).exists() - async def list(self) -> AsyncIterator[str]: + async def list(self) -> AsyncIterator[str]: # type: ignore[override] """ Returns a list with the name/path of the documents stored. """ diff --git a/ypy_websocket/stores/sqlite_store.py b/ypy_websocket/stores/sqlite_store.py index 8482083..db0f847 100644 --- a/ypy_websocket/stores/sqlite_store.py +++ b/ypy_websocket/stores/sqlite_store.py @@ -2,7 +2,7 @@ import time from logging import Logger, getLogger -from typing import AsyncIterator, Awaitable, Callable +from typing import Any, AsyncIterator, Awaitable, Callable import aiosqlite import anyio @@ -130,7 +130,7 @@ async def exists(self, path: str) -> bool: ) return (await cursor.fetchone()) is not None - async def list(self) -> AsyncIterator[str]: + async def list(self) -> AsyncIterator[str]: # type: ignore[override] """ Returns a list with the name/path of the documents stored. """ @@ -166,7 +166,7 @@ async def get(self, path: str, updates: bool = False) -> dict | None: if doc is None: return None - list_updates = [] + list_updates: Any = [] if updates: cursor = await db.execute( "SELECT yupdate, metadata, timestamp FROM yupdates WHERE path = ?", From 2515a79c487c574bdb57e757e14d23bf732c20f7 Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Fri, 6 Oct 2023 10:05:24 +0200 Subject: [PATCH 11/17] Review --- ypy_websocket/stores/file_store.py | 2 +- ypy_websocket/stores/sqlite_store.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ypy_websocket/stores/file_store.py b/ypy_websocket/stores/file_store.py index e2b8e06..3a2b76b 100644 --- a/ypy_websocket/stores/file_store.py +++ b/ypy_websocket/stores/file_store.py @@ -125,7 +125,7 @@ async def get(self, path: str, updates: bool = False) -> dict | None: if header == b"VERSION:": version = int(await f.readline()) - list_updates = [] + list_updates: list[tuple[bytes, bytes, float]] = [] if updates: data = await f.read() async for update, metadata, timestamp in self._decode_data(data): diff --git a/ypy_websocket/stores/sqlite_store.py b/ypy_websocket/stores/sqlite_store.py index db0f847..3d5bd64 100644 --- a/ypy_websocket/stores/sqlite_store.py +++ b/ypy_websocket/stores/sqlite_store.py @@ -2,7 +2,7 @@ import time from logging import Logger, getLogger -from typing import Any, AsyncIterator, Awaitable, Callable +from typing import AsyncIterator, Awaitable, Callable import aiosqlite import anyio @@ -166,7 +166,7 @@ async def get(self, path: str, updates: bool = False) -> dict | None: if doc is None: return None - list_updates: Any = [] + list_updates: list[tuple[bytes, bytes, float]] = [] if updates: cursor = await db.execute( "SELECT yupdate, metadata, timestamp FROM yupdates WHERE path = ?", From 746a62b871329b461571f5e1a19cb138e77d3363 Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Fri, 6 Oct 2023 10:29:07 +0200 Subject: [PATCH 12/17] Fixes types --- ypy_websocket/stores/sqlite_store.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ypy_websocket/stores/sqlite_store.py b/ypy_websocket/stores/sqlite_store.py index 3d5bd64..251542a 100644 --- a/ypy_websocket/stores/sqlite_store.py +++ b/ypy_websocket/stores/sqlite_store.py @@ -2,7 +2,7 @@ import time from logging import Logger, getLogger -from typing import AsyncIterator, Awaitable, Callable +from typing import Any, AsyncIterator, Awaitable, Callable, Iterable import aiosqlite import anyio @@ -166,7 +166,7 @@ async def get(self, path: str, updates: bool = False) -> dict | None: if doc is None: return None - list_updates: list[tuple[bytes, bytes, float]] = [] + list_updates: Iterable[Any] = [] if updates: cursor = await db.execute( "SELECT yupdate, metadata, timestamp FROM yupdates WHERE path = ?", From abe39d900ed9fdf82f9e62de8a2808e1db690cbc Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Fri, 6 Oct 2023 15:50:25 +0200 Subject: [PATCH 13/17] Review --- ypy_websocket/stores/base_store.py | 1 + ypy_websocket/stores/file_store.py | 1 + ypy_websocket/stores/sqlite_store.py | 1 + 3 files changed, 3 insertions(+) diff --git a/ypy_websocket/stores/base_store.py b/ypy_websocket/stores/base_store.py index 4c6449d..fe9f90a 100644 --- a/ypy_websocket/stores/base_store.py +++ b/ypy_websocket/stores/base_store.py @@ -69,6 +69,7 @@ async def get(self, path: str, updates: bool = False) -> dict | None: Arguments: path: The document name/path. + updates: Whether to return document's content or only the metadata. """ ... diff --git a/ypy_websocket/stores/file_store.py b/ypy_websocket/stores/file_store.py index 3a2b76b..40d415e 100644 --- a/ypy_websocket/stores/file_store.py +++ b/ypy_websocket/stores/file_store.py @@ -110,6 +110,7 @@ async def get(self, path: str, updates: bool = False) -> dict | None: Arguments: path: The document name/path. + updates: Whether to return document's content or only the metadata. """ if self._initialized is None: raise Exception("The store was not initialized.") diff --git a/ypy_websocket/stores/sqlite_store.py b/ypy_websocket/stores/sqlite_store.py index 251542a..90b3393 100644 --- a/ypy_websocket/stores/sqlite_store.py +++ b/ypy_websocket/stores/sqlite_store.py @@ -150,6 +150,7 @@ async def get(self, path: str, updates: bool = False) -> dict | None: Arguments: path: The document name/path. + updates: Whether to return document's content or only the metadata. """ if self._initialized is None: raise Exception("The store was not initialized.") From 50e03b4a56d02ce69b1d2e5e01dcac6827415237 Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Mon, 9 Oct 2023 18:09:22 +0200 Subject: [PATCH 14/17] Review --- tests/test_file_store.py | 57 ++++++----- tests/test_sqlite_store.py | 146 ++++++++++----------------- ypy_websocket/stores/__init__.py | 2 +- ypy_websocket/stores/base_store.py | 5 +- ypy_websocket/stores/file_store.py | 10 +- ypy_websocket/stores/sqlite_store.py | 11 +- ypy_websocket/stores/utils.py | 2 +- 7 files changed, 105 insertions(+), 128 deletions(-) diff --git a/tests/test_file_store.py b/tests/test_file_store.py index 1f4709e..defe8bf 100644 --- a/tests/test_file_store.py +++ b/tests/test_file_store.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import struct import time from pathlib import Path @@ -5,7 +7,7 @@ import anyio import pytest -from ypy_websocket.stores import DocExists, FileYStore +from ypy_websocket.stores import FileYStore, YDocExists, YDocNotFound from ypy_websocket.yutils import Decoder, write_var_uint @@ -23,21 +25,23 @@ async def _inner(path: str, version: int) -> None: @pytest.fixture def add_document(): - async def _inner(path: str, doc_path: str, version: int, data: bytes = b"") -> None: + async def _inner(path: str, doc_path: str, version: int, data: bytes | None = None) -> None: file_path = Path(path, (doc_path + ".y")) await anyio.Path(file_path.parent).mkdir(parents=True, exist_ok=True) async with await anyio.open_file(file_path, "ab") as f: version_bytes = f"VERSION:{version}\n".encode() await f.write(version_bytes) - data_len = write_var_uint(len(data)) - await f.write(data_len + data) - metadata = b"" - metadata_len = write_var_uint(len(metadata)) - await f.write(metadata_len + metadata) - timestamp = struct.pack(" None: @pytest.fixture def add_document(): - async def _inner(path: str, doc_path: str, version: int, data: bytes = b"") -> None: + async def _inner(path: str, doc_path: str, version: int, data: bytes | None = None) -> None: async with aiosqlite.connect(path) as db: await db.execute( "INSERT INTO documents VALUES (?, ?)", (doc_path, version), ) - await db.execute( - "INSERT INTO yupdates VALUES (?, ?, ?, ?)", - (doc_path, data, b"", time.time()), - ) + if data is not None: + await db.execute( + "INSERT INTO yupdates VALUES (?, ?, ?, ?)", + (doc_path, data, b"", time.time()), + ) await db.commit() return _inner @@ -52,22 +55,7 @@ async def test_initialization(tmp_path): assert store.initialized - async with aiosqlite.connect(path) as db: - cursor = await db.execute("pragma user_version") - version = (await cursor.fetchone())[0] - assert store.version == version - - cursor = await db.execute( - "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='documents'" - ) - res = await cursor.fetchone() - assert res[0] == 1 - - cursor = await db.execute( - "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='yupdates'" - ) - res = await cursor.fetchone() - assert res[0] == 1 + await _check_db(path, store) @pytest.mark.anyio @@ -83,29 +71,14 @@ async def test_initialization_with_old_database(tmp_path, create_database): assert store.initialized - async with aiosqlite.connect(path) as db: - cursor = await db.execute("pragma user_version") - version = (await cursor.fetchone())[0] - assert store.version == version - - cursor = await db.execute( - "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='documents'" - ) - res = await cursor.fetchone() - assert res[0] == 1 - - cursor = await db.execute( - "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='yupdates'" - ) - res = await cursor.fetchone() - assert res[0] == 1 + await _check_db(path, store) @pytest.mark.anyio async def test_initialization_with_empty_database(tmp_path, create_database): path = tmp_path / "tmp.db" - # Create a database with an old version + # Create a database await create_database(path, SQLiteYStore.version, False) store = SQLiteYStore(str(path)) @@ -114,22 +87,7 @@ async def test_initialization_with_empty_database(tmp_path, create_database): assert store.initialized - async with aiosqlite.connect(path) as db: - cursor = await db.execute("pragma user_version") - version = (await cursor.fetchone())[0] - assert store.version == version - - cursor = await db.execute( - "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='documents'" - ) - res = await cursor.fetchone() - assert res[0] == 1 - - cursor = await db.execute( - "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='yupdates'" - ) - res = await cursor.fetchone() - assert res[0] == 1 + await _check_db(path, store) @pytest.mark.anyio @@ -137,7 +95,7 @@ async def test_initialization_with_existing_database(tmp_path, create_database, path = tmp_path / "tmp.db" doc_path = "test.txt" - # Create a database with an old version + # Create a database await create_database(path, SQLiteYStore.version) await add_document(path, doc_path, 0) @@ -147,30 +105,7 @@ async def test_initialization_with_existing_database(tmp_path, create_database, assert store.initialized - async with aiosqlite.connect(path) as db: - cursor = await db.execute("pragma user_version") - version = (await cursor.fetchone())[0] - assert store.version == version - - cursor = await db.execute( - "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='documents'" - ) - res = await cursor.fetchone() - assert res[0] == 1 - - cursor = await db.execute( - "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='yupdates'" - ) - res = await cursor.fetchone() - assert res[0] == 1 - - cursor = await db.execute( - "SELECT path, version FROM documents WHERE path = ?", - (doc_path,), - ) - res = await cursor.fetchone() - assert res[0] == doc_path - assert res[1] == 0 + await _check_db(path, store, doc_path) @pytest.mark.anyio @@ -223,7 +158,7 @@ async def test_get(tmp_path, create_database, add_document): path = tmp_path / "tmp.db" doc_path = "test.txt" - # Create a database with an old version + # Create a database await create_database(path, SQLiteYStore.version) await add_document(path, doc_path, 0) @@ -246,7 +181,7 @@ async def test_create(tmp_path, create_database, add_document): path = tmp_path / "tmp.db" doc_path = "test.txt" - # Create a database with an old version + # Create a database await create_database(path, SQLiteYStore.version) await add_document(path, doc_path, 0) @@ -267,7 +202,7 @@ async def test_create(tmp_path, create_database, add_document): assert res[0] == new_doc assert res[1] == 0 - with pytest.raises(DocExists) as e: + with pytest.raises(YDocExists) as e: await store.create(doc_path, 0) assert str(e.value) == f"The document {doc_path} already exists." @@ -277,7 +212,7 @@ async def test_remove(tmp_path, create_database, add_document): path = tmp_path / "tmp.db" doc_path = "test.txt" - # Create a database with an old version + # Create a database await create_database(path, SQLiteYStore.version) await add_document(path, doc_path, 0) @@ -287,13 +222,15 @@ async def test_remove(tmp_path, create_database, add_document): assert store.initialized + assert await store.exists(doc_path) await store.remove(doc_path) assert not await store.exists(doc_path) new_doc = "new_doc.path" assert not await store.exists(new_doc) - - await store.remove(new_doc) + with pytest.raises(YDocNotFound) as e: + await store.remove(new_doc) + assert str(e.value) == f"The document {new_doc} doesn't exists." assert not await store.exists(new_doc) @@ -303,7 +240,7 @@ async def test_read(tmp_path, create_database, add_document): doc_path = "test.txt" update = b"foo" - # Create a database with an old version + # Create a database await create_database(path, SQLiteYStore.version) await add_document(path, doc_path, 0, update) @@ -326,7 +263,7 @@ async def test_write(tmp_path, create_database, add_document): path = tmp_path / "tmp.db" doc_path = "test.txt" - # Create a database with an old version + # Create a database await create_database(path, SQLiteYStore.version) await add_document(path, doc_path, 0) @@ -344,6 +281,33 @@ async def test_write(tmp_path, create_database, add_document): count = 0 async for u, in cursor: count += 1 - # The fixture add_document inserts an empty update - assert u in [b"", update] - assert count == 2 + assert u == update + assert count == 1 + + +async def _check_db(path: str, store: SQLiteYStore, doc_path: str | None = None): + async with aiosqlite.connect(path) as db: + cursor = await db.execute("pragma user_version") + version = (await cursor.fetchone())[0] + assert store.version == version + + cursor = await db.execute( + "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='documents'" + ) + res = await cursor.fetchone() + assert res[0] == 1 + + cursor = await db.execute( + "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='yupdates'" + ) + res = await cursor.fetchone() + assert res[0] == 1 + + if doc_path is not None: + cursor = await db.execute( + "SELECT path, version FROM documents WHERE path = ?", + (doc_path,), + ) + res = await cursor.fetchone() + assert res[0] == doc_path + assert res[1] == 0 diff --git a/ypy_websocket/stores/__init__.py b/ypy_websocket/stores/__init__.py index 7666179..4bbaf63 100644 --- a/ypy_websocket/stores/__init__.py +++ b/ypy_websocket/stores/__init__.py @@ -1,4 +1,4 @@ from .base_store import BaseYStore # noqa from .file_store import FileYStore, TempFileYStore # noqa from .sqlite_store import SQLiteYStore # noqa -from .utils import DocExists, YDocNotFound # noqa +from .utils import YDocExists, YDocNotFound # noqa diff --git a/ypy_websocket/stores/base_store.py b/ypy_websocket/stores/base_store.py index fe9f90a..99c4ffc 100644 --- a/ypy_websocket/stores/base_store.py +++ b/ypy_websocket/stores/base_store.py @@ -32,7 +32,7 @@ def __init__( Initialize the object. Arguments: - path: The path where the store will be located or the prefix for file-based stores. + path: The path where the store will be located. metadata_callback: An optional callback to call to get the metadata. log: An optional logger. """ @@ -122,8 +122,7 @@ async def read(self, path: str) -> AsyncIterator[tuple[bytes, bytes]]: def initialized(self) -> bool: if self._initialized is not None: return self._initialized.is_set() - else: - return False + return False @property def started(self) -> Event: diff --git a/ypy_websocket/stores/file_store.py b/ypy_websocket/stores/file_store.py index 40d415e..89452a0 100644 --- a/ypy_websocket/stores/file_store.py +++ b/ypy_websocket/stores/file_store.py @@ -13,7 +13,7 @@ from ..yutils import Decoder, get_new_path, write_var_uint from .base_store import BaseYStore -from .utils import DocExists, YDocNotFound +from .utils import YDocExists, YDocNotFound class FileYStore(BaseYStore): @@ -102,7 +102,7 @@ async def list(self) -> AsyncIterator[str]: # type: ignore[override] await self._initialized.wait() async for child in anyio.Path(self._store_path).glob("**/*.y"): - yield str(child.relative_to(self._store_path)) + yield str(child.relative_to(self._store_path).with_suffix("")) async def get(self, path: str, updates: bool = False) -> dict | None: """ @@ -148,7 +148,7 @@ async def create(self, path: str, version: int) -> None: file_path = self._get_document_path(path) if await anyio.Path(file_path).exists(): - raise DocExists(f"The document {path} already exists.") + raise YDocExists(f"The document {path} already exists.") else: await anyio.Path(file_path.parent).mkdir(parents=True, exist_ok=True) @@ -170,6 +170,8 @@ async def remove(self, path: str) -> None: file_path = self._get_document_path(path) if await anyio.Path(file_path).exists(): await anyio.Path(file_path).unlink(missing_ok=False) + else: + raise YDocNotFound(f"The document {path} doesn't exists.") async def read(self, path: str) -> AsyncIterator[tuple[bytes, bytes, float]]: # type: ignore """Async iterator for reading the store content. @@ -246,7 +248,7 @@ def _get_document_path(self, path: str) -> Path: return Path(self._store_path, path + ".y") -@deprecated +@deprecated(reason="Use FileYStore instead") class TempFileYStore(FileYStore): """ A YStore which uses the system's temporary directory. diff --git a/ypy_websocket/stores/sqlite_store.py b/ypy_websocket/stores/sqlite_store.py index 90b3393..151cd50 100644 --- a/ypy_websocket/stores/sqlite_store.py +++ b/ypy_websocket/stores/sqlite_store.py @@ -12,7 +12,7 @@ from ..yutils import get_new_path from .base_store import BaseYStore -from .utils import DocExists, YDocNotFound +from .utils import YDocExists, YDocNotFound class SQLiteYStore(BaseYStore): @@ -198,7 +198,7 @@ async def create(self, path: str, version: int) -> None: ) await db.commit() except aiosqlite.IntegrityError: - raise DocExists(f"The document {path} already exists.") + raise YDocExists(f"The document {path} already exists.") async def remove(self, path: str) -> None: """ @@ -213,6 +213,13 @@ async def remove(self, path: str) -> None: async with self._lock: async with aiosqlite.connect(self._store_path) as db: + cursor = await db.execute( + "SELECT path, version FROM documents WHERE path = ?", + (path,), + ) + if (await cursor.fetchone()) is None: + raise YDocNotFound(f"The document {path} doesn't exists.") + await db.execute( "DELETE FROM documents WHERE path = ?", (path,), diff --git a/ypy_websocket/stores/utils.py b/ypy_websocket/stores/utils.py index a73240f..6c5bf76 100644 --- a/ypy_websocket/stores/utils.py +++ b/ypy_websocket/stores/utils.py @@ -2,5 +2,5 @@ class YDocNotFound(Exception): pass -class DocExists(Exception): +class YDocExists(Exception): pass From dd72571b5a02e97bc710447fa19fe3f15996461a Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Tue, 10 Oct 2023 14:43:35 +0200 Subject: [PATCH 15/17] Removes start/stop --- tests/test_file_store.py | 10 ----- tests/test_sqlite_store.py | 21 ++++------ tests/test_ystore.py | 2 - ypy_websocket/stores/base_store.py | 59 +--------------------------- ypy_websocket/stores/sqlite_store.py | 19 --------- 5 files changed, 8 insertions(+), 103 deletions(-) diff --git a/tests/test_file_store.py b/tests/test_file_store.py index defe8bf..b65140b 100644 --- a/tests/test_file_store.py +++ b/tests/test_file_store.py @@ -50,7 +50,6 @@ async def _inner(path: str, doc_path: str, version: int, data: bytes | None = No async def test_initialization(tmp_path): path = tmp_path / "tmp" store = FileYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -69,7 +68,6 @@ async def test_initialization_with_old_store(tmp_path, create_store): await create_store(path, 1) store = FileYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -90,7 +88,6 @@ async def test_initialization_with_existing_store(tmp_path, create_store, add_do await add_document(path, doc_path, 0) store = FileYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -114,7 +111,6 @@ async def test_exists(tmp_path, create_store, add_document): await add_document(path, doc_path, 0) store = FileYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -136,7 +132,6 @@ async def test_list(tmp_path, create_store, add_document): await add_document(path, doc2, 0) store = FileYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -159,7 +154,6 @@ async def test_get(tmp_path, create_store, add_document): await add_document(path, doc_path, 0) store = FileYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -182,7 +176,6 @@ async def test_create(tmp_path, create_store, add_document): await add_document(path, doc_path, 0) store = FileYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -213,7 +206,6 @@ async def test_remove(tmp_path, create_store, add_document): await add_document(path, doc_path, 0) store = FileYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -241,7 +233,6 @@ async def test_read(tmp_path, create_store, add_document): await add_document(path, doc_path, 0, update) store = FileYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -264,7 +255,6 @@ async def test_write(tmp_path, create_store, add_document): await add_document(path, doc_path, 0) store = FileYStore(str(path)) - await store.start() await store.initialize() assert store.initialized diff --git a/tests/test_sqlite_store.py b/tests/test_sqlite_store.py index 2abb1c9..a527e9d 100644 --- a/tests/test_sqlite_store.py +++ b/tests/test_sqlite_store.py @@ -50,7 +50,6 @@ async def _inner(path: str, doc_path: str, version: int, data: bytes | None = No async def test_initialization(tmp_path): path = tmp_path / "tmp.db" store = SQLiteYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -66,7 +65,6 @@ async def test_initialization_with_old_database(tmp_path, create_database): await create_database(path, 1) store = SQLiteYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -82,7 +80,6 @@ async def test_initialization_with_empty_database(tmp_path, create_database): await create_database(path, SQLiteYStore.version, False) store = SQLiteYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -100,7 +97,6 @@ async def test_initialization_with_existing_database(tmp_path, create_database, await add_document(path, doc_path, 0) store = SQLiteYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -118,7 +114,6 @@ async def test_exists(tmp_path, create_database, add_document): await add_document(path, doc_path, 0) store = SQLiteYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -140,7 +135,6 @@ async def test_list(tmp_path, create_database, add_document): await add_document(path, doc2, 0) store = SQLiteYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -163,7 +157,6 @@ async def test_get(tmp_path, create_database, add_document): await add_document(path, doc_path, 0) store = SQLiteYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -186,7 +179,6 @@ async def test_create(tmp_path, create_database, add_document): await add_document(path, doc_path, 0) store = SQLiteYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -217,7 +209,6 @@ async def test_remove(tmp_path, create_database, add_document): await add_document(path, doc_path, 0) store = SQLiteYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -245,7 +236,6 @@ async def test_read(tmp_path, create_database, add_document): await add_document(path, doc_path, 0, update) store = SQLiteYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -268,7 +258,6 @@ async def test_write(tmp_path, create_database, add_document): await add_document(path, doc_path, 0) store = SQLiteYStore(str(path)) - await store.start() await store.initialize() assert store.initialized @@ -288,20 +277,23 @@ async def test_write(tmp_path, create_database, add_document): async def _check_db(path: str, store: SQLiteYStore, doc_path: str | None = None): async with aiosqlite.connect(path) as db: cursor = await db.execute("pragma user_version") - version = (await cursor.fetchone())[0] - assert store.version == version + res = await cursor.fetchone() + assert res + assert store.version == res[0] cursor = await db.execute( "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='documents'" ) res = await cursor.fetchone() + assert res assert res[0] == 1 cursor = await db.execute( "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='yupdates'" ) res = await cursor.fetchone() - assert res[0] == 1 + assert res + assert res and res[0] == 1 if doc_path is not None: cursor = await db.execute( @@ -309,5 +301,6 @@ async def _check_db(path: str, store: SQLiteYStore, doc_path: str | None = None) (doc_path,), ) res = await cursor.fetchone() + assert res assert res[0] == doc_path assert res[1] == 0 diff --git a/tests/test_ystore.py b/tests/test_ystore.py index 42cc207..d999e83 100644 --- a/tests/test_ystore.py +++ b/tests/test_ystore.py @@ -33,7 +33,6 @@ async def test_ystore(tmp_path, YStore): doc_name = "my_doc.txt" ystore = YStore(str(store_path), metadata_callback=MetadataCallback()) - await ystore.start() await ystore.initialize() await ystore.create(doc_name, 0) @@ -61,7 +60,6 @@ async def test_document_ttl_sqlite_ystore(tmp_path, test_ydoc): doc_name = "my_doc.txt" ystore = MySQLiteYStore(str(store_path)) - await ystore.start() await ystore.initialize() await ystore.create(doc_name, 0) diff --git a/ypy_websocket/stores/base_store.py b/ypy_websocket/stores/base_store.py index 99c4ffc..7943156 100644 --- a/ypy_websocket/stores/base_store.py +++ b/ypy_websocket/stores/base_store.py @@ -1,13 +1,11 @@ from __future__ import annotations from abc import ABC, abstractmethod -from contextlib import AsyncExitStack from inspect import isawaitable from typing import AsyncIterator, Awaitable, Callable, cast import y_py as Y -from anyio import TASK_STATUS_IGNORED, Event, create_task_group -from anyio.abc import TaskGroup, TaskStatus +from anyio import Event class BaseYStore(ABC): @@ -20,9 +18,6 @@ class BaseYStore(ABC): _store_path: str _initialized: Event | None = None - _started: Event | None = None - _starting: bool = False - _task_group: TaskGroup | None = None @abstractmethod def __init__( @@ -124,58 +119,6 @@ def initialized(self) -> bool: return self._initialized.is_set() return False - @property - def started(self) -> Event: - if self._started is None: - self._started = Event() - return self._started - - async def __aenter__(self) -> BaseYStore: - if self._task_group is not None: - raise RuntimeError("YStore already running") - - async with AsyncExitStack() as exit_stack: - tg = create_task_group() - self._task_group = await exit_stack.enter_async_context(tg) - self._exit_stack = exit_stack.pop_all() - tg.start_soon(self.start) - - return self - - async def __aexit__(self, exc_type, exc_value, exc_tb): - if self._task_group is None: - raise RuntimeError("YStore not running") - - self._task_group.cancel_scope.cancel() - self._task_group = None - return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb) - - async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED): - """Start the store. - - Arguments: - task_status: The status to set when the task has started. - """ - if self._starting: - return - else: - self._staring = True - - if self._task_group is not None: - raise RuntimeError("YStore already running") - - self.started.set() - self._starting = False - task_status.started() - - def stop(self) -> None: - """Stop the store.""" - if self._task_group is None: - raise RuntimeError("YStore not running") - - self._task_group.cancel_scope.cancel() - self._task_group = None - async def get_metadata(self) -> bytes: """ Returns: diff --git a/ypy_websocket/stores/sqlite_store.py b/ypy_websocket/stores/sqlite_store.py index 151cd50..2cdd219 100644 --- a/ypy_websocket/stores/sqlite_store.py +++ b/ypy_websocket/stores/sqlite_store.py @@ -51,25 +51,6 @@ def __init__( self.metadata_callback = metadata_callback self.log = log or getLogger(__name__) - async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED): - """Start the SQLiteYStore. - - Arguments: - task_status: The status to set when the task has started. - """ - if self._starting: - return - else: - self._starting = True - - if self._task_group is not None: - raise RuntimeError("YStore already running") - - self._task_group = create_task_group() - self.started.set() - self._starting = False - task_status.started() - async def initialize(self) -> None: """ Initializes the store. From e876a8014fadcb28e3382221ca6e772938eda3f6 Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Tue, 10 Oct 2023 14:46:17 +0200 Subject: [PATCH 16/17] pre-commit --- ypy_websocket/stores/sqlite_store.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ypy_websocket/stores/sqlite_store.py b/ypy_websocket/stores/sqlite_store.py index 2cdd219..f7d8375 100644 --- a/ypy_websocket/stores/sqlite_store.py +++ b/ypy_websocket/stores/sqlite_store.py @@ -7,8 +7,7 @@ import aiosqlite import anyio import y_py as Y -from anyio import TASK_STATUS_IGNORED, Event, Lock, create_task_group -from anyio.abc import TaskStatus +from anyio import Event, Lock from ..yutils import get_new_path from .base_store import BaseYStore From 999701d26fab4253613e798b8a440ae08d85bbce Mon Sep 17 00:00:00 2001 From: Carlos Herrero <26092748+hbcarlos@users.noreply.github.com> Date: Tue, 10 Oct 2023 14:55:35 +0200 Subject: [PATCH 17/17] Fix windows --- ypy_websocket/stores/file_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ypy_websocket/stores/file_store.py b/ypy_websocket/stores/file_store.py index 89452a0..165a9f2 100644 --- a/ypy_websocket/stores/file_store.py +++ b/ypy_websocket/stores/file_store.py @@ -102,7 +102,7 @@ async def list(self) -> AsyncIterator[str]: # type: ignore[override] await self._initialized.wait() async for child in anyio.Path(self._store_path).glob("**/*.y"): - yield str(child.relative_to(self._store_path).with_suffix("")) + yield child.relative_to(self._store_path).with_suffix("").as_posix() async def get(self, path: str, updates: bool = False) -> dict | None: """