Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improves autosaving documents #206

Merged
merged 2 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ enable-extensions = G
extend-ignore =
G001, G002, G004, G200, G201, G202,
# black adds spaces around ':'
E203,
E203,E231
per-file-ignores =
# B011: Do not call assert False since python -O removes these calls
# F841 local variable 'foo' is assigned to but never used
Expand Down
2 changes: 1 addition & 1 deletion jupyter_collaboration/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ async def open(self, room_id):
self.close(1004, f"File {path} not found.")
else:
self.log.error(f"Error initializing: {path}\n{e!r}", exc_info=e)
self.close(1003, f"Error initializing: {path}. You need to close the document.")
self.close(1005, f"Error initializing: {path}. You need to close the document.")

# Clean up the room and delete the file loader
if self.room is not None and len(self.room.clients) == 0 or self.room.clients == [self]:
Expand Down
2 changes: 1 addition & 1 deletion jupyter_collaboration/rooms/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async def handle_msg(self, data: bytes) -> None:

def broadcast_msg(self, msg: bytes) -> None:
for client in self.clients:
self._task_group.start_soon(client.send, msg)
self._task_group.start_soon(client.send, msg) # type: ignore[union-attr]

async def _broadcast_updates(self):
# FIXME should be upstreamed
Expand Down
97 changes: 45 additions & 52 deletions jupyter_collaboration/rooms/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async def initialize(self) -> None:
this setter will subscribe for updates on the shared document.
"""
async with self._initialization_lock:
if self.ready: # type: ignore[has-type]
if self.ready:
return

self.log.info("Initializing room %s", self._room_id)
Expand All @@ -88,7 +88,9 @@ async def initialize(self) -> None:
if self.ystore is not None and await self.ystore.exists(self._room_id):
# Load the content from the store
doc = await self.ystore.get(self._room_id)
assert doc
self._session_id = doc["session_id"]

await self.ystore.apply_updates(self._room_id, self.ydoc)
self._emit(
LogLevel.INFO,
Expand Down Expand Up @@ -207,18 +209,7 @@ async def _on_content_change(self, event: str, args: dict[str, Any]) -> None:
if event == "metadata" and (
self._last_modified is None or self._last_modified < args["last_modified"]
):
self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id)
self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room.")

msg_id = str(uuid.uuid4())
self._messages[msg_id] = asyncio.Lock()
await self._outofband_lock.acquire()
data = msg_id.encode()
self.broadcast_msg(
bytes([MessageType.ROOM, RoomMessages.FILE_CHANGED])
+ write_var_uint(len(data))
+ data
)
await self._send_confict_msg()

def _on_document_change(self, target: str, event: Any) -> None:
"""
Expand Down Expand Up @@ -247,34 +238,35 @@ def _on_document_change(self, target: str, event: Any) -> None:

async def _load_document(self) -> None:
try:
model = await self._file.load_content(self._file_format, self._file_type, True)
async with self._update_lock:
model = await self._file.load_content(self._file_format, self._file_type, True)
self._document.source = model["content"]
self._last_modified = model["last_modified"]
self._document.dirty = False

except Exception as e:
msg = f"Error loading content from file: {self._file.path}\n{e!r}"
self.log.error(msg, exc_info=e)
self._emit(LogLevel.ERROR, None, msg)
return None

async with self._update_lock:
self._document.source = model["content"]
self._last_modified = model["last_modified"]
self._document.dirty = False

async def _save_document(self) -> None:
"""
Saves the content of the document to disk.
"""
try:
self.log.info("Saving the content from room %s", self._room_id)
model = await self._file.save_content(
{
"format": self._file_format,
"type": self._file_type,
"last_modified": self._last_modified,
"content": self._document.source,
}
)
self._last_modified = model["last_modified"]

async with self._update_lock:
model = await self._file.save_content(
{
"format": self._file_format,
"type": self._file_type,
"last_modified": self._last_modified,
"content": self._document.source,
}
)
self._last_modified = model["last_modified"]
self._document.dirty = False

self._emit(LogLevel.INFO, "save", "Content saved.")
Expand All @@ -299,40 +291,41 @@ async def _maybe_save_document(self) -> None:
# save after X seconds of inactivity
await asyncio.sleep(self._save_delay)

if self._outofband_lock.locked():
return

