diff --git a/pycrdt_websocket/ystore.py b/pycrdt_websocket/ystore.py index f065ddb..4c51f0f 100644 --- a/pycrdt_websocket/ystore.py +++ b/pycrdt_websocket/ystore.py @@ -329,7 +329,6 @@ def __init__( self.metadata_callback = metadata_callback self.log = log or getLogger(__name__) self.lock = Lock() - self.db_initialized = Event() async def start( self, @@ -342,11 +341,10 @@ async def start( Arguments: task_status: The status to set when the task has started. """ - self.db_initialized = Event() if from_context_manager: assert self._task_group is not None - self._task_group.start_soon(self._init_db) + await self._init_db() task_status.started() self.started.set() return @@ -355,13 +353,13 @@ async def start( if self._task_group is not None: raise RuntimeError("YStore already running") async with create_task_group() as self._task_group: - self._task_group.start_soon(self._init_db) + await self._init_db() task_status.started() self.started.set() async def stop(self) -> None: """Stop the store.""" - if self.db_initialized.is_set(): + if hasattr(self, "db_initialized") and self.db_initialized.is_set(): await self._db.close() await super().stop() @@ -415,6 +413,8 @@ async def read(self) -> AsyncIterator[tuple[bytes, bytes, float]]: Returns: A tuple of (update, metadata, timestamp) for each update. """ + if not hasattr(self, "db_initialized"): + raise RuntimeError("ystore is not started") await self.db_initialized.wait() try: async with self.lock: @@ -438,6 +438,8 @@ async def write(self, data: bytes) -> None: Arguments: data: The update to store. """ + if not hasattr(self, "db_initialized"): + raise RuntimeError("ystore is not started") await self.db_initialized.wait() async with self.lock: # first, determine time elapsed since last update