-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserialUtils.py
221 lines (162 loc) · 6.72 KB
/
serialUtils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import serial
import serial.tools.list_ports
import time
import re
from enum import Enum, auto
import config
import logging
# Baud rate used for every connection; the device is assumed to run GRBL, so this is treated as a constant.
BAUDRATE=115200
# Module-wide connection state: the open serial handle (None when there is none)
# and the name of the port it was opened on ('' when there is none).
__SERIAL = None
__PORT = ''
class ConnectionStatus(Enum):
    """Status of the connected device, from the Serial/GRBL point of view."""

    # no serial connection is open (initial state, and state after disconnect)
    NOT_CONNECTED = auto()
    # connected and idle: set after a connect, a finished command or a finished file
    READY = auto()
    # a command or a file is currently being sent
    BUSY = auto()
    # the last command or file ended with an exception
    ERROR = auto()
# Status currently tracked by this module; nothing is connected at import time.
__STATUS = ConnectionStatus.NOT_CONNECTED
##############################################################################################################################
def listAvailableSerialPorts():
    """Return the available COM ports as a list of strings.

    Each entry is str() of one item reported by
    serial.tools.list_ports.comports().
    """
    detected = serial.tools.list_ports.comports()
    return list(map(str, detected))
def connect(port, forceDisconnect = False):
    """Connect, reconnect, or do nothing, so the module ends up connected to `port`.

    - Already connected to `port`: returns immediately.
    - Connected to something else: that connection is closed first, unless
      the status is BUSY and `forceDisconnect` is false.
    - A new connection is then opened at BAUDRATE; after a 2 second pause the
      pending input is discarded and the status becomes READY.

    Args:
        port: name of the serial port, passed as is to serial.Serial.
        forceDisconnect: when True, close the current connection even if the
            status is BUSY.

    Raises:
        Exception: connected elsewhere, status is BUSY and forceDisconnect is
            False.
        Any exception raised by serial.Serial when the port cannot be opened.
    """
    global __SERIAL, __PORT, __STATUS
    # already connected to the requested port: nothing to do
    if __PORT == port and __SERIAL is not None:
        return
    # connected to another port (or left in a half-connected state): disconnect first
    if __PORT != '' or __SERIAL is not None:
        if __STATUS == ConnectionStatus.BUSY and not forceDisconnect:
            logging.error("Status error: device is busy, cannot disconnect (try force).")
            raise Exception ("Status error: device is busy, cannot disconnect (try force).")
        disconnect()
    # not connected at this point: open the port
    __SERIAL = serial.Serial(port, BAUDRATE)
    __PORT = port
    # Wait for grbl to initialize, then drop its startup text from the serial input
    time.sleep(2)
    # reset_input_buffer() is the pyserial 3.x name of the deprecated flushInput()
    __SERIAL.reset_input_buffer()
    __STATUS = ConnectionStatus.READY
def disconnect():
    """Close the serial connection, if any, and reset the module state.

    A failure while closing is logged and otherwise ignored: in every case
    the handle and port name are cleared and the status becomes NOT_CONNECTED.
    """
    global __SERIAL, __PORT, __STATUS
    handle = __SERIAL
    try:
        if handle is not None:
            handle.close()
    except Exception as ex:
        # best effort: a failing close must not prevent the state reset below
        logging.error("EXCEPT on disconnect:" + str(ex))
    __SERIAL, __PORT = None, ''
    __STATUS = ConnectionStatus.NOT_CONNECTED
def sendCommand (port, cmd:str, ignoreBusyStatus = False) -> str:
    """Send one command to the serial device and return its response.

    The connection to `port` is opened through connect() when needed. The
    status is BUSY while the exchange runs and READY once it succeeded.

    Args:
        port: serial port name, forwarded to connect().
        cmd: command text; a newline is appended before sending.
        ignoreBusyStatus: when True, send even if the status is BUSY, so a
            stuck device can be sent a recovery command.

    Returns:
        All response lines, each stripped, concatenated without separator.
        None when an exception occurred during the exchange; the exception is
        logged and the status set to ERROR.

    Raises:
        Exception: the status is BUSY and ignoreBusyStatus is False.
    """
    global __SERIAL, __STATUS
    deviceBusy = __STATUS == ConnectionStatus.BUSY
    if deviceBusy and not ignoreBusyStatus:
        # only BUSY blocks here (not ERROR, unlike processFile), so a command can still be tried
        logging.error("Status error: device is busy, cannot send a command.")
        raise Exception ("Status error: device is busy, cannot send a command.")
    try:
        connect(port)
        __STATUS = ConnectionStatus.BUSY
        payload = (cmd + '\n').encode()
        __SERIAL.write(payload)
        # read at least one line, then keep reading while the input buffer still holds data
        chunks = []
        pending = True
        while pending:
            chunks.append(__SERIAL.readline().decode().strip())
            time.sleep(0.1)
            pending = __SERIAL.in_waiting > 0
        __STATUS = ConnectionStatus.READY
        return ''.join(chunks)
    except Exception as ex:
        logging.error ("EXCEPT on sendCommand: " + str(ex))
        __STATUS = ConnectionStatus.ERROR
        return None
def linemodifier_skipComments(l:str) -> str:
    """Line modifier: turn comment lines into None.

    Args:
        l: one line of the file.

    Returns:
        None when the line starts with ";", otherwise the line unchanged.
        An empty string is returned unchanged.
    """
    # startswith() rather than l[0], so an empty line does not raise IndexError
    return None if l.startswith(";") else l
