Skip to content

Commit

Permalink
Merge pull request #10 from papr/high-frequency-message-sending
Browse files Browse the repository at this point in the history
Add high-frequency message sending option
  • Loading branch information
papr authored Sep 28, 2022
2 parents 002fb09 + 477c2e0 commit 35fa69d
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 23 deletions.
13 changes: 10 additions & 3 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
1.0.0a3 (2020-03-25)
1.0.0a4 (2022-09-28)
####################
- Add :py:meth:`pupil_labs.pupil_core_network_client.Device.high_frequency_message_sending`,
a context manager that makes :py:meth:`pupil_labs.pupil_core_network_client.Device.send_message`
more efficient by connecting directly to the IPC backend't PUB port instead of relaying
messages via Pupil Remote.

1.0.0a3 (2022-03-25)
####################
- Add :py:meth:`pupil_labs.pupil_core_network_client.Device.request_plugin_start_eye_process`
and accompanying example
Expand All @@ -9,13 +16,13 @@
:py:meth:`pupil_labs.pupil_core_network_client.Device.subscribe_in_background`,
:py:class:`pupil_labs.pupil_core_network_client.subscription.Message`, and accompanying example

1.0.0a2 (2020-02-25)
1.0.0a2 (2022-02-25)
####################

- Fix README badges
- tox: enable isolated builds

1.0.0a1 (2020-02-25)
1.0.0a1 (2022-02-25)
####################

Initial functionality:
Expand Down
2 changes: 1 addition & 1 deletion docs/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,4 @@ Capture's HMD Streaming backend.
.. literalinclude:: ../examples/hmd_streaming.py
:language: python
:linenos:
:emphasize-lines: 12,16,18-20,38
:emphasize-lines: 13,17,19-21,27,30-36
26 changes: 15 additions & 11 deletions examples/hmd_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@ def main(address: str, port: int, frame_rate_hz: int):
)

with contextlib.suppress(KeyboardInterrupt):
increasing_index = 0
while True:
send_image(
device,
gray_image(increasing_index),
frame_topic,
increasing_index,
timestamp=device.current_pupil_time(),
)
increasing_index += 1
time.sleep(1 / frame_rate_hz)
# Enable sending messages directly to the IPC backbone instead of having
# Pupil Remote forward every message one by one and waiting for a response each
# time. This allows sending messages with a much higher rate.
with device.high_frequency_message_sending():
increasing_index = 0
while True:
send_image(
device,
gray_image(increasing_index),
frame_topic,
increasing_index,
timestamp=device.current_pupil_time(),
)
increasing_index += 1
time.sleep(1 / frame_rate_hz)


def send_image(device: pcnc.Device, image, topic: str, index: int, timestamp: float):
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ docs =
furo
jaraco.packaging>=8.2
rst.linker>=1.9
sphinx<4.4 # 4.4 does not detect TypeVars correctly
sphinx
examples =
numpy
testing =
Expand Down
49 changes: 42 additions & 7 deletions src/pupil_labs/pupil_core_network_client/device.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import contextlib
import logging
import statistics
import time
Expand Down Expand Up @@ -38,6 +39,7 @@ def __init__(
self.clock_offset_statistics: ClockOffsetStatistics = None
"Statistic results of the clock offset estimation"
self._req_socket = None
self._pub_socket = None
self.connect()

@property
Expand All @@ -58,6 +60,33 @@ def disconnect(self):
self._req_socket.close()
self._req_socket = None

@contextlib.contextmanager
@ensure_connected
def high_frequency_message_sending(self):
"""Context manager that improves the efficiency of :py:meth:`.send_message`
Instead of sending the message to Pupil Remote via the REQ socket and waiting
for a response, the device will use a PUB socket connected directly to the Pupil
Capture IPC. As a result, the client is able to send messages with a
considerable higher frequency, e.g. streaming scene and eye videos to the HMD
video backend.
Example:
.. code-block:: python
device = Device()
with device.high_frequency_message_sending():
device.send_message(...)
"""
try:
self._pub_socket = zmq.Context.instance().socket(zmq.PUB)
self._pub_socket.connect(f"tcp://{self.address}:{self.ipc_pub_port}")
yield
finally:
self._pub_socket.close()
self._pub_socket = None

@ensure_connected
def current_pupil_time(self) -> float:
return self.client_clock() + self.clock_offset_statistics.mean_offset
Expand Down Expand Up @@ -142,23 +171,29 @@ def send_annotation(
def send_message(self, payload: dict) -> str:
if "topic" not in payload:
raise ValueError("`payload` needs to contain `topic` field")

socket, wait_for_response = (
(self._pub_socket, False) if self._pub_socket else (self._req_socket, True)
)
if "__raw_data__" not in payload:
# IMPORTANT: serialize first! Else if there is an exception
# the next message will have an extra prepended frame
serialized_payload = msgpack.packb(payload, use_bin_type=True)
self._req_socket.send_string(payload["topic"], flags=zmq.SNDMORE)
self._req_socket.send(serialized_payload)
socket.send_string(payload["topic"], flags=zmq.SNDMORE)
socket.send(serialized_payload)
else:
extra_frames = payload.pop("__raw_data__")
if not isinstance(extra_frames, Sequence):
raise ValueError("`payload['__raw_data__'] needs to be a sequence`")
self._req_socket.send_string(payload["topic"], flags=zmq.SNDMORE)
socket.send_string(payload["topic"], flags=zmq.SNDMORE)
serialized_payload = msgpack.packb(payload, use_bin_type=True)
self._req_socket.send(serialized_payload, flags=zmq.SNDMORE)
socket.send(serialized_payload, flags=zmq.SNDMORE)
for frame in extra_frames[:-1]:
self._req_socket.send(frame, flags=zmq.SNDMORE, copy=True)
self._req_socket.send(extra_frames[-1], copy=True)
return self._req_socket.recv_string()
socket.send(frame, flags=zmq.SNDMORE, copy=True)
socket.send(extra_frames[-1], copy=True)

response: str = self._req_socket.recv_string() if wait_for_response else "OK"
return response

@ensure_connected
def estimate_client_to_remote_clock_offset(
Expand Down

0 comments on commit 35fa69d

Please sign in to comment.