forked from sza2/viatom_pc60fw
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpc60fw.py
129 lines (101 loc) · 4.01 KB
/
pc60fw.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python
import sys
import libscrc
import bluepy
import csv
from datetime import datetime
import os
if len(sys.argv) == 1:
print ("Not enough arguments.")
print ("Usage: ./pc60fw.py <Device MAC> [SpO2 logging threshold]")
print ("Logs are written to ~/Pulseoxymeter/ .")
exit()
# Parse SpO2 logging threshold
if len(sys.argv) == 3:
spo2thr = int(sys.argv[2])
else:
spo2thr = 100
stream = bytearray()
# Create log directory (if doesn't exist yet)
logdir = os.path.expanduser("~") + '/Pulseoxymeter'
try:
os.mkdir(logdir, mode = 0o770)
except FileExistsError:
pass
# Create time-stamped logfile
logfilename = logdir + '/' + datetime.now().strftime("%Y-%m-%d_%H-%M") + '.csv'
logfile = open(logfilename, 'w')
logwriter = csv.writer(logfile, delimiter=";")
# Write header in it
logwriter.writerow(['Timestamp', 'SpO2', 'PR', 'PI', 'Battery'])
class MyDelegate(bluepy.btle.DefaultDelegate):
def __init__(self):
bluepy.btle.DefaultDelegate.__init__(self)
self.battery = 0
def handleNotification(self, cHandle, data):
stream.extend(bytearray(data))
while(True):
if(len(stream) == 0):
break
# search for sync sequence
idx = stream.find(b'\xaa\x55')
# gather more bytes if the sync sequence not found
if(idx < 0):
break
# check if there are enough bytes to read the message length
# otherwise skip and gather more bytes
if(len(stream) < idx + 4):
break
length = stream[idx + 3]
# check whether all the bytes of the message available
# otherwise skip and gather more bytes
if(len(stream) < idx + 4 + length):
break
# remove the bytes from the stream prior sync
# (if any - as this should not happen except in case of the firs message)
del stream[0 : idx]
# copy the whole message
message = stream[0 : idx + 4 + length]
# the last byte of the message is a CRC8/MAXIM
# the CRC sum for the whole message (including the CRC) must be 0
if(libscrc.maxim8(message) != 0):
print("CRC error")
# remove the sync bytes and the CRC
message = message[2 : idx + 3 + length]
# remove the processed bytes from the stream
del stream[0 : idx + 4 + length]
# messages with 0x08 on the second spot contains values appear on the OLED display
if(message[2] == 0x01):
print("SpO2: %d PR: %d PI: %1.1f" % (message[3], message[4], message[6] / 10))
# Ignore samples from uninitialized device
if (message[3] != 0 and message[3] <= spo2thr):
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logwriter.writerow([ts, message[3], message[4], message[6]/10, self.battery])
if(message[2] == 0x03):
print("Battery Level: %d/3" % (message[3]))
self.battery = message[3]
# Connections often fail randomly at the beginning. This will retry until it works.
Connection = False
while(Connection == False):
try:
pulseoximeter = bluepy.btle.Peripheral(sys.argv[1], "random")
Connection = True
except bluepy.btle.BTLEDisconnectError:
print ("Connection failed, retrying...")
try:
pulseoximeter.setDelegate(MyDelegate())
# enable notification
setup_data = b"\x01\x00"
notify = pulseoximeter.getCharacteristics(uuid='6e400003-b5a3-f393-e0a9-e50e24dcca9e')[0]
notify_handle = notify.getHandle() + 1
pulseoximeter.writeCharacteristic(notify_handle, setup_data, withResponse = True)
# wait for answer
while True:
if pulseoximeter.waitForNotifications(1.0):
continue
#Handle disconnection gracefully
except bluepy.btle.BTLEDisconnectError:
print ("Pulseoximeter disconnected.")
finally:
logfile.close()
pulseoximeter.disconnect()