Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
blink1073 committed Jan 6, 2024
2 parents a8144ab + 2bca5d2 commit 8aa1d10
Show file tree
Hide file tree
Showing 19 changed files with 492 additions and 255 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/downstream.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ jobs:
- uses: jupyterlab/maintainer-tools/.github/actions/downstream-test@v1
with:
package_name: jupyter_server
test_command: pytest -vv -ras -W default --durations 10 --color=yes -x

jupyter_kernel_test:
runs-on: ubuntu-latest
Expand All @@ -73,7 +74,7 @@ jobs:
uses: jupyterlab/maintainer-tools/.github/actions/base-setup@v1

- name: Setup conda ${{ matrix.python-version }}
uses: conda-incubator/setup-miniconda@v2
uses: conda-incubator/setup-miniconda@v3
with:
auto-update-conda: true
activate-environment: jupyter_kernel_test
Expand All @@ -97,7 +98,7 @@ jobs:
uses: actions/checkout@v4

- name: Setup Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: "3.9"
architecture: "x64"
Expand Down
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ repos:
- id: trailing-whitespace

- repo: https://github.com/python-jsonschema/check-jsonschema
rev: 0.27.1
rev: 0.27.2
hooks:
- id: check-github-workflows

Expand All @@ -37,7 +37,7 @@ repos:
types_or: [yaml, html, json]

- repo: https://github.com/pre-commit/mirrors-mypy
rev: "v1.6.1"
rev: "v1.7.1"
hooks:
- id: mypy
files: jupyter_client
Expand Down Expand Up @@ -66,7 +66,7 @@ repos:
- id: rst-inline-touching-normal

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.1.5
rev: v0.1.6
hooks:
- id: ruff
types_or: [python, jupyter]
Expand All @@ -75,7 +75,7 @@ repos:
types_or: [python, jupyter]

- repo: https://github.com/scientific-python/cookie
rev: "2023.10.27"
rev: "2023.11.17"
hooks:
- id: sp-repo-review
additional_dependencies: ["repo-review[cli]"]
9 changes: 9 additions & 0 deletions docs/messaging.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,15 @@ multiple cases:
IPythonQt client) to force a kernel restart to get a clean kernel without
losing client-side state like history or inlined figures.

Implementation recommendation for starting kernels: A restart should optimally
preserve as many resources outside the kernel as possible (e.g. only restart the
kernel and its subprocesses and not any parent processes). That is, ideally a
restart should be "in-place". For local kernels, there is typically no parent
process so a "hard" restart and an in-place restart are identical whereas for
remote kernels this is not generally the same. As an example, if a remote kernel
is run in a container, during an in-place restart the container may be kept
running and a new kernel process within it would be started.

The client sends a shutdown request to the kernel, and once it receives the
reply message (which is otherwise empty), it can assume that the kernel has
completed shutdown safely. The request is sent on the ``control`` channel.
Expand Down
158 changes: 155 additions & 3 deletions jupyter_client/ioloop/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,158 @@
# Distributed under the terms of the Modified BSD License.
import asyncio
import typing as t
from queue import Queue

import zmq
import zmq.asyncio
from jupyter_core.utils import ensure_event_loop
from traitlets import Instance, Type

from ..manager import AsyncKernelManager, KernelManager
from .restarter import AsyncIOLoopKernelRestarter, IOLoopKernelRestarter


class ZMQStream:
"""A class that transforms a ZMQ Socket into a synchronous stream.
The class supports callbacks for message send and receive, and does
the actual I/O on the asyncio loop.
It has a similar interface and function as zmq's ZMQStream, but
does not rely on the tornado event loop.
"""

socket: t.Optional[zmq.sugar.socket.Socket]

def __init__(self, socket: zmq.sugar.socket.Socket):
self.socket = socket
self.__on_recv: t.Optional[t.Callable] = None
self.__on_send: t.Optional[t.Callable] = None
self.__recv_copy = False
self.__send_queue: Queue[t.Any] = Queue()
self.__polling = False

def on_send(self, callback: t.Callable) -> None:
"""Register a callback to be run every time you call send."""
self.__on_send = callback

def on_recv(self, callback: t.Callable, copy: bool = True) -> None:
"""Register a callback to be run every time the socket has something to receive."""
self.__on_recv = callback
self.__recv_copy = copy
self.__start_polling()

def stop_on_recv(self) -> None:
"""Turn off the recv callback."""
self.__on_recv = None

def stop_on_send(self) -> None:
"""Turn off the send callback."""
self.__on_send = None

def send(
self, msg: t.Any, flags: int = 0, copy: bool = True, track: bool = False, **kwargs: t.Any
) -> None:
"""Send a message, optionally also register a new callback for sends.
See zmq.socket.send for details.
"""
kwargs.update(flags=flags, copy=copy, track=track)
self.__send_queue.put((msg, kwargs))
self.__start_polling()

def __recv(self) -> t.Any:
"""Receive data on the channel."""
assert self.socket is not None
msg_list = self.socket.recv_multipart(zmq.NOBLOCK, copy=self.__recv_copy)
if self.__on_recv:
self.__on_recv(msg_list)
return msg_list

def flush(self) -> None:
"""This is a no-op, for backwards compatibility."""

def close(self, linger: t.Optional[int] = None) -> None:
"""Close the channel."""
socket = self.socket
if socket is None:
return
try:
socket.close(linger=linger)
finally:
self.socket = None

def closed(self) -> bool:
"""Check if the channel is closed."""
if self.socket is None:
return True
if self.socket.closed:
# underlying socket has been closed, but not by us!
# trigger our cleanup
self.close()
return True
return False

def __poll(self) -> None:
if self.socket is None:
self.__polling = False
return
mask = zmq.POLLIN
if not self.__send_queue.empty():
mask |= zmq.POLLOUT
poll_result = self.socket.poll(0.1, mask)
if poll_result == zmq.POLLIN:
self.__recv()
elif poll_result == zmq.POLLOUT:
self.__handle_send()
if self.__polling:
loop = ensure_event_loop()
loop.call_soon_threadsafe(self.__poll)

def __handle_send(self) -> None:
msg, kwargs = self.__send_queue.get_nowait()
assert self.socket is not None
self.socket.send_multipart([msg], **kwargs)
if self.__on_send:
self.__on_send()

def __start_polling(self) -> None:
if self.socket and not self.__polling:
loop = ensure_event_loop()
self.__polling = True
loop.call_soon_threadsafe(self.__poll)

def __getattr__(self, attr: str) -> t.Any:
"""Pass through to the underlying socket for other methods."""
if attr.startswith("__"):
return super().__getattr__(attr) # type:ignore[misc]
if self.socket is not None:
return getattr(self.socket, attr)


def as_zmqstream(f: t.Any) -> t.Callable[..., ZMQStream]:
"""Convert a socket to a zmq stream."""

def wrapped(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
save_socket_class = None
# zmqstreams only support sync sockets
if self.context._socket_class is not zmq.Socket:
save_socket_class = self.context._socket_class
self.context._socket_class = zmq.Socket
try:
socket = f(self, *args, **kwargs)
finally:
if save_socket_class:
# restore default socket class
self.context._socket_class = save_socket_class
return ZMQStream(socket)

return wrapped


class IOLoopKernelManager(KernelManager):
"""An io loop kernel manager."""

loop = Instance(asyncio.AbstractEventLoop)
loop = Instance(asyncio.AbstractEventLoop) # type:ignore[type-abstract]

def _loop_default(self) -> asyncio.AbstractEventLoop:
return ensure_event_loop()
Expand All @@ -36,7 +176,7 @@ def start_restarter(self) -> None:
if self.autorestart and self.has_kernel:
if self._restarter is None:
self._restarter = self.restarter_class(
kernel_manager=self, loop=self.loop, parent=self, log=self.log
kernel_manager=self, parent=self, log=self.log
)
self._restarter.start()

Expand All @@ -45,11 +185,17 @@ def stop_restarter(self) -> None:
if self.autorestart and self._restarter is not None:
self._restarter.stop()

connect_shell = as_zmqstream(KernelManager.connect_shell) # type:ignore[assignment]
connect_control = as_zmqstream(KernelManager.connect_control) # type:ignore[assignment]
connect_iopub = as_zmqstream(KernelManager.connect_iopub) # type:ignore[assignment]
connect_stdin = as_zmqstream(KernelManager.connect_stdin) # type:ignore[assignment]
connect_hb = as_zmqstream(KernelManager.connect_hb) # type:ignore[assignment]


class AsyncIOLoopKernelManager(AsyncKernelManager):
"""An async ioloop kernel manager."""

loop = Instance(asyncio.AbstractEventLoop)
loop = Instance(asyncio.AbstractEventLoop) # type:ignore[type-abstract]

def _loop_default(self) -> asyncio.AbstractEventLoop:
return ensure_event_loop()
Expand Down Expand Up @@ -81,3 +227,9 @@ def stop_restarter(self) -> None:
"""Stop the restarter."""
if self.autorestart and self._restarter is not None:
self._restarter.stop()

connect_shell = as_zmqstream(AsyncKernelManager.connect_shell) # type:ignore[assignment]
connect_control = as_zmqstream(AsyncKernelManager.connect_control) # type:ignore[assignment]
connect_iopub = as_zmqstream(AsyncKernelManager.connect_iopub) # type:ignore[assignment]
connect_stdin = as_zmqstream(AsyncKernelManager.connect_stdin) # type:ignore[assignment]
connect_hb = as_zmqstream(AsyncKernelManager.connect_hb) # type:ignore[assignment]
24 changes: 15 additions & 9 deletions jupyter_client/ioloop/restarter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,39 @@
"""
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
from __future__ import annotations

import asyncio
import time

from jupyter_core.utils import ensure_async

from ..restarter import KernelRestarter


class IOLoopKernelRestarter(KernelRestarter):
"""Monitor and autorestart a kernel."""

_poll_task: asyncio.Task | None = None
_running = False

def start(self) -> None:
"""Start the polling of the kernel."""
if not self._running:
if not self._poll_task:
assert self.parent is not None
assert isinstance(self.parent.loop, asyncio.AbstractEventLoop)
self._poll_task = self.parent.loop.create_task(self._poll_loop())
self._running = True
self.parent.loop.call_soon_threadsafe(self._poll)

async def _poll(self):
while 1:
if not self._running:
return
self.poll()
await asyncio.sleep(1000 * self.time_to_dead)
async def _poll_loop(self) -> None:
while self._running:
await ensure_async(self.poll()) # type:ignore[func-returns-value]
await asyncio.sleep(0.01)

def stop(self) -> None:
"""Stop the kernel polling."""
if self._running:
if self._poll_task is not None:
self._poll_task = None
self._running = False


Expand Down
Loading

0 comments on commit 8aa1d10

Please sign in to comment.