diff --git a/.flake8 b/.flake8 index 243a1074..a850a7c3 100644 --- a/.flake8 +++ b/.flake8 @@ -10,7 +10,7 @@ enable-extensions = G extend-ignore = G001, G002, G004, G200, G201, G202, # black adds spaces around ':' - E203, + E203,E231 per-file-ignores = # B011: Do not call assert False since python -O removes these calls # F841 local variable 'foo' is assigned to but never used diff --git a/jupyter_collaboration/handlers.py b/jupyter_collaboration/handlers.py index 80b6546c..acbe9043 100644 --- a/jupyter_collaboration/handlers.py +++ b/jupyter_collaboration/handlers.py @@ -137,7 +137,7 @@ async def open(self, room_id): self.close(1004, f"File {path} not found.") else: self.log.error(f"Error initializing: {path}\n{e!r}", exc_info=e) - self.close(1003, f"Error initializing: {path}. You need to close the document.") + self.close(1005, f"Error initializing: {path}. You need to close the document.") # Clean up the room and delete the file loader if self.room is not None and len(self.room.clients) == 0 or self.room.clients == [self]: diff --git a/jupyter_collaboration/rooms/base.py b/jupyter_collaboration/rooms/base.py index 9e8a3c4c..3b7bd3ba 100644 --- a/jupyter_collaboration/rooms/base.py +++ b/jupyter_collaboration/rooms/base.py @@ -44,7 +44,7 @@ async def handle_msg(self, data: bytes) -> None: def broadcast_msg(self, msg: bytes) -> None: for client in self.clients: - self._task_group.start_soon(client.send, msg) + self._task_group.start_soon(client.send, msg) # type: ignore[union-attr] async def _broadcast_updates(self): # FIXME should be upstreamed diff --git a/jupyter_collaboration/rooms/document.py b/jupyter_collaboration/rooms/document.py index 3798d3e8..4330ce86 100644 --- a/jupyter_collaboration/rooms/document.py +++ b/jupyter_collaboration/rooms/document.py @@ -77,7 +77,7 @@ async def initialize(self) -> None: this setter will subscribe for updates on the shared document. """ async with self._initialization_lock: - if self.ready: # type: ignore[has-type] + if self.ready: return self.log.info("Initializing room %s", self._room_id) @@ -88,7 +88,9 @@ async def initialize(self) -> None: if self.ystore is not None and await self.ystore.exists(self._room_id): # Load the content from the store doc = await self.ystore.get(self._room_id) + assert doc self._session_id = doc["session_id"] + await self.ystore.apply_updates(self._room_id, self.ydoc) self._emit( LogLevel.INFO, @@ -207,18 +209,7 @@ async def _on_content_change(self, event: str, args: dict[str, Any]) -> None: if event == "metadata" and ( self._last_modified is None or self._last_modified < args["last_modified"] ): - self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id) - self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room.") - - msg_id = str(uuid.uuid4()) - self._messages[msg_id] = asyncio.Lock() - await self._outofband_lock.acquire() - data = msg_id.encode() - self.broadcast_msg( - bytes([MessageType.ROOM, RoomMessages.FILE_CHANGED]) - + write_var_uint(len(data)) - + data - ) + await self._send_confict_msg() def _on_document_change(self, target: str, event: Any) -> None: """ @@ -247,34 +238,35 @@ def _on_document_change(self, target: str, event: Any) -> None: async def _load_document(self) -> None: try: - model = await self._file.load_content(self._file_format, self._file_type, True) + async with self._update_lock: + model = await self._file.load_content(self._file_format, self._file_type, True) + self._document.source = model["content"] + self._last_modified = model["last_modified"] + self._document.dirty = False + except Exception as e: msg = f"Error loading content from file: {self._file.path}\n{e!r}" self.log.error(msg, exc_info=e) self._emit(LogLevel.ERROR, None, msg) return None - async with self._update_lock: - self._document.source = model["content"] - self._last_modified = model["last_modified"] - self._document.dirty = False - async def _save_document(self) -> None: """ Saves the content of the document to disk. """ try: self.log.info("Saving the content from room %s", self._room_id) - model = await self._file.save_content( - { - "format": self._file_format, - "type": self._file_type, - "last_modified": self._last_modified, - "content": self._document.source, - } - ) - self._last_modified = model["last_modified"] + async with self._update_lock: + model = await self._file.save_content( + { + "format": self._file_format, + "type": self._file_type, + "last_modified": self._last_modified, + "content": self._document.source, + } + ) + self._last_modified = model["last_modified"] self._document.dirty = False self._emit(LogLevel.INFO, "save", "Content saved.") @@ -299,40 +291,41 @@ async def _maybe_save_document(self) -> None: # save after X seconds of inactivity await asyncio.sleep(self._save_delay) + if self._outofband_lock.locked(): + return + try: self.log.info("Saving the content from room %s", self._room_id) - model = await self._file.maybe_save_content( - { - "format": self._file_format, - "type": self._file_type, - "last_modified": self._last_modified, - "content": self._document.source, - } - ) - self._last_modified = model["last_modified"] async with self._update_lock: + model = await self._file.maybe_save_content( + { + "format": self._file_format, + "type": self._file_type, + "last_modified": self._last_modified, + "content": self._document.source, + } + ) + self._last_modified = model["last_modified"] self._document.dirty = False self._emit(LogLevel.INFO, "save", "Content saved.") except OutOfBandChanges: - self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id) - try: - model = await self._file.load_content(self._file_format, self._file_type, True) - except Exception as e: - msg = f"Error loading content from file: {self._file.path}\n{e!r}" - self.log.error(msg, exc_info=e) - self._emit(LogLevel.ERROR, None, msg) - return None - - async with self._update_lock: - self._document.source = model["content"] - self._last_modified = model["last_modified"] - self._document.dirty = False - - self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes while saving.") + await self._send_confict_msg() except Exception as e: msg = f"Error saving file: {self._file.path}\n{e!r}" self.log.error(msg, exc_info=e) self._emit(LogLevel.ERROR, None, msg) + + async def _send_confict_msg(self) -> None: + self.log.info("Out-of-band changes in room %s", self._room_id) + self._emit(LogLevel.INFO, "overwrite", f"Out-of-band changes in room {self._room_id}") + + msg_id = str(uuid.uuid4()) + self._messages[msg_id] = asyncio.Lock() + await self._outofband_lock.acquire() + data = msg_id.encode() + self.broadcast_msg( + bytes([MessageType.ROOM, RoomMessages.FILE_CHANGED]) + write_var_uint(len(data)) + data + ) diff --git a/jupyter_collaboration/rooms/yroom.py b/jupyter_collaboration/rooms/yroom.py index ca140b2c..01e3ce70 100644 --- a/jupyter_collaboration/rooms/yroom.py +++ b/jupyter_collaboration/rooms/yroom.py @@ -115,7 +115,7 @@ def on_message(self) -> Callable[[bytes], Awaitable[bool] | bool] | None: return self._on_message @on_message.setter - def on_message(self, value: Callable[[bytes], Awaitable[bool] | bool] | None): + def on_message(self, value: Callable[[bytes], Awaitable[bool] | bool] | None) -> None: """ Arguments: value: An optional callback to call when a message is received. If the callback returns True, the message is skipped. @@ -125,17 +125,17 @@ def on_message(self, value: Callable[[bytes], Awaitable[bool] | bool] | None): async def _broadcast_updates(self): async with self._update_receive_stream: async for update in self._update_receive_stream: - if self._task_group.cancel_scope.cancel_called: + if self._task_group.cancel_scope.cancel_called: # type: ignore[union-attr] return # broadcast internal ydoc's update to all clients, that includes changes from the # clients and changes from the backend (out-of-band changes) for client in self.clients: self.log.debug("Sending Y update to client with endpoint: %s", client.path) message = create_update_message(update) - self._task_group.start_soon(client.send, message) + self._task_group.start_soon(client.send, message) # type: ignore[union-attr] if self.ystore: self.log.debug("Writing Y update to YStore") - self._task_group.start_soon(self.ystore.write, client.path, update) + self._task_group.start_soon(self.ystore.write, client.path, update) # type: ignore[union-attr] async def __aenter__(self) -> YRoom: if self._task_group is not None: @@ -158,7 +158,7 @@ async def __aexit__(self, exc_type, exc_value, exc_tb): self._task_group = None return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb) - async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED): + async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) -> None: """Start the room. Arguments: @@ -178,7 +178,7 @@ async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED): self._starting = False task_status.started() - def stop(self): + def stop(self) -> None: """Stop the room.""" if self._task_group is None: raise RuntimeError("YRoom not running") @@ -186,7 +186,7 @@ def stop(self): self._task_group.cancel_scope.cancel() self._task_group = None - async def serve(self, websocket: Websocket): + async def serve(self, websocket: Websocket) -> None: """Serve a client. Arguments: diff --git a/packages/docprovider/src/yprovider.ts b/packages/docprovider/src/yprovider.ts index 2035fc7f..0f6887f2 100644 --- a/packages/docprovider/src/yprovider.ts +++ b/packages/docprovider/src/yprovider.ts @@ -136,10 +136,10 @@ export class WebSocketProvider implements IDocumentProvider { } private _onConnectionClosed = (event: any): void => { - if (event.code === 1003) { - console.error('Document provider closed:', event.reason); + if (event.code >= 1003 && event.code < 1006) { + console.error('Document provider closed:', event.code, event.reason); - showErrorMessage(this._trans.__('Document session error'), event.reason, [ + showErrorMessage(this._trans.__('Document error'), event.reason, [ Dialog.okButton() ]);