-
Notifications
You must be signed in to change notification settings - Fork 0
/
ReceiveAndPlot.py
183 lines (155 loc) · 7.52 KB
/
ReceiveAndPlot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
#!/usr/bin/env python
"""
This script is adapted from:
https://github.com/labstreaminglayer/pylsl/blob/master/pylsl/examples/ReceiveAndPlot.py
The only change is using a QtWidgets objects instead of QtGui.
Requirements:
pyqtgraph, pyqt5
Original comments:
ReceiveAndPlot example for LSL
This example shows data from all found outlets in realtime.
It illustrates the following use cases:
- efficiently pulling data, re-using buffers
- automatically discarding older samples
- online postprocessing
"""
import numpy as np
import math
import pylsl
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore, QtWidgets
from typing import List
# Basic parameters for the plotting window
plot_duration = 5 # how many seconds of data to show
update_interval = 60 # ms between screen updates
pull_interval = 500 # ms between each pull operation
class Inlet:
"""Base class to represent a plottable inlet"""
def __init__(self, info: pylsl.StreamInfo):
# create an inlet and connect it to the outlet we found earlier.
# max_buflen is set so data older the plot_duration is discarded
# automatically and we only pull data new enough to show it
# Also, perform online clock synchronization so all streams are in the
# same time domain as the local lsl_clock()
# (see https://labstreaminglayer.readthedocs.io/projects/liblsl/ref/enums.html#_CPPv414proc_clocksync)
# and dejitter timestamps
self.inlet = pylsl.StreamInlet(info, max_buflen=plot_duration,
processing_flags=pylsl.proc_clocksync | pylsl.proc_dejitter)
# store the name and channel count
self.name = info.name()
self.channel_count = info.channel_count()
def pull_and_plot(self, plot_time: float, plt: pg.PlotItem):
"""Pull data from the inlet and add it to the plot.
:param plot_time: lowest timestamp that's still visible in the plot
:param plt: the plot the data should be shown on
"""
# We don't know what to do with a generic inlet, so we skip it.
pass
class DataInlet(Inlet):
"""A DataInlet represents an inlet with continuous, multi-channel data that
should be plotted as multiple lines."""
dtypes = [[], np.float32, np.float64, None, np.int32, np.int16, np.int8, np.int64]
def __init__(self, info: pylsl.StreamInfo, plt: pg.PlotItem):
super().__init__(info)
# calculate the size for our buffer, i.e. two times the displayed data
bufsize = (2 * math.ceil(info.nominal_srate() * plot_duration), info.channel_count())
self.buffer = np.empty(bufsize, dtype=self.dtypes[info.channel_format()])
empty = np.array([])
# create one curve object for each channel/line that will handle displaying the data
self.curves = [pg.PlotCurveItem(x=empty, y=empty, autoDownsample=True) for _ in range(self.channel_count)]
for curve in self.curves:
plt.addItem(curve)
def pull_and_plot(self, plot_time, plt):
# pull the data
_, ts = self.inlet.pull_chunk(timeout=0.0,
max_samples=self.buffer.shape[0],
dest_obj=self.buffer)
# ts will be empty if no samples were pulled, a list of timestamps otherwise
if ts:
ts = np.asarray(ts)
y = self.buffer[0:ts.size, :]
this_x = None
old_offset = 0
new_offset = 0
for ch_ix in range(self.channel_count):
# we don't pull an entire screen's worth of data, so we have to
# trim the old data and append the new data to it
old_x, old_y = self.curves[ch_ix].getData()
# the timestamps are identical for all channels, so we need to do
# this calculation only once
if ch_ix == 0:
# find the index of the first sample that's still visible,
# i.e. newer than the left border of the plot
old_offset = old_x.searchsorted(plot_time)
# same for the new data, in case we pulled more data than
# can be shown at once
new_offset = ts.searchsorted(plot_time)
# append new timestamps to the trimmed old timestamps
this_x = np.hstack((old_x[old_offset:], ts[new_offset:]))
# append new data to the trimmed old data
this_y = np.hstack((old_y[old_offset:], y[new_offset:, ch_ix] - ch_ix))
# replace the old data
self.curves[ch_ix].setData(this_x, this_y)
class MarkerInlet(Inlet):
"""A MarkerInlet shows events that happen sporadically as vertical lines"""
def __init__(self, info: pylsl.StreamInfo):
super().__init__(info)
def pull_and_plot(self, plot_time, plt):
# TODO: purge old markers
strings, timestamps = self.inlet.pull_chunk(0)
if timestamps:
for string, ts in zip(strings, timestamps):
plt.addItem(pg.InfiniteLine(ts, angle=90, movable=False, label=string[0]))
def main():
# firstly resolve all streams that could be shown
inlets: List[Inlet] = []
print("looking for streams")
streams = pylsl.resolve_streams()
# Create the pyqtgraph window
pw = pg.plot(title='LSL Plot')
plt = pw.getPlotItem()
plt.enableAutoRange(x=False, y=True)
# iterate over found streams, creating specialized inlet objects that will
# handle plotting the data
for info in streams:
if info.type() == 'Markers':
if info.nominal_srate() != pylsl.IRREGULAR_RATE \
or info.channel_format() != pylsl.cf_string:
print('Invalid marker stream ' + info.name())
print('Adding marker inlet: ' + info.name())
inlets.append(MarkerInlet(info))
elif info.nominal_srate() != pylsl.IRREGULAR_RATE \
and info.channel_format() != pylsl.cf_string:
print('Adding data inlet: ' + info.name())
inlets.append(DataInlet(info, plt))
else:
print('Don\'t know what to do with stream ' + info.name())
def scroll():
"""Move the view so the data appears to scroll"""
# We show data only up to a timepoint shortly before the current time
# so new data doesn't suddenly appear in the middle of the plot
fudge_factor = pull_interval * .002
plot_time = pylsl.local_clock()
pw.setXRange(plot_time - plot_duration + fudge_factor, plot_time - fudge_factor)
def update():
# Read data from the inlet. Use a timeout of 0.0 so we don't block GUI interaction.
mintime = pylsl.local_clock() - plot_duration
# call pull_and_plot for each inlet.
# Special handling of inlet types (markers, continuous data) is done in
# the different inlet classes.
for inlet in inlets:
inlet.pull_and_plot(mintime, plt)
# create a timer that will move the view every update_interval ms
update_timer = QtCore.QTimer()
update_timer.timeout.connect(scroll)
update_timer.start(update_interval)
# create a timer that will pull and add new data occasionally
pull_timer = QtCore.QTimer()
pull_timer.timeout.connect(update)
pull_timer.start(pull_interval)
import sys
# Start Qt event loop unless running in interactive mode or using pyside.
if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
QtWidgets.QApplication.instance().exec_()
if __name__ == '__main__':
main()