try:
self.log.info("Saving the content from room %s", self._room_id)
model = await self._file.maybe_save_content(
{
"format": self._file_format,
"type": self._file_type,
"last_modified": self._last_modified,
"content": self._document.source,
}
)
self._last_modified = model["last_modified"]
async with self._update_lock:
model = await self._file.maybe_save_content(
{
"format": self._file_format,
"type": self._file_type,
"last_modified": self._last_modified,
"content": self._document.source,
}
)
self._last_modified = model["last_modified"]
self._document.dirty = False

self._emit(LogLevel.INFO, "save", "Content saved.")

except OutOfBandChanges:
self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id)
try:
model = await self._file.load_content(self._file_format, self._file_type, True)
except Exception as e:
msg = f"Error loading content from file: {self._file.path}\n{e!r}"
self.log.error(msg, exc_info=e)
self._emit(LogLevel.ERROR, None, msg)
return None

async with self._update_lock:
self._document.source = model["content"]
self._last_modified = model["last_modified"]
self._document.dirty = False

self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes while saving.")
await self._send_confict_msg()

except Exception as e:
msg = f"Error saving file: {self._file.path}\n{e!r}"
self.log.error(msg, exc_info=e)
self._emit(LogLevel.ERROR, None, msg)

async def _send_confict_msg(self) -> None:
self.log.info("Out-of-band changes in room %s", self._room_id)
self._emit(LogLevel.INFO, "overwrite", f"Out-of-band changes in room {self._room_id}")

msg_id = str(uuid.uuid4())
self._messages[msg_id] = asyncio.Lock()
await self._outofband_lock.acquire()
data = msg_id.encode()
self.broadcast_msg(
bytes([MessageType.ROOM, RoomMessages.FILE_CHANGED]) + write_var_uint(len(data)) + data
)
14 changes: 7 additions & 7 deletions jupyter_collaboration/rooms/yroom.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def on_message(self) -> Callable[[bytes], Awaitable[bool] | bool] | None:
return self._on_message

@on_message.setter
def on_message(self, value: Callable[[bytes], Awaitable[bool] | bool] | None):
def on_message(self, value: Callable[[bytes], Awaitable[bool] | bool] | None) -> None:
"""
Arguments:
value: An optional callback to call when a message is received. If the callback returns True, the message is skipped.
Expand All @@ -125,17 +125,17 @@ def on_message(self, value: Callable[[bytes], Awaitable[bool] | bool] | None):
async def _broadcast_updates(self):
async with self._update_receive_stream:
async for update in self._update_receive_stream:
if self._task_group.cancel_scope.cancel_called:
if self._task_group.cancel_scope.cancel_called: # type: ignore[union-attr]
return
# broadcast internal ydoc's update to all clients, that includes changes from the
# clients and changes from the backend (out-of-band changes)
for client in self.clients:
self.log.debug("Sending Y update to client with endpoint: %s", client.path)
message = create_update_message(update)
self._task_group.start_soon(client.send, message)
self._task_group.start_soon(client.send, message) # type: ignore[union-attr]
if self.ystore:
self.log.debug("Writing Y update to YStore")
self._task_group.start_soon(self.ystore.write, client.path, update)
self._task_group.start_soon(self.ystore.write, client.path, update) # type: ignore[union-attr]

async def __aenter__(self) -> YRoom:
if self._task_group is not None:
Expand All @@ -158,7 +158,7 @@ async def __aexit__(self, exc_type, exc_value, exc_tb):
self._task_group = None
return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb)

async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) -> None:
"""Start the room.

Arguments:
Expand All @@ -178,15 +178,15 @@ async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
self._starting = False
task_status.started()

def stop(self):
def stop(self) -> None:
"""Stop the room."""
if self._task_group is None:
raise RuntimeError("YRoom not running")

self._task_group.cancel_scope.cancel()
self._task_group = None

async def serve(self, websocket: Websocket):
async def serve(self, websocket: Websocket) -> None:
"""Serve a client.

Arguments:
Expand Down
6 changes: 3 additions & 3 deletions packages/docprovider/src/yprovider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@ export class WebSocketProvider implements IDocumentProvider {
}

private _onConnectionClosed = (event: any): void => {
if (event.code === 1003) {
console.error('Document provider closed:', event.reason);
if (event.code >= 1003 && event.code < 1006) {
console.error('Document provider closed:', event.code, event.reason);

showErrorMessage(this._trans.__('Document session error'), event.reason, [
showErrorMessage(this._trans.__('Document error'), event.reason, [
Dialog.okButton()
]);

Expand Down