Skip to content

Commit

Permalink
Merge pull request #11 from papr/auto-reconnect
Browse files Browse the repository at this point in the history
Opt-in auto-reconnect of PUB socket
  • Loading branch information
papr authored Sep 28, 2022
2 parents 35fa69d + 156c0a7 commit d9461ef
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 10 deletions.
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ repos:
- id: trailing-whitespace

- repo: https://github.com/asottile/pyupgrade
rev: v2.37.3
rev: v2.38.2
hooks:
- id: pyupgrade
name: PyUpgrade 3.6+
args: ["--py36-plus"]
exclude: ^bin/

- repo: https://github.com/pycqa/flake8
rev: 5.0.2
rev: 5.0.4
hooks:
- id: flake8

Expand All @@ -31,7 +31,7 @@ repos:
args: [--profile, black]

- repo: https://github.com/psf/black
rev: 22.6.0
rev: 22.8.0
hooks:
- id: black

Expand Down
3 changes: 3 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
a context manager that makes :py:meth:`pupil_labs.pupil_core_network_client.Device.send_message`
more efficient by connecting directly to the IPC backend't PUB port instead of relaying
messages via Pupil Remote.
- Experimental: Add opt-in auto-reconnect functionality to
:py:class:`pupil_labs.pupil_core_network_client.Device` via ``should_auto_reconnect``
argument.

1.0.0a3 (2022-03-25)
####################
Expand Down
52 changes: 45 additions & 7 deletions src/pupil_labs/pupil_core_network_client/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import msgpack
import zmq
from zmq.utils.monitor import recv_monitor_message

from . import __version__
from .decorators import ensure_connected
Expand All @@ -31,32 +32,63 @@ def __init__(
address: str = "127.0.0.1",
port: int = 50020,
client_clock: ClockFunction = time.monotonic,
should_auto_reconnect: bool = False,
) -> None:
self.client_clock: ClockFunction = client_clock
"Client clock function. Returns time in seconds."
self.address = address
self.port = port
self.clock_offset_statistics: ClockOffsetStatistics = None
"Statistic results of the clock offset estimation"
self._req_socket = None
self._pub_socket = None
self._req_socket: zmq.Socket | None = None
self._pub_socket: zmq.Socket | None = None

self._should_auto_reconnect = should_auto_reconnect
self._req_monitor: zmq.Socket | None = None
self._previously_disconnected = False
self._currently_reconnecting = False
self.connect()

@property
def is_connected(self):
if self._req_monitor and not self._currently_reconnecting:
should_reconnect = False
while self._req_monitor.get(zmq.EVENTS) & zmq.POLLIN:
status = recv_monitor_message(self._req_monitor)
if status["event"] == zmq.EVENT_CONNECTED:
should_reconnect = True
elif status["event"] == zmq.EVENT_DISCONNECTED:
self._previously_disconnected = True

if should_reconnect and self._previously_disconnected:
logger.debug("Reconnecting...")
self._currently_reconnecting = True
self.connect()
if self._pub_socket:
self._teardown_pub_socket()
self._setup_pub_socket()
self._currently_reconnecting = False
self._previously_disconnected = False
logger.debug("Reconnected")

return self._req_socket is not None

def connect(self):
if self.is_connected:
self.disconnect()
self._req_socket = zmq.Context.instance().socket(zmq.REQ)

self._req_socket: zmq.Socket = zmq.Context.instance().socket(zmq.REQ)
if self._should_auto_reconnect:
self._req_monitor = self._req_socket.get_monitor_socket()
self._req_socket.connect(f"tcp://{self.address}:{self.port}")
self._announce(f"connected.v{__version__}")
self._update_ipc_backend_ports()
self.estimate_client_to_remote_clock_offset()

def disconnect(self):
if self._req_socket:
self._req_socket.disable_monitor()
self._req_monitor = None
self._req_socket.close()
self._req_socket = None

Expand All @@ -80,12 +112,18 @@ def high_frequency_message_sending(self):
device.send_message(...)
"""
try:
self._pub_socket = zmq.Context.instance().socket(zmq.PUB)
self._pub_socket.connect(f"tcp://{self.address}:{self.ipc_pub_port}")
self._setup_pub_socket()
yield
finally:
self._pub_socket.close()
self._pub_socket = None
self._teardown_pub_socket()

def _setup_pub_socket(self):
self._pub_socket = zmq.Context.instance().socket(zmq.PUB)
self._pub_socket.connect(f"tcp://{self.address}:{self.ipc_pub_port}")

def _teardown_pub_socket(self):
self._pub_socket.close()
self._pub_socket = None

@ensure_connected
def current_pupil_time(self) -> float:
Expand Down

0 comments on commit d9461ef

Please sign in to comment.