Skip to content

Commit

Permalink
Improve lsl-to-time.time synchronization.
Browse files Browse the repository at this point in the history
  • Loading branch information
cboulay committed Sep 2, 2024
1 parent 3161fbc commit a22a041
Showing 1 changed file with 53 additions and 25 deletions.
78 changes: 53 additions & 25 deletions src/ezmsg/lsl/units.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,42 @@ class LSLInletSettings(ez.Settings):
use_lsl_clock: bool = False


class ClockSync:
def __init__(self, alpha: float = 0.1, min_interval: float = 0.5):
self.alpha = alpha
self.min_interval = min_interval

self.offset = 0.0
self.last_update = 0.0
self.count = 0

async def update(self, force: bool = False, burst: int = 4) -> None:
dur_since_last = time.time() - self.last_update
dur_until_next = self.min_interval - dur_since_last
if force or dur_until_next <= 0:
offsets = []
for _ in range(burst):
if self.count % 2:
y, x = time.time(), pylsl.local_clock()
else:
x, y = pylsl.local_clock(), time.time()
offsets.append(y - x)
self.last_update = y
await asyncio.sleep(0.001)
offset = np.mean(offsets)

if self.count > 0:
# Exponential decay smoothing
offset = (1 - self.alpha) * self.offset + self.alpha * offset
self.offset = offset
self.count += burst
else:
await asyncio.sleep(dur_until_next)

def convert_timestamp(self, lsl_timestamp: float) -> float:
return lsl_timestamp + self.offset


class LSLInletState(ez.State):
resolver: typing.Optional[pylsl.ContinuousResolver] = None
inlet: typing.Optional[pylsl.StreamInlet] = None
Expand All @@ -145,8 +181,8 @@ class LSLInletUnit(ez.Unit):

OUTPUT_SIGNAL = ez.OutputStream(AxisArray)

# Share timestamp offset (lsl -> sys time) across all instances
clock_state: dict = {"clock_offset": 0.0, "last_update": 0.0, "alpha": 0.1, "interval": 1.0, "count": 0}
# Share clock correction across all instances
clock_sync = ClockSync()

def __init__(self, *args, **kwargs) -> None:
"""
Expand All @@ -155,14 +191,14 @@ def __init__(self, *args, **kwargs) -> None:
among others.
"""
if "info" not in kwargs:
replace = set()
replace_keys = set()
for k, v in kwargs.items():
if k.startswith("stream_"):
replace.add(k)
if len(replace) > 0:
ez.logger.warning(f"LSLInlet kwargs beginning with 'stream_' deprecated. Found {replace}. "
replace_keys.add(k)
if len(replace_keys) > 0:
ez.logger.warning(f"LSLInlet kwargs beginning with 'stream_' deprecated. Found {replace_keys}. "
f"See LSLInfo dataclass.")
for k in replace:
for k in replace_keys:
kwargs[k[7:]] = kwargs.pop(k)

known_fields = [_.name for _ in fields(LSLInfo)]
Expand Down Expand Up @@ -208,23 +244,11 @@ def shutdown(self) -> None:
self.STATE.inlet.close_stream()
self.STATE.inlet = None

def convert_timestamp(self, lsl_timestamp: float) -> float:
if self.SETTINGS.use_lsl_clock:
return 0.0
_clk_state = self.__class__.clock_state # More friendly name
if (time.time() - _clk_state["last_update"]) >= _clk_state["interval"]:
if _clk_state["count"] % 2:
pair = (pylsl.local_clock(), time.time())[::-1]
else:
pair = (time.time(), pylsl.local_clock())
offset = pair[0] - pair[1]
# Exponential smoothing
if _clk_state["last_update"] > 0:
offset = (1 - _clk_state["alpha"]) * _clk_state["clock_offset"] + _clk_state["alpha"] * offset
_clk_state["last_update"] = pair[0]
_clk_state["clock_offset"] = offset
_clk_state["count"] += 1
return lsl_timestamp + _clk_state["clock_offset"]
@ez.task
async def clock_sync_task(self) -> None:
while True:
force = self.clock_sync.count < 1000
await self.clock_sync.update(force=force, burst=1000 if force else 4)

@ez.publisher(OUTPUT_SIGNAL)
async def lsl_pull(self) -> typing.AsyncGenerator:
Expand Down Expand Up @@ -268,6 +292,10 @@ async def lsl_pull(self) -> typing.AsyncGenerator:
}
)

while self.clock_sync.count < 1000:
# Let the clock_sync task do its job at the beginning.
await asyncio.sleep(0.001)

while self.STATE.inlet is not None:
if self._fetch_buffer is not None:
samples, timestamps = self.STATE.inlet.pull_chunk(
Expand All @@ -285,7 +313,7 @@ async def lsl_pull(self) -> typing.AsyncGenerator:
# time.time() gives us NOW, but we want the timestamp of the 0th sample in the chunk
t0 = time.time() - (timestamps[-1] - timestamps[0])
else:
t0 = self.convert_timestamp(timestamps[0])
t0 = self.clock_sync.convert_timestamp(timestamps[0])
if fs <= 0.0:
# Irregular rate streams need to be streamed sample-by-sample
for ts, samp in zip(timestamps, data):
Expand Down

0 comments on commit a22a041

Please sign in to comment.