-
Notifications
You must be signed in to change notification settings - Fork 0
/
ComConnect.py
131 lines (118 loc) · 3.99 KB
/
ComConnect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# ComConnect.py
"""CLass ComConnect(), use for chating with Arduino or another COM PORT"""
import serial
import serial.tools.list_ports
import keyboard
CMD = 0xff
STD_SPEED = 9600 # Скорость COM порта
COM_PORT = 'COM3'
class ComConnect:
"""Create connection with Arduino or another COM PORT"""
def __init__(self, com_port=COM_PORT, std_speed=STD_SPEED):
print('INITIALIZE... | ConConnect...', end='')
self.com_port = com_port
self. std_speed = std_speed
self.status = 2
self._time_response = 0
self._ports = []
self.piSerial = serial.Serial()
self.piSerial.close()
print(' DONE |')
def begin_initialization(self, cmd=CMD):
"""Initialization parameters for COM port"""
print(f'INITIALIZE... | COM {self.com_port}', end='')
self.piSerial.baudrate = self.std_speed
self.piSerial.port = self.com_port
self.piSerial.timeout = 0.1
self.piSerial.write_timeout = 0.1
print(' DONE |')
self.piSerial.open()
print(f'|PORT:{self.com_port}| OPENED')
self.piSerial.write(cmd)
print(f'|WRITE| {cmd}')
print('|WHAITING...', end='')
self.status = 0
def begin_connection(self, wr_line=0):
"""Use if we need response for coonecting"""
s = self.piSerial.readline()
if s == b'OK\n':
print('CONNECTED')
# send channels mask
self.write(wr_line)
print('|WHAITING DATA...')
self.status = 1
def begin(self, com_port=COM_PORT):
"""Setup COM-port and connect"""
self.com_port = com_port
if keyboard.is_pressed('q'):
self.piSerial.close()
print('CLOSING... | EXIT')
try:
if self.status == 2:
self.begin_initialization()
except (OSError, serial.SerialException,
serial.serialutil.SerialException):
try:
self.piSerial.close()
except (OSError, serial.SerialException):
self.status = 2
return "NO CONN"
self.status = 2
return "NO CONN"
return self.status
def write(self, wr_line):
"""Write line data"""
try:
self.piSerial.write((str(wr_line) + '\r\n\0').encode())
except (OSError, serial.SerialException):
self.status = 2
return "NO CONN"
return 1
def read(self):
"""Read data from COM Port"""
try:
if self.piSerial.inWaiting() > 0:
try:
tmp = self.piSerial.read(self.piSerial.inWaiting())
if tmp != tmp:
return 0
else:
return tmp
except ValueError:
return 0
except (OSError, serial.SerialException,
serial.serialutil.SerialException):
try:
self.piSerial.close()
except (OSError, serial.SerialException):
self.status = 2
return "NO CONN"
self.status = 2
return "NO CONN"
def serial_ports(self):
""" Lists serial port names"""
ports = serial.tools.list_ports.comports()
self._ports = []
for i in ports:
s = ''
for j in str(i):
if j != ' ':
s += j
else:
self._ports.append(s)
break
return self._ports
def set_status(self, sts):
"""Set status for reconnection or read data"""
self.status = sts
if self.status == 2:
self.__init__(self.com_port, self. std_speed)
def close(self):
"""Close COM port"""
try:
self.piSerial.close()
except (OSError, serial.SerialException):
self.status = 2
return "NO CONN"
self.status = 2
return "NO CONN"