Skip to content

Commit

Permalink
Use non-blocking zmq Poller (#1023)
Browse files Browse the repository at this point in the history
* Use non-blocking zmq Poller

* Pin pytest<8.2.0

* Fix mypy

* Apply review comment

---------

Co-authored-by: Frédéric Collonval <[email protected]>
  • Loading branch information
fcollonval and fcollonval authored May 23, 2024
1 parent 0236973 commit 474093f
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 10 deletions.
2 changes: 1 addition & 1 deletion jupyter_client/asynchronous/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class AsyncKernelClient(KernelClient):
raising :exc:`queue.Empty` if no message arrives within ``timeout`` seconds.
"""

context = Instance(zmq.asyncio.Context)
context = Instance(zmq.asyncio.Context) # type:ignore[arg-type]

def _context_default(self) -> zmq.asyncio.Context:
self._created_context = True
Expand Down
10 changes: 4 additions & 6 deletions jupyter_client/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,8 @@ def _recv(self, **kwargs: t.Any) -> t.Dict[str, t.Any]:
def get_msg(self, timeout: t.Optional[float] = None) -> t.Dict[str, t.Any]:
"""Gets a message if there is one that is ready."""
assert self.socket is not None
if timeout is not None:
timeout *= 1000 # seconds to ms
ready = self.socket.poll(timeout)
timeout_ms = None if timeout is None else int(timeout * 1000) # seconds to ms
ready = self.socket.poll(timeout_ms)
if ready:
res = self._recv()
return res
Expand Down Expand Up @@ -305,9 +304,8 @@ async def get_msg( # type:ignore[override]
) -> t.Dict[str, t.Any]:
"""Gets a message if there is one that is ready."""
assert self.socket is not None
if timeout is not None:
timeout *= 1000 # seconds to ms
ready = await self.socket.poll(timeout)
timeout_ms = None if timeout is None else int(timeout * 1000) # seconds to ms
ready = await self.socket.poll(timeout_ms)
if ready:
res = await self._recv()
return res
Expand Down
4 changes: 2 additions & 2 deletions jupyter_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ async def _async_execute_interactive(
else:
timeout_ms = None

poller = zmq.Poller()
poller = zmq.asyncio.Poller()
iopub_socket = self.iopub_channel.socket
poller.register(iopub_socket, zmq.POLLIN)
if allow_stdin:
Expand All @@ -544,7 +544,7 @@ async def _async_execute_interactive(
if timeout is not None:
timeout = max(0, deadline - time.monotonic())
timeout_ms = int(1000 * timeout)
events = dict(poller.poll(timeout_ms))
events = dict(await poller.poll(timeout_ms))
if not events:
emsg = "Timeout waiting for output"
raise TimeoutError(emsg)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ test = [
"mypy",
"paramiko; sys_platform == 'win32'",
"pre-commit",
"pytest",
"pytest<8.2.0",
"pytest-jupyter[client]>=0.4.1",
"pytest-cov",
"pytest-timeout",
Expand Down

0 comments on commit 474093f

Please sign in to comment.