Skip to content

Commit

Permalink
Make BlockingClient send thread safe (#97)
Browse files Browse the repository at this point in the history
* Make BlockingClient send thread safe
  • Loading branch information
beenje authored Jan 22, 2025
1 parent 5fbf3d9 commit 71f1e50
Showing 1 changed file with 21 additions and 18 deletions.
39 changes: 21 additions & 18 deletions src/pandablocks/blocking.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import socket
from collections.abc import Iterable, Iterator
from threading import Lock
from typing import Optional, Union, overload

from .commands import Command, T
Expand Down Expand Up @@ -48,6 +49,7 @@ def __init__(self, host: str):
self._host = host
self._ctrl_connection = ControlConnection()
self._ctrl_socket = _SocketHelper()
self._ctrl_socket_lock = Lock()

def connect(self):
"""Connect to the control port, and be ready to handle commands"""
Expand Down Expand Up @@ -84,24 +86,25 @@ def send(
return a list of reponses
timeout: If no reponse in this time, raise `socket.timeout`
"""
s = self._ctrl_socket.socket
s.settimeout(timeout)
if isinstance(commands, Command):
commands = [commands]
else:
commands = list(commands)
for command in commands:
to_send = self._ctrl_connection.send(command)
s.sendall(to_send)
# Rely on dicts being ordered, Ellipsis is shorthand for "no response yet"
cr = {id(command): ... for command in commands}
while ... in cr.values():
received = s.recv(4096)
to_send = self._ctrl_connection.receive_bytes(received)
s.sendall(to_send)
for command, response in self._ctrl_connection.responses():
assert cr[id(command)] is ..., "Already got response for {command}"
cr[id(command)] = response
with self._ctrl_socket_lock:
s = self._ctrl_socket.socket
s.settimeout(timeout)
if isinstance(commands, Command):
commands = [commands]
else:
commands = list(commands)
for command in commands:
to_send = self._ctrl_connection.send(command)
s.sendall(to_send)
# Rely on dicts being ordered, Ellipsis is shorthand for "no response yet"
cr = {id(command): ... for command in commands}
while ... in cr.values():
received = s.recv(4096)
to_send = self._ctrl_connection.receive_bytes(received)
s.sendall(to_send)
for command, response in self._ctrl_connection.responses():
assert cr[id(command)] is ..., f"Already got response for {command}"
cr[id(command)] = response
responses = list(cr.values())
for response in responses:
if isinstance(response, Exception):
Expand Down

0 comments on commit 71f1e50

Please sign in to comment.