diff --git a/.github/workflows/downstream.yml b/.github/workflows/downstream.yml index 53fd620c6..3f765554c 100644 --- a/.github/workflows/downstream.yml +++ b/.github/workflows/downstream.yml @@ -12,7 +12,6 @@ jobs: strategy: matrix: python-version: ["3.9"] - steps: - name: Checkout uses: actions/checkout@v2 diff --git a/docs/index.rst b/docs/index.rst index f550e23bf..4731a328d 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -24,6 +24,7 @@ with Jupyter kernels. kernels wrapperkernels provisioning + pending-kernels .. toctree:: :maxdepth: 2 diff --git a/docs/pending-kernels.rst b/docs/pending-kernels.rst new file mode 100644 index 000000000..4699feb0a --- /dev/null +++ b/docs/pending-kernels.rst @@ -0,0 +1,36 @@ +Pending Kernels +=============== + +*Added in 7.1.0* + +In scenarios where an kernel takes a long time to start (e.g. kernels running remotely), it can be advantageous to immediately return the kernel's model and ID from key methods like ``.start_kernel()`` and ``.shutdown_kernel()``. The kernel will continue its task without blocking other managerial actions. + +This intermediate state is called a **"pending kernel"**. + +How they work +------------- + +When ``.start_kernel()`` or ``.shutdown_kernel()`` is called, a ``Future`` is created under the ``KernelManager.ready`` property. This property can be awaited anytime to ensure that the kernel moves out of its pending state, e.g.: + +.. code-block:: python + + # await a Kernel Manager's `.ready` property to + # block further action until the kernel is out + # of its pending state. + await kernel_manager.ready + +Once the kernel is finished pending, ``.ready.done()`` will be ``True`` and either 1) ``.ready.result()`` will return ``None`` or 2) ``.ready.exception()`` will return a raised exception + +Using pending kernels +--------------------- + +The most common way to interact with pending kernels is through the ``MultiKernelManager``—the object that manages a collection of kernels—by setting its ``use_pending_kernels`` trait to ``True``. Pending kernels are "opt-in"; they are not used by default in the ``MultiKernelManager``. + +When ``use_pending_kernels`` is ``True``, the following changes are made to the ``MultiKernelManager``: + +1. ``start_kernel`` and ``stop_kernel`` return immediately while running the pending task in a background thread. +2. The following methods raise a ``RuntimeError`` if a kernel is pending: + * ``restart_kernel`` + * ``interrupt_kernel`` + * ``shutdown_kernel`` +3. ``shutdown_all`` will wait for all pending kernels to become ready before attempting to shut them down. diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index 4908177b5..bc3190f25 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -2,6 +2,7 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. import asyncio +import functools import os import re import signal @@ -51,6 +52,35 @@ class _ShutdownStatus(Enum): SigkillRequest = "SigkillRequest" +def in_pending_state(method): + """Sets the kernel to a pending state by + creating a fresh Future for the KernelManager's `ready` + attribute. Once the method is finished, set the Future's results. + """ + + @functools.wraps(method) + async def wrapper(self, *args, **kwargs): + # Create a future for the decorated method + try: + self._ready = Future() + except RuntimeError: + # No event loop running, use concurrent future + self._ready = CFuture() + try: + # call wrapped method, await, and set the result or exception. + out = await method(self, *args, **kwargs) + # Add a small sleep to ensure tests can capture the state before done + await asyncio.sleep(0.01) + self._ready.set_result(None) + return out + except Exception as e: + self._ready.set_exception(e) + self.log.exception(self._ready.exception()) + raise e + + return wrapper + + class KernelManager(ConnectionFileMixin): """Manages a single kernel in a subprocess on this host. @@ -60,6 +90,7 @@ class KernelManager(ConnectionFileMixin): def __init__(self, *args, **kwargs): super().__init__(**kwargs) self._shutdown_status = _ShutdownStatus.Unset + # Create a place holder future. try: self._ready = Future() except RuntimeError: @@ -329,6 +360,7 @@ async def _async_post_start_kernel(self, **kw) -> None: post_start_kernel = run_sync(_async_post_start_kernel) + @in_pending_state async def _async_start_kernel(self, **kw): """Starts a kernel on this host in a separate process. @@ -341,25 +373,12 @@ async def _async_start_kernel(self, **kw): keyword arguments that are passed down to build the kernel_cmd and launching the kernel (e.g. Popen kwargs). """ - done = self._ready.done() - - try: - kernel_cmd, kw = await ensure_async(self.pre_start_kernel(**kw)) - - # launch the kernel subprocess - self.log.debug("Starting kernel: %s", kernel_cmd) - await ensure_async(self._launch_kernel(kernel_cmd, **kw)) - await ensure_async(self.post_start_kernel(**kw)) - if not done: - # Add a small sleep to ensure tests can capture the state before done - await asyncio.sleep(0.01) - self._ready.set_result(None) + kernel_cmd, kw = await ensure_async(self.pre_start_kernel(**kw)) - except Exception as e: - if not done: - self._ready.set_exception(e) - self.log.exception(self._ready.exception()) - raise e + # launch the kernel subprocess + self.log.debug("Starting kernel: %s", kernel_cmd) + await ensure_async(self._launch_kernel(kernel_cmd, **kw)) + await ensure_async(self.post_start_kernel(**kw)) start_kernel = run_sync(_async_start_kernel) @@ -434,6 +453,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None: cleanup_resources = run_sync(_async_cleanup_resources) + @in_pending_state async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False): """Attempts to stop the kernel process cleanly. @@ -452,10 +472,6 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) Will this kernel be restarted after it is shutdown. When this is True, connection files will not be cleaned up. """ - # Shutdown is a no-op for a kernel that had a failed startup - if self._ready.exception(): - return - self.shutting_down = True # Used by restarter to prevent race condition # Stop monitoring for restarting while we shutdown. self.stop_restarter() @@ -503,9 +519,6 @@ async def _async_restart_kernel(self, now: bool = False, newports: bool = False, if self._launch_args is None: raise RuntimeError("Cannot restart the kernel. " "No previous call to 'start_kernel'.") - if not self._ready.done(): - raise RuntimeError("Cannot restart the kernel. " "Kernel has not fully started.") - # Stop currently running kernel. await ensure_async(self.shutdown_kernel(now=now, restart=True)) diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index bb07a4008..defce51a2 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -97,7 +97,12 @@ def create_kernel_manager(*args, **kwargs) -> KernelManager: context = Instance("zmq.Context") - _starting_kernels = Dict() + _pending_kernels = Dict() + + @property + def _starting_kernels(self): + """A shim for backwards compatibility.""" + return self._pending_kernels @default("context") def _context_default(self) -> zmq.Context: @@ -165,7 +170,22 @@ async def _add_kernel_when_ready( await kernel_awaitable self._kernels[kernel_id] = km finally: - self._starting_kernels.pop(kernel_id, None) + self._pending_kernels.pop(kernel_id, None) + + async def _remove_kernel_when_ready( + self, kernel_id: str, kernel_awaitable: t.Awaitable + ) -> None: + try: + await kernel_awaitable + self.remove_kernel(kernel_id) + finally: + self._pending_kernels.pop(kernel_id, None) + + def _using_pending_kernels(self): + """Returns a boolean; a clearer method for determining if + this multikernelmanager is using pending kernels or not + """ + return getattr(self, 'use_pending_kernels', False) async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwargs) -> str: """Start a new kernel. @@ -186,17 +206,38 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg starter = ensure_async(km.start_kernel(**kwargs)) fut = asyncio.ensure_future(self._add_kernel_when_ready(kernel_id, km, starter)) - self._starting_kernels[kernel_id] = fut - - if getattr(self, 'use_pending_kernels', False): + self._pending_kernels[kernel_id] = fut + # Handling a Pending Kernel + if self._using_pending_kernels(): + # If using pending kernels, do not block + # on the kernel start. self._kernels[kernel_id] = km else: await fut + # raise an exception if one occurred during kernel startup. + if km.ready.exception(): + raise km.ready.exception() # type: ignore return kernel_id start_kernel = run_sync(_async_start_kernel) + async def _shutdown_kernel_when_ready( + self, + kernel_id: str, + now: t.Optional[bool] = False, + restart: t.Optional[bool] = False, + ) -> None: + """Wait for a pending kernel to be ready + before shutting the kernel down. + """ + # Only do this if using pending kernels + if self._using_pending_kernels(): + kernel = self._kernels[kernel_id] + await kernel.ready + # Once out of a pending state, we can call shutdown. + await ensure_async(self.shutdown_kernel(kernel_id, now=now, restart=restart)) + async def _async_shutdown_kernel( self, kernel_id: str, @@ -215,15 +256,31 @@ async def _async_shutdown_kernel( Will the kernel be restarted? """ self.log.info("Kernel shutdown: %s" % kernel_id) - if kernel_id in self._starting_kernels: + # If we're using pending kernels, block shutdown when a kernel is pending. + if self._using_pending_kernels() and kernel_id in self._pending_kernels: + raise RuntimeError("Kernel is in a pending state. Cannot shutdown.") + # If the kernel is still starting, wait for it to be ready. + elif kernel_id in self._starting_kernels: + kernel = self._starting_kernels[kernel_id] try: - await self._starting_kernels[kernel_id] + await kernel except Exception: self.remove_kernel(kernel_id) return km = self.get_kernel(kernel_id) - await ensure_async(km.shutdown_kernel(now, restart)) - self.remove_kernel(kernel_id) + # If a pending kernel raised an exception, remove it. + if km.ready.exception(): + self.remove_kernel(kernel_id) + return + stopper = ensure_async(km.shutdown_kernel(now, restart)) + fut = asyncio.ensure_future(self._remove_kernel_when_ready(kernel_id, stopper)) + self._pending_kernels[kernel_id] = fut + # Await the kernel if not using pending kernels. + if not self._using_pending_kernels(): + await fut + # raise an exception if one occurred during kernel shutdown. + if km.ready.exception(): + raise km.ready.exception() # type: ignore shutdown_kernel = run_sync(_async_shutdown_kernel) @@ -258,13 +315,17 @@ def remove_kernel(self, kernel_id: str) -> KernelManager: async def _async_shutdown_all(self, now: bool = False) -> None: """Shutdown all kernels.""" kids = self.list_kernel_ids() - kids += list(self._starting_kernels) - futs = [ensure_async(self.shutdown_kernel(kid, now=now)) for kid in set(kids)] + kids += list(self._pending_kernels) + futs = [ensure_async(self._shutdown_kernel_when_ready(kid, now=now)) for kid in set(kids)] await asyncio.gather(*futs) + # When using "shutdown all", all pending kernels + # should be awaited before exiting this method. + if self._using_pending_kernels(): + for km in self._kernels.values(): + await km.ready shutdown_all = run_sync(_async_shutdown_all) - @kernel_method def interrupt_kernel(self, kernel_id: str) -> None: """Interrupt (SIGINT) the kernel by its uuid. @@ -273,7 +334,12 @@ def interrupt_kernel(self, kernel_id: str) -> None: kernel_id : uuid The id of the kernel to interrupt. """ + kernel = self.get_kernel(kernel_id) + if not kernel.ready.done(): + raise RuntimeError("Kernel is in a pending state. Cannot interrupt.") + out = kernel.interrupt_kernel() self.log.info("Kernel interrupted: %s" % kernel_id) + return out @kernel_method def signal_kernel(self, kernel_id: str, signum: int) -> None: @@ -291,8 +357,7 @@ def signal_kernel(self, kernel_id: str, signum: int) -> None: """ self.log.info("Signaled Kernel %s with %s" % (kernel_id, signum)) - @kernel_method - def restart_kernel(self, kernel_id: str, now: bool = False) -> None: + async def _async_restart_kernel(self, kernel_id: str, now: bool = False) -> None: """Restart a kernel by its uuid, keeping the same ports. Parameters @@ -307,7 +372,15 @@ def restart_kernel(self, kernel_id: str, now: bool = False) -> None: In all cases the kernel is restarted, the only difference is whether it is given a chance to perform a clean shutdown or not. """ + kernel = self.get_kernel(kernel_id) + if self._using_pending_kernels(): + if not kernel.ready.done(): + raise RuntimeError("Kernel is in a pending state. Cannot restart.") + out = await ensure_async(kernel.restart_kernel(now=now)) self.log.info("Kernel restarted: %s" % kernel_id) + return out + + restart_kernel = run_sync(_async_restart_kernel) @kernel_method def is_alive(self, kernel_id: str) -> bool: @@ -475,5 +548,6 @@ class AsyncMultiKernelManager(MultiKernelManager): ).tag(config=True) start_kernel = MultiKernelManager._async_start_kernel + restart_kernel = MultiKernelManager._async_restart_kernel shutdown_kernel = MultiKernelManager._async_shutdown_kernel shutdown_all = MultiKernelManager._async_shutdown_all diff --git a/jupyter_client/tests/test_multikernelmanager.py b/jupyter_client/tests/test_multikernelmanager.py index d9bf7956a..8cd953d60 100644 --- a/jupyter_client/tests/test_multikernelmanager.py +++ b/jupyter_client/tests/test_multikernelmanager.py @@ -4,6 +4,7 @@ import os import sys import uuid +from asyncio import ensure_future from subprocess import PIPE from unittest import TestCase @@ -29,6 +30,14 @@ TIMEOUT = 30 +async def now(awaitable): + """Use this function ensure that this awaitable + happens before other awaitables defined after it. + """ + (out,) = await asyncio.gather(awaitable) + return out + + class TestKernelManager(TestCase): def setUp(self): self.env_patch = test_env() @@ -346,7 +355,7 @@ async def test_shutdown_all_while_starting(self): self.assertNotIn(kid, km) # Start another kernel - kid = await km.start_kernel(stdout=PIPE, stderr=PIPE) + kid = await ensure_future(km.start_kernel(stdout=PIPE, stderr=PIPE)) self.assertIn(kid, km) self.assertEqual(len(km), 1) await km.shutdown_all() @@ -357,52 +366,69 @@ async def test_shutdown_all_while_starting(self): @gen_test async def test_use_pending_kernels(self): km = self._get_pending_kernels_km() - kid = await km.start_kernel(stdout=PIPE, stderr=PIPE) + kid = await ensure_future(km.start_kernel(stdout=PIPE, stderr=PIPE)) kernel = km.get_kernel(kid) assert not kernel.ready.done() assert kid in km assert kid in km.list_kernel_ids() assert len(km) == 1, f"{len(km)} != {1}" + # Wait for the kernel to start. await kernel.ready await km.restart_kernel(kid, now=True) - assert await km.is_alive(kid) + out = await km.is_alive(kid) + assert out assert kid in km.list_kernel_ids() await km.interrupt_kernel(kid) k = km.get_kernel(kid) assert isinstance(k, AsyncKernelManager) - await km.shutdown_kernel(kid, now=True) + await ensure_future(km.shutdown_kernel(kid, now=True)) + # Wait for the kernel to shutdown + await kernel.ready assert kid not in km, f"{kid} not in {km}" @gen_test async def test_use_pending_kernels_early_restart(self): km = self._get_pending_kernels_km() - kid = await km.start_kernel(stdout=PIPE, stderr=PIPE) + kid = await ensure_future(km.start_kernel(stdout=PIPE, stderr=PIPE)) kernel = km.get_kernel(kid) assert not kernel.ready.done() with pytest.raises(RuntimeError): await km.restart_kernel(kid, now=True) await kernel.ready - await km.shutdown_kernel(kid, now=True) + await ensure_future(km.shutdown_kernel(kid, now=True)) + # Wait for the kernel to shutdown + await kernel.ready assert kid not in km, f"{kid} not in {km}" @gen_test async def test_use_pending_kernels_early_shutdown(self): km = self._get_pending_kernels_km() - kid = await km.start_kernel(stdout=PIPE, stderr=PIPE) + kid = await ensure_future(km.start_kernel(stdout=PIPE, stderr=PIPE)) kernel = km.get_kernel(kid) assert not kernel.ready.done() - await km.shutdown_kernel(kid, now=True) + # Try shutting down while the kernel is pending + with pytest.raises(RuntimeError): + await ensure_future(km.shutdown_kernel(kid, now=True)) + await kernel.ready + # Shutdown once the kernel is ready + await ensure_future(km.shutdown_kernel(kid, now=True)) + # Wait for the kernel to shutdown + await kernel.ready assert kid not in km, f"{kid} not in {km}" @gen_test async def test_use_pending_kernels_early_interrupt(self): km = self._get_pending_kernels_km() - kid = await km.start_kernel(stdout=PIPE, stderr=PIPE) + kid = await ensure_future(km.start_kernel(stdout=PIPE, stderr=PIPE)) kernel = km.get_kernel(kid) assert not kernel.ready.done() with pytest.raises(RuntimeError): await km.interrupt_kernel(kid) - await km.shutdown_kernel(kid, now=True) + # Now wait for the kernel to be ready. + await kernel.ready + await ensure_future(km.shutdown_kernel(kid, now=True)) + # Wait for the kernel to shutdown + await kernel.ready assert kid not in km, f"{kid} not in {km}" @gen_test @@ -547,7 +573,7 @@ async def test_bad_kernelspec(self): name="bad", ) with pytest.raises(FileNotFoundError): - await km.start_kernel(kernel_name="bad", stdout=PIPE, stderr=PIPE) + await ensure_future(km.start_kernel(kernel_name="bad", stdout=PIPE, stderr=PIPE)) @gen_test async def test_bad_kernelspec_pending(self): @@ -557,10 +583,11 @@ async def test_bad_kernelspec_pending(self): argv=["non_existent_executable"], name="bad", ) - kernel_id = await km.start_kernel(kernel_name="bad", stdout=PIPE, stderr=PIPE) - assert kernel_id in km._starting_kernels + kernel_id = await ensure_future( + km.start_kernel(kernel_name="bad", stdout=PIPE, stderr=PIPE) + ) with pytest.raises(FileNotFoundError): await km.get_kernel(kernel_id).ready assert kernel_id in km.list_kernel_ids() - await km.shutdown_kernel(kernel_id) + await ensure_future(km.shutdown_kernel(kernel_id)) assert kernel_id not in km.list_kernel_ids()