From 156c0a7df98fa5a2f8cc675fb086cf51802ec36e Mon Sep 17 00:00:00 2001 From: Pablo Prietz Date: Wed, 28 Sep 2022 16:18:02 +0200 Subject: [PATCH] Opt-in auto-reconnect of PUB socket --- CHANGES.rst | 3 ++ .../pupil_core_network_client/device.py | 52 ++++++++++++++++--- 2 files changed, 48 insertions(+), 7 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 8572fd5..0d290ae 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -4,6 +4,9 @@ a context manager that makes :py:meth:`pupil_labs.pupil_core_network_client.Device.send_message` more efficient by connecting directly to the IPC backend't PUB port instead of relaying messages via Pupil Remote. +- Experimental: Add opt-in auto-reconnect functionality to + :py:class:`pupil_labs.pupil_core_network_client.Device` via ``should_auto_reconnect`` + argument. 1.0.0a3 (2022-03-25) #################### diff --git a/src/pupil_labs/pupil_core_network_client/device.py b/src/pupil_labs/pupil_core_network_client/device.py index eb5ef35..84a2928 100644 --- a/src/pupil_labs/pupil_core_network_client/device.py +++ b/src/pupil_labs/pupil_core_network_client/device.py @@ -13,6 +13,7 @@ import msgpack import zmq +from zmq.utils.monitor import recv_monitor_message from . import __version__ from .decorators import ensure_connected @@ -31,6 +32,7 @@ def __init__( address: str = "127.0.0.1", port: int = 50020, client_clock: ClockFunction = time.monotonic, + should_auto_reconnect: bool = False, ) -> None: self.client_clock: ClockFunction = client_clock "Client clock function. Returns time in seconds." @@ -38,18 +40,46 @@ def __init__( self.port = port self.clock_offset_statistics: ClockOffsetStatistics = None "Statistic results of the clock offset estimation" - self._req_socket = None - self._pub_socket = None + self._req_socket: zmq.Socket | None = None + self._pub_socket: zmq.Socket | None = None + + self._should_auto_reconnect = should_auto_reconnect + self._req_monitor: zmq.Socket | None = None + self._previously_disconnected = False + self._currently_reconnecting = False self.connect() @property def is_connected(self): + if self._req_monitor and not self._currently_reconnecting: + should_reconnect = False + while self._req_monitor.get(zmq.EVENTS) & zmq.POLLIN: + status = recv_monitor_message(self._req_monitor) + if status["event"] == zmq.EVENT_CONNECTED: + should_reconnect = True + elif status["event"] == zmq.EVENT_DISCONNECTED: + self._previously_disconnected = True + + if should_reconnect and self._previously_disconnected: + logger.debug("Reconnecting...") + self._currently_reconnecting = True + self.connect() + if self._pub_socket: + self._teardown_pub_socket() + self._setup_pub_socket() + self._currently_reconnecting = False + self._previously_disconnected = False + logger.debug("Reconnected") + return self._req_socket is not None def connect(self): if self.is_connected: self.disconnect() - self._req_socket = zmq.Context.instance().socket(zmq.REQ) + + self._req_socket: zmq.Socket = zmq.Context.instance().socket(zmq.REQ) + if self._should_auto_reconnect: + self._req_monitor = self._req_socket.get_monitor_socket() self._req_socket.connect(f"tcp://{self.address}:{self.port}") self._announce(f"connected.v{__version__}") self._update_ipc_backend_ports() @@ -57,6 +87,8 @@ def connect(self): def disconnect(self): if self._req_socket: + self._req_socket.disable_monitor() + self._req_monitor = None self._req_socket.close() self._req_socket = None @@ -80,12 +112,18 @@ def high_frequency_message_sending(self): device.send_message(...) """ try: - self._pub_socket = zmq.Context.instance().socket(zmq.PUB) - self._pub_socket.connect(f"tcp://{self.address}:{self.ipc_pub_port}") + self._setup_pub_socket() yield finally: - self._pub_socket.close() - self._pub_socket = None + self._teardown_pub_socket() + + def _setup_pub_socket(self): + self._pub_socket = zmq.Context.instance().socket(zmq.PUB) + self._pub_socket.connect(f"tcp://{self.address}:{self.ipc_pub_port}") + + def _teardown_pub_socket(self): + self._pub_socket.close() + self._pub_socket = None @ensure_connected def current_pupil_time(self) -> float: