forked from Atlas-Scientific/Raspberry-Pi-sample-code
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ftdi.py
164 lines (131 loc) · 4.02 KB
/
ftdi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import string
import pylibftdi
from pylibftdi.device import Device
from pylibftdi.driver import FtdiError
from pylibftdi import Driver
import os
import time
class AtlasDevice(Device):
    """pylibftdi ``Device`` opened in text mode for talking to an Atlas sensor.

    Responses are treated as lines terminated by a carriage return, and
    commands are sent with a carriage return appended, so the line-oriented
    helpers below are built around that terminator.
    """

    def __init__(self, sn):
        """Open the FTDI device identified by *sn*.

        :param sn: value passed to ``Device`` as ``device_id`` (the caller in
            this file passes a serial number from ``get_ftdi_device_list``).
        """
        Device.__init__(self, mode='t', device_id=sn)

    def read_line(self, size=0):
        """Read one carriage-return-terminated line from the device.

        Adapted from the line reader in the ftdi library, using a carriage
        return as the line separator.

        :param size: maximum number of characters to return; 0 or a negative
            value means no limit.
        :return: the characters read, including the trailing carriage return
            when one was received. An empty string means the first read
            returned nothing.
        """
        terminator = '\r'
        chars = []
        # Check the size limit *before* reading so that no more than `size`
        # characters are returned and no extra character is consumed and lost.
        while size <= 0 or len(chars) < size:
            next_char = self.read(1)
            if not next_char:
                # Nothing (more) available from the device.
                break
            chars.append(next_char)
            if next_char == terminator:
                break
        return ''.join(chars)

    def read_lines(self):
        """Read lines until ``read_line`` returns an empty string.

        :return: list of the lines read (each as returned by ``read_line``).
            If an ``FtdiError`` is raised while reading, a message is printed
            and an empty list is returned.
        """
        lines = []
        try:
            while True:
                line = self.read_line()
                if not line:
                    break
                # NOTE(review): flush_input() is called after every line, as in
                # the original code; presumably it drops data still queued in
                # the receive buffer. Confirm this is intended.
                self.flush_input()
                lines.append(line)
        except FtdiError:
            print("Failed to read from the sensor.")
            # Return an empty list (not '') so callers always get a list.
            return []
        return lines

    def send_cmd(self, cmd):
        """Send a command to the Atlas sensor.

        A carriage return is appended to the command before it is written.

        :param cmd: command text without the trailing carriage return.
        :return: True if the write succeeded, False if an ``FtdiError`` was
            raised (a message is printed in that case).
        """
        buf = cmd + "\r"  # add carriage return
        try:
            self.write(buf)
            return True
        except FtdiError:
            print("Failed to send command to the sensor.")
            return False
def get_ftdi_device_list():
    """Return the serial numbers of the FTDI devices reported by the driver.

    :return: list with one serial-number string per entry yielded by
        ``Driver().list_devices()``, in the order the driver reports them.
    """
    serials = []
    for device in Driver().list_devices():
        # Each entry must be exactly a (vendor, product, serial) triple;
        # the unpacking raises if it is not.
        _vendor, _product, serial = device
        # The original code assumed list_devices yields bytes. Decode only
        # when that is the case, so an already-decoded str is accepted too
        # (presumably this depends on the pylibftdi version; verify).
        if isinstance(serial, bytes):
            serial = serial.decode('latin1')
        serials.append(serial)
    return serials
if __name__ == '__main__':
    # Interactive console: pick an FTDI device, then forward typed commands to
    # it and print the responses. "Poll,<seconds>" polls with "R" repeatedly.

    # raw_input on Python 2, input on Python 3
    real_raw_input = vars(__builtins__).get('raw_input', input)

    print("\nWelcome to the Atlas Scientific Raspberry Pi FTDI Serial example.\n")
    print(" Any commands entered are passed to the board via UART except:")
    print(" Poll,xx.x command continuously polls the board every xx.x seconds")
    print(" Pressing ctrl-c will stop the polling\n")
    print(" Press enter to receive all data in buffer (for continuous mode) \n")
    print("Discovered FTDI serial numbers:")

    devices = get_ftdi_device_list()
    for i, serial in enumerate(devices):
        print( "\nIndex: ", i, " Serial: ", serial)
    print( "===================================")

    if not devices:
        # Without this, the selection prompt below could never succeed.
        raise SystemExit("No FTDI devices found.")

    # Keep asking until a device is opened successfully.
    while True:
        index = real_raw_input("Please select a device index: ")
        try:
            selected_serial = devices[int(index)]
            dev = AtlasDevice(selected_serial)
        except (ValueError, IndexError):
            # Non-numeric or out-of-range input.
            print( "Please input a valid index")
        except pylibftdi.FtdiError as e:
            print( "Error, ", e)
            print( "Please input a valid index")
        else:
            break

    print( "")
    print(">> Opened device ", selected_serial)
    print(">> Any commands entered are passed to the board via FTDI:")

    time.sleep(1)
    dev.flush()

    while True:
        input_val = real_raw_input("Enter command: ")

        # continuous polling command automatically polls the board
        if input_val.upper().startswith("POLL"):
            try:
                # str.split works on Python 2 and 3; string.split() does not
                # exist on Python 3.
                delaytime = float(input_val.split(',')[1])
                if delaytime < 0:
                    raise ValueError("negative delay")
            except (IndexError, ValueError):
                print("Usage: Poll,xx.x (polling interval in seconds)")
                continue

            dev.send_cmd("C,0")  # turn off continuous mode
            # clear all previous data
            time.sleep(1)
            dev.flush()

            print("Polling sensor every %0.2f seconds, press ctrl-c to stop polling" % delaytime)
            try:
                while True:
                    dev.send_cmd("R")
                    for line in dev.read_lines():
                        # Lines beginning with '*' are not printed as responses.
                        if not line.startswith('*'):
                            print("Response: " , line)
                    time.sleep(delaytime)
            except KeyboardInterrupt:  # ctrl-c breaks out of the polling loop
                print("Continuous polling stopped")
        else:
            # An empty entry just drains whatever is waiting in the buffer;
            # anything else is passed straight to the board first.
            if input_val:
                dev.send_cmd(input_val)
                time.sleep(1.3)
            for line in dev.read_lines():
                print( line)