Skip to content

Commit

Permalink
TimeSync: Introduce protocol versioning and change group name convent…
Browse files Browse the repository at this point in the history
…ion (#721)

* Introduce protocol versioning and change group name convention

* Add Time Sync protocol version to UI

* PTS v0.3 -- Host sends timestamps as 64-bit le floats instead of string repr

* Update version numbers to decimals
  • Loading branch information
papr authored and mkassner committed May 19, 2017
1 parent 5a55c4b commit 4535f78
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 29 deletions.
10 changes: 6 additions & 4 deletions pupil_src/shared_modules/network_time_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import socket
import threading
import asyncore
import struct
from random import random

import logging
Expand Down Expand Up @@ -44,9 +45,10 @@ def __init__(self, sock, time_fn):
asyncore.dispatcher_with_send.__init__(self, sock)

def handle_read(self):
data = self.recv(1024)
# expecting `sync` message
data = self.recv(4)
if data:
self.send(repr(self.time_fn()).encode())
self.send(struct.pack('<d', self.time_fn()))

def __del__(self):
pass
Expand Down Expand Up @@ -211,9 +213,9 @@ def _get_offset(self):
for request in range(60):
t0 = self.get_time()
server_socket.send(b'sync')
message = server_socket.recv(1024)
message = server_socket.recv(8)
t2 = self.get_time()
t1 = float(message)
t1 = struct.unpack('<d', message)[0]
times.append((t0, t1, t2))

server_socket.close()
Expand Down
46 changes: 29 additions & 17 deletions pupil_src/shared_modules/time_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
raise Exception("Pyre version is to old. Please upgrade")


__protocol_version__ = 'v1'


class Clock_Service(object):
"""Represents a remote clock service and is sortable by rank."""
def __init__(self, uuid, name, rank, port):
Expand All @@ -55,10 +58,10 @@ class Time_Sync(Plugin):
See `time_sync_spec.md` for details.
"""

def __init__(self, g_pool, node_name=None, sync_group='time_sync_default', base_bias=1.):
def __init__(self, g_pool, node_name=None, sync_group_prefix='default', base_bias=1.):
super().__init__(g_pool)
self.menu = None
self.sync_group = sync_group
self.sync_group_prefix = sync_group_prefix
self.discovery = None

self.leaderboard = []
Expand All @@ -72,18 +75,28 @@ def __init__(self, g_pool, node_name=None, sync_group='time_sync_default', base_

self.restart_discovery(node_name)

@property
def sync_group(self):
return self.sync_group_prefix + '-time_sync-' + __protocol_version__

@sync_group.setter
def sync_group(self, full_name):
self.sync_group_prefix = full_name.rsplit('-time_sync-' + __protocol_version__, maxsplit=1)[0]

def init_gui(self):
def close():
self.alive = False

help_str = "Synchonize time of Pupil Captures across the local network."
self.menu = ui.Growing_Menu('Network Time Sync')
self.menu.append(ui.Button('Close', close))
self.menu.append(ui.Info_Text('Protocol version: ' + __protocol_version__))

self.menu.append(ui.Info_Text(help_str))
help_str = "All pupil nodes of one group share a Master clock."
self.menu.append(ui.Info_Text(help_str))
self.menu.append(ui.Text_Input('node_name', self, label='Node Name', setter=self.restart_discovery))
self.menu.append(ui.Text_Input('sync_group', self, label='Sync Group', setter=self.change_sync_group))
self.menu.append(ui.Text_Input('sync_group_prefix', self, label='Sync Group', setter=self.change_sync_group))

def sync_status():
if self.follower_service:
Expand All @@ -102,15 +115,15 @@ def set_bias(bias):
help_str = "The clock service with the highest bias becomes clock master."
self.menu.append(ui.Info_Text(help_str))
self.menu.append(ui.Text_Input('base_bias', self, label='Master Bias', setter=set_bias))
self.menu.append(ui.Text_Input('leaderboard',self,label='Master Nodes in Group'))
self.menu.append(ui.Text_Input('leaderboard', self, label='Master Nodes in Group'))
self.g_pool.sidebar.append(self.menu)

def recent_events(self, events):
should_announce = False
for evt in self.discovery.recent_events():
if evt.type == 'SHOUT':
try:
self.update_leaderboard(evt.peer_uuid,evt.peer_name, float(evt.msg[0]), int(evt.msg[1]))
self.update_leaderboard(evt.peer_uuid, evt.peer_name, float(evt.msg[0]), int(evt.msg[1]))
except Exception as e:
logger.debug('Garbage raised `{}` -- dropping.'.format(e))
self.evaluate_leaderboard()
Expand All @@ -135,7 +148,7 @@ def update_leaderboard(self, uuid, name, rank, port):
self.remove_from_leaderboard(cs.uuid)
break
else:
#no changes. Just leave as is
# no changes. Just leave as is
return

# clock service was not encountered before or has changed adding it to leaderboard
Expand All @@ -156,20 +169,20 @@ def evaluate_leaderboard(self):
return

current_leader = self.leaderboard[0]
if self.discovery.uuid() != current_leader.uuid:
#we are not the leader!
if self.discovery.uuid() != current_leader.uuid:
# we are not the leader!
leader_ep = self.discovery.peer_address(current_leader.uuid)
leader_addr = urlparse(leader_ep).netloc.split(':')[0]
if self.follower_service is None:
#make new follower
# make new follower
self.follower_service = Clock_Sync_Follower(leader_addr,
port=current_leader.port,
interval=10,
time_fn=self.get_time,
jump_fn=self.jump_time,
slew_fn=self.slew_time)
else:
#update follower_service
# update follower_service
self.follower_service.host = leader_addr
self.follower_service.port = current_leader.port
return
Expand All @@ -185,11 +198,10 @@ def evaluate_leaderboard(self):
logger.debug('Become clock master with rank {}'.format(self.rank))
self.announce_clock_master_info()


def announce_clock_master_info(self):
self.discovery.shout(self.sync_group, [repr(self.rank).encode(),
repr(self.master_service.port).encode()])
self.update_leaderboard(self.discovery.uuid(),self.node_name,self.rank,self.master_service.port)
self.update_leaderboard(self.discovery.uuid(), self.node_name, self.rank, self.master_service.port)

@property
def rank(self):
Expand Down Expand Up @@ -233,15 +245,15 @@ def restart_discovery(self, name):
self.discovery.start()
self.announce_clock_master_info()

def change_sync_group(self, new_group):
if new_group != self.sync_group:
def change_sync_group(self, new_group_prefix):
if new_group_prefix != self.sync_group_prefix:
self.discovery.leave(self.sync_group)
self.leaderboard = []
if self.follower_service:
self.follower_service.terminate()
self.follower = None
self.sync_group = new_group
self.discovery.join(new_group)
self.sync_group_prefix = new_group_prefix
self.discovery.join(self.sync_group)
self.announce_clock_master_info()

def deinit_gui(self):
Expand All @@ -251,7 +263,7 @@ def deinit_gui(self):

def get_init_dict(self):
return {'node_name': self.node_name,
'sync_group': self.sync_group,
'sync_group_prefix': self.sync_group_prefix,
'base_bias': self.base_bias}

def cleanup(self):
Expand Down
19 changes: 11 additions & 8 deletions pupil_src/shared_modules/time_sync_spec.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Pupil Time Sync Protocol

Protocol version: v1
Protocol status: draft

The Pupil Time Sync -- hereinafter referred to as _PTS_ -- protocol consists of two parts:
1. Clock service discovery
2. Time synchronization
Expand All @@ -22,9 +25,10 @@ of existing libraries (e.g. [zyre](https://github.com/zeromq/zyre), [Pyre](https

### Synchronization scope

All PTS actors that should be synchronized SHALL join a user-definable ZRE
group -- hereinafter referred to as _PTS group_ -- which is by default
`time_sync_default`.
All PTS actors that should be synchronized SHALL join a ZRE group -- hereinafter
referred to as _PTS group_. The name of the PTS group is composed of a user-definable
prefix -- by default `default` -- and the fixed string `-time_sync-v1`. Therefore the
default PTS group name is `default-time_sync-v1`.

All clock services SHALL SHOUT their announcements (see below) into the PTS group.

Expand Down Expand Up @@ -92,14 +96,13 @@ and is able to change its clock appropriately.

### Timestamp unit

Timestamps are floats in seconds.
Timestamps are 64-bit little-endian floats in seconds.

### Clock service

The clock service is a simple TCP server that sends the string representation
of its own current timestamp upon receiving the message `sync` from a follower.
The TCP server's network port SHALL be announced as part of the clock service
announcement (see above).
The clock service is a simple TCP server that sends its own current timestamp
upon receiving the message `sync` from a follower. The TCP server's network port
SHALL be announced as part of the clock serviceannouncement (see above).

### Clock follower

Expand Down

0 comments on commit 4535f78

Please sign in to comment.