From 477c2e069604d197ce8f5ae5c8c7ec17ee5f954e Mon Sep 17 00:00:00 2001 From: Pablo Prietz Date: Wed, 28 Sep 2022 15:20:31 +0200 Subject: [PATCH] Add high-frequency message sending option --- CHANGES.rst | 13 +++-- docs/examples.rst | 2 +- examples/hmd_streaming.py | 26 +++++----- setup.cfg | 2 +- .../pupil_core_network_client/device.py | 49 ++++++++++++++++--- 5 files changed, 69 insertions(+), 23 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 7f23e07..8572fd5 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,4 +1,11 @@ -1.0.0a3 (2020-03-25) +1.0.0a4 (2022-09-28) +#################### +- Add :py:meth:`pupil_labs.pupil_core_network_client.Device.high_frequency_message_sending`, + a context manager that makes :py:meth:`pupil_labs.pupil_core_network_client.Device.send_message` + more efficient by connecting directly to the IPC backend't PUB port instead of relaying + messages via Pupil Remote. + +1.0.0a3 (2022-03-25) #################### - Add :py:meth:`pupil_labs.pupil_core_network_client.Device.request_plugin_start_eye_process` and accompanying example @@ -9,13 +16,13 @@ :py:meth:`pupil_labs.pupil_core_network_client.Device.subscribe_in_background`, :py:class:`pupil_labs.pupil_core_network_client.subscription.Message`, and accompanying example -1.0.0a2 (2020-02-25) +1.0.0a2 (2022-02-25) #################### - Fix README badges - tox: enable isolated builds -1.0.0a1 (2020-02-25) +1.0.0a1 (2022-02-25) #################### Initial functionality: diff --git a/docs/examples.rst b/docs/examples.rst index 937a244..e123961 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -51,4 +51,4 @@ Capture's HMD Streaming backend. .. literalinclude:: ../examples/hmd_streaming.py :language: python :linenos: - :emphasize-lines: 12,16,18-20,38 + :emphasize-lines: 13,17,19-21,27,30-36 diff --git a/examples/hmd_streaming.py b/examples/hmd_streaming.py index 55ec375..23cac61 100644 --- a/examples/hmd_streaming.py +++ b/examples/hmd_streaming.py @@ -21,17 +21,21 @@ def main(address: str, port: int, frame_rate_hz: int): ) with contextlib.suppress(KeyboardInterrupt): - increasing_index = 0 - while True: - send_image( - device, - gray_image(increasing_index), - frame_topic, - increasing_index, - timestamp=device.current_pupil_time(), - ) - increasing_index += 1 - time.sleep(1 / frame_rate_hz) + # Enable sending messages directly to the IPC backbone instead of having + # Pupil Remote forward every message one by one and waiting for a response each + # time. This allows sending messages with a much higher rate. + with device.high_frequency_message_sending(): + increasing_index = 0 + while True: + send_image( + device, + gray_image(increasing_index), + frame_topic, + increasing_index, + timestamp=device.current_pupil_time(), + ) + increasing_index += 1 + time.sleep(1 / frame_rate_hz) def send_image(device: pcnc.Device, image, topic: str, index: int, timestamp: float): diff --git a/setup.cfg b/setup.cfg index 9567999..3caaeb2 100644 --- a/setup.cfg +++ b/setup.cfg @@ -40,7 +40,7 @@ docs = furo jaraco.packaging>=8.2 rst.linker>=1.9 - sphinx<4.4 # 4.4 does not detect TypeVars correctly + sphinx examples = numpy testing = diff --git a/src/pupil_labs/pupil_core_network_client/device.py b/src/pupil_labs/pupil_core_network_client/device.py index 6cee9e6..eb5ef35 100644 --- a/src/pupil_labs/pupil_core_network_client/device.py +++ b/src/pupil_labs/pupil_core_network_client/device.py @@ -1,5 +1,6 @@ from __future__ import annotations +import contextlib import logging import statistics import time @@ -38,6 +39,7 @@ def __init__( self.clock_offset_statistics: ClockOffsetStatistics = None "Statistic results of the clock offset estimation" self._req_socket = None + self._pub_socket = None self.connect() @property @@ -58,6 +60,33 @@ def disconnect(self): self._req_socket.close() self._req_socket = None + @contextlib.contextmanager + @ensure_connected + def high_frequency_message_sending(self): + """Context manager that improves the efficiency of :py:meth:`.send_message` + + Instead of sending the message to Pupil Remote via the REQ socket and waiting + for a response, the device will use a PUB socket connected directly to the Pupil + Capture IPC. As a result, the client is able to send messages with a + considerable higher frequency, e.g. streaming scene and eye videos to the HMD + video backend. + + Example: + + .. code-block:: python + + device = Device() + with device.high_frequency_message_sending(): + device.send_message(...) + """ + try: + self._pub_socket = zmq.Context.instance().socket(zmq.PUB) + self._pub_socket.connect(f"tcp://{self.address}:{self.ipc_pub_port}") + yield + finally: + self._pub_socket.close() + self._pub_socket = None + @ensure_connected def current_pupil_time(self) -> float: return self.client_clock() + self.clock_offset_statistics.mean_offset @@ -142,23 +171,29 @@ def send_annotation( def send_message(self, payload: dict) -> str: if "topic" not in payload: raise ValueError("`payload` needs to contain `topic` field") + + socket, wait_for_response = ( + (self._pub_socket, False) if self._pub_socket else (self._req_socket, True) + ) if "__raw_data__" not in payload: # IMPORTANT: serialize first! Else if there is an exception # the next message will have an extra prepended frame serialized_payload = msgpack.packb(payload, use_bin_type=True) - self._req_socket.send_string(payload["topic"], flags=zmq.SNDMORE) - self._req_socket.send(serialized_payload) + socket.send_string(payload["topic"], flags=zmq.SNDMORE) + socket.send(serialized_payload) else: extra_frames = payload.pop("__raw_data__") if not isinstance(extra_frames, Sequence): raise ValueError("`payload['__raw_data__'] needs to be a sequence`") - self._req_socket.send_string(payload["topic"], flags=zmq.SNDMORE) + socket.send_string(payload["topic"], flags=zmq.SNDMORE) serialized_payload = msgpack.packb(payload, use_bin_type=True) - self._req_socket.send(serialized_payload, flags=zmq.SNDMORE) + socket.send(serialized_payload, flags=zmq.SNDMORE) for frame in extra_frames[:-1]: - self._req_socket.send(frame, flags=zmq.SNDMORE, copy=True) - self._req_socket.send(extra_frames[-1], copy=True) - return self._req_socket.recv_string() + socket.send(frame, flags=zmq.SNDMORE, copy=True) + socket.send(extra_frames[-1], copy=True) + + response: str = self._req_socket.recv_string() if wait_for_response else "OK" + return response @ensure_connected def estimate_client_to_remote_clock_offset(