def linemodifier_laserMinimum(l:str) -> str:
    """Line modifier: lower every laser power word to the minimum (Sxxx -> S010).

    A power of zero (S0, S000, S0.0) is left untouched: it is what turns the
    laser off, e.g. the final "G1 S0", and must not be turned into S010.

    Args:
        l: one line of the file.

    Returns:
        The line with every non-zero S value replaced by "S010".
    """
    def _minimum(match):
        # keep "laser off" words as they are, lower everything else
        if float(match.group(1)) == 0:
            return match.group(0)
        return "S010"

    # the optional decimal part is matched too, so "S12.5" is replaced as a whole
    return re.sub(r"S(\d+(?:\.\d+)?)", _minimum, l)
def linemodifier_delayCommands(l:str) -> str:
    """Line modifier: execute dwell commands here instead of sending them.

    https://www.sainsmart.com/blogs/news/grbl-v1-1-quick-reference
    G4 Pxxx : Dwell, Pause / Delay in SECONDS

    Args:
        l: one line of the file.

    Returns:
        None after sleeping when the line is a G4 dwell with a P value, so the
        line is not sent; otherwise the line unchanged.
    """
    # Match the G4 word only: a plain startswith("G4") would also catch G40..G49
    if re.match(r"G0*4(?![\d.])", l) is None:
        return l
    duration = re.search(r"P\s*(\d+(?:\.\d*)?|\.\d+)", l)
    if duration is None:
        # no usable P word: nothing to wait for, leave the line as it is
        logging.warning("G4 command without a P duration, left unchanged: " + l)
        return l
    #assume they RTFM and sent duration in SECONDS or it's going to be a very LONG pause
    # https://github.com/gnea/grbl/issues/343
    time.sleep(float(duration.group(1)))
    return None
def processFile (port:str, fileFullPath:str, lineModifiers = (linemodifier_skipComments,), forceStopCheck = None):
    """Send a file to the device line by line, applying modifiers to each line first.

    Every non-empty line is stripped and passed through `lineModifiers` in
    order (ignore comments, change values on the fly, etc.). A modifier that
    returns None drops the line. Each remaining line is written to the serial
    port and one response line is read back before going on.

    Args:
        port: serial port name, forwarded to connect().
        fileFullPath: path of the file to send.
        lineModifiers: iterable of callables taking a line and returning the
            modified line, or None to skip it. Defaults to skipping comments.
        forceStopCheck: optional callable without arguments, called after each
            non-empty line; a truthy result stops the job at that point.

    Raises:
        Exception: the status is BUSY or ERROR, or the connection failed.
        Any exception raised while reading or sending the file; the status is
        set to ERROR before it is re-raised.
    """
    global __SERIAL, __STATUS
    if __STATUS in (ConnectionStatus.BUSY, ConnectionStatus.ERROR):
        logging.error("Status error: device is busy or in error, cannot start a new job.")
        raise Exception ("Status error: device is busy or in error, cannot start a new job.")
    try:
        connect(port)
    except Exception as ex:
        logging.error("Error at connection : " + str(ex))
        __STATUS = ConnectionStatus.NOT_CONNECTED
        raise Exception("Failed to connect to device - see logs") from ex
    try:
        __STATUS = ConnectionStatus.BUSY
        with open(fileFullPath, "r") as f:
            for line in f:
                l = line.strip()
                if l == '':
                    continue
                for mod in lineModifiers:
                    l = mod(l)
                    if l is None:
                        # line dropped by a modifier: later modifiers must not see None
                        break
                if l is not None:
                    __SERIAL.write(str.encode(l + '\n'))
                    # Wait for grbl response with carriage return
                    grbl_out = __SERIAL.readline().decode().strip()
                    #TODO check for the answer to be "ok" and if not handle it
                    logging.debug(f"{line.strip()} ==> {l.strip()} ==? {str(grbl_out).strip()}")
                #check for forceful stop
                if forceStopCheck is not None and forceStopCheck():
                    #it will stop here: so execute the outro somewhere else.
                    # logging.warning(): logging.warn() no longer exists in Python 3.13
                    logging.warning(f"Force stop requested on file {fileFullPath}.")
                    break
        __STATUS = ConnectionStatus.READY
    except Exception as ex:
        logging.error("Exception processing file : " + str(ex))
        __STATUS = ConnectionStatus.ERROR
        # bare raise keeps the original traceback
        raise
def simulateFile(port:str, fileFullPath:str):
    """Process a file "for fake": same as a real run but with the laser at its minimum value.

    Comments are skipped and every S value goes through
    linemodifier_laserMinimum. When the config entry
    "G4 delays handled by device" is false, G4 delays are handled on the
    sending side through linemodifier_delayCommands.

    Args:
        port: serial port name, forwarded to processFile().
        fileFullPath: path of the file to simulate.
    """
    #the modifiers to use when simulating a file
    modifiers = [linemodifier_skipComments, linemodifier_laserMinimum]
    deviceHandlesDelays = config.myconfig.get("G4 delays handled by device", True)
    if not deviceHandlesDelays:
        #the device does not handle G4 delays: handle them on the software "sending" side
        modifiers += [linemodifier_delayCommands]
    processFile(port, fileFullPath, lineModifiers=modifiers)
def serialStatusEnum():
    """Return the serial status as a ConnectionStatus.

    NOT_CONNECTED is reported whenever there is no serial handle or no port
    name, whatever the stored status is; otherwise the stored status is
    returned.
    """
    connected = __SERIAL is not None and __PORT != ''
    if not connected:
        return ConnectionStatus.NOT_CONNECTED
    return __STATUS