-
Notifications
You must be signed in to change notification settings - Fork 1
/
SleepApneaDetect.py
152 lines (134 loc) · 6.19 KB
/
SleepApneaDetect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import numpy as np
import matplotlib.pyplot as plt
# import time
# import test_menu as tm
# import sys
class SleepApneaDetect:
def __init__(self):
self.record_length = 5 # We are keeping 15 seconds worth of data
self.graph_length = 30 # x-axis length
self.record = np.zeros(self.graph_length) # Array to store data
self.num_item_in_list = 0 # Keeps track of the number of values we have
self.sleep_apnea = False # To indicate whether the data shows signs of sleep apnea
self.update_rate = 1 # updating every second
self.start, self.end = 0, 0 # The timestamp of when SA starts and ends
plt.rcParams['toolbar'] = 'None' # Removes toolbar on the matplotlib window
# Generate and configure the graph
self.fig, self.ax = plt.subplots()
self.line1, = self.ax.plot(self.record)
self.fig.show()
plt.xlim(0, 30)
plt.ylim(0, 12)
plt.xlabel("Seconds (s)")
plt.ylabel("Delta to Beta Ratio")
def update(self, new_value, plot_graph=True):
"""
Updates the graph live as we receive new data values
"""
# Using np.roll to simulate a queue structure
self.record = np.roll(self.record, -self.update_rate)
self.record[(self.graph_length - self.update_rate)] = new_value
# print(self.record[self.graph_length - 1])
self.num_item_in_list += 1
if plot_graph:
self.plot_data()
# Checks whether the data exhibit signs of sleep apnea
if not self.sleep_apnea:
temp_bool = self.detect_start()
if temp_bool:
self.sleep_apnea = temp_bool
self.start = self.num_item_in_list
print("Sleep Apnea")
return True
else:
temp_bool = self.detect_end()
if temp_bool is False:
self.sleep_apnea = temp_bool
self.end = self.num_item_in_list
print("Safe")
return False
# time.sleep(1)
return self.sleep_apnea
def plot_data(self):
"""
Plots data live
"""
self.line1.set_ydata(self.record)
self.fig.canvas.draw()
self.fig.canvas.flush_events()
def plot_summary(self):
"""
Color codes the graph for summary analysis
"""
self.ax.fill_between(np.linspace(0, self.graph_length - 1, self.graph_length), self.record,
color="#FAC1AF", alpha=0.4)
self.ax.fill_between(np.linspace(self.start + 3, self.end + 3, self.end - self.start + 1),
self.record[13:25], color="#EB75D3", alpha=0.4)
# print(self.start, self.end)
self.fig.canvas.draw()
self.fig.canvas.flush_events()
def detect_start(self):
"""
Detect whether the data exhibits signs of sleep apnea using ratio comparison
"""
drop, old = 2, 3 # data compare ratio: 2 seconds of feature vs. 3 seconds of normal
# Ensures that we have 5 seconds worth of data before analysis
if self.num_item_in_list >= self.record_length:
# Calculating average
old_avg = sum(self.record[(self.graph_length - self.record_length):
(self.graph_length - old + 1)]) / old
drop_avg = sum(self.record[(self.graph_length - old + 1):]) / drop
# spike_avg = sum(self.record[self.graph_length - drop:]) / spike
# Detection based on ratio difference
drop_ratio = 0.5
# print(self.record)
"""print(self.record[(self.graph_length - self.record_length - 1):(self.graph_length - drop - spike)])
print(self.record[(self.graph_length - drop - spike):self.graph_length - drop])
print(self.record[self.graph_length - drop:])
print(old_avg, drop_avg, spike_avg)
print(spike_ratio * old_avg, drop_ratio * drop_avg)"""
if drop_avg < drop_ratio * old_avg:
return True # Sleep Apnea
def detect_end(self):
"""
Detects whether we have recovered from sleep apnea
"""
length = 3 # 3 seconds of feature detection
detection_ratio = 2.5
# Ensures that we have 5 seconds worth of data before analysis
if self.num_item_in_list >= self.record_length:
overall_avg = sum(self.record[(self.graph_length - self.record_length):
(self.graph_length - length)]) / (self.record_length - length)
high_avg = sum(self.record[(self.graph_length - length):]) / length
"""print(self.record[(self.graph_length - self.record_length):(self.graph_length - length)])
print(self.record[(self.graph_length - length):])
print(overall_avg, high_avg)
print(overall_avg * detection_ratio)"""
# Detection base on ratio difference
if high_avg > overall_avg * detection_ratio:
return False
def print_data(self):
for d in self.record:
print(d)
if __name__ == "__main__":
data = SleepApneaDetect()
# app = tm.QApplication(sys.argv)
# win = tm.MenuWindow()
# win.show()
testData = [5.715187348624481, 5.673257720967051, 5.649467225521545, 5.665105595989914,
5.737902873218203, 5.731773638799287, 5.7283130478142565, 5.565140710684277, 2.5610874026885915,
1.1098416466178829, 1.484445742223451,
2.5610874026885915, 1.484125376672232,
2.5610874026885915, 1.4921133700125906,
1.5028568339920239, 1.4894132977722276,
1.481514487902809, 2.0934834070644106,
1.3146429767970262, 7.828288934925429,
5.7372248170443925, 5.773551077845704,
7.828288934925429, 5.775365285511127,
7.828288934925429]
for i in testData:
data.update(i)
# win.sa_bool = data.sleep_apnea
data.plot_summary()
plt.show()
# sys.exit(app.exec())