#!/usr/bin/python
# Based on vogelchr/hp7475a-send (https://github.com/vogelchr/hp7475a-send)
import sys
import time
import os
import serial
import serial.tools.list_ports
from serial import SerialException
from flask_socketio import SocketIO, emit
# import PySimpleGUI as sg
import notification
import globals
# Bit flags in the plotter's answer to <ESC>.O, "Output Extended Status
# Information" [Manual: 10-42].
EXT_STATUS_BUF_EMPTY = 1 << 3  # 0x08: buffer empty
EXT_STATUS_VIEW = 1 << 4  # 0x10: "view" button has been pressed, plotting suspended
EXT_STATUS_LEVER = 1 << 5  # 0x20: paper lever raised, plotting suspended

# Negative codes are generated by this module itself.
_OWN_ERRORS = {
    -1: 'Timeout',
    -2: 'Parse error of decimal return from plotter',
}

# Non-negative codes are taken from the plotter manual.
_MANUAL_ERRORS = {
    0: 'no error',
    10: 'overlapping output instructions',
    11: 'invalid byte after <ESC>.',
    12: 'invalid byte while parsing device control instruction',
    13: 'parameter out of range',
    14: 'too many parameters received',
    15: 'framing error, parity error or overrun',
    16: 'input buffer has overflowed',
}

# Error code -> human readable description, own codes first.
ERRORS = {**_OWN_ERRORS, **_MANUAL_ERRORS}
class HPGLError(Exception):
    """Error raised while talking to the plotter.

    Attributes:
        errcode: either a numeric code looked up in ``ERRORS`` when the
            error is rendered, or a free-form message string.
        causes: list of context strings, appended to as the error travels
            up the call chain (see ``add_cause``).
    """

    def __init__(self, n, cause=None):
        """Store error code/message ``n`` and an optional first ``cause``."""
        self.errcode = n
        self.causes = [cause] if cause else []

    def add_cause(self, cause):
        """Append one more context string to the error."""
        self.causes.append(cause)

    def __repr__(self):
        """Render as 'HPGLError: <description>[, caused by <causes>]'."""
        if isinstance(self.errcode, str):
            errstr = self.errcode
        else:
            # Codes missing from the table would otherwise render as "None".
            description = ERRORS.get(self.errcode, 'unknown error code')
            errstr = f'Error {self.errcode}: {description}'
        if self.causes:
            cstr = ', '.join(self.causes)
            return f'HPGLError: {errstr}, caused by {cstr}'
        return f'HPGLError: {errstr}'

    def __str__(self):
        return repr(self)
def read_answer(tty):
    """Read a decimal number, terminated by a carriage return, from the plotter.

    Args:
        tty: object with a ``read(1)`` method that returns one byte, or an
            empty value when the read timed out.

    Returns:
        The integer value of the bytes received before the carriage return.

    Raises:
        HPGLError: with code -1 when a read returns nothing (timeout), or
            with code -2 when the received bytes are not a decimal number.
    """
    buf = bytearray()
    while True:
        c = tty.read(1)
        if not c:
            raise HPGLError(-1)  # timeout
        if c == b'\r':
            break
        buf += c
    try:
        return int(buf)
    except ValueError as e:
        print(repr(e))
        # Keep the raw reply and the original exception for diagnosis.
        raise HPGLError(-2, f'unparsable reply {bytes(buf)!r}') from e
def chk_error(tty):
    """Ask the plotter for its extended error code and raise if it is non-zero.

    Sends ``<ESC>.E`` on ``tty`` and reads the numeric answer.

    Raises:
        HPGLError: with the reported code when it is non-zero, or the error
            from reading the answer, annotated with the query that failed.
    """
    tty.write(b'\033.E')
    try:
        code = read_answer(tty)
    except HPGLError as err:
        err.add_cause('ESC.E (Output extended error code).')
        raise
    if not code:
        return
    raise HPGLError(code)
def plotter_cmd(tty, cmd, get_answer=False):
    """Send ``cmd`` to the plotter and check that it did not cause an error.

    Args:
        tty: serial connection to write to and read from.
        cmd: bytes to send.
        get_answer: when true, read a numeric answer before the error check.

    Returns:
        The numeric answer when ``get_answer`` is true, otherwise None.

    Raises:
        HPGLError: from reading the answer or from the error check, with
            the command that was sent added as a cause.
    """
    tty.write(cmd)
    try:
        reply = read_answer(tty) if get_answer else None
        chk_error(tty)
    except HPGLError as err:
        sent = repr(cmd)[1:]  # drop the leading 'b' of the bytes repr
        err.add_cause(f'after sending {sent}')
        raise
    return reply
def baud_rate_test(serial_port, packet=b'IN;OI;'):
    """Probe ``serial_port`` for the baud rate at which ``packet`` comes back.

    Every rate in the port's ``BAUDRATES`` between 75 and 19200 is tried in
    turn: ``packet`` is written and everything received within the 0.5 s
    read timeout is compared with it.

    Args:
        serial_port: serial device name to open.
        packet: bytes to send at each baud rate.

    Returns:
        The first baud rate at which the received bytes equal ``packet``,
        or the string 'Unknown' when no rate matched.
    """
    ser = serial.Serial(serial_port)
    try:
        ser.timeout = 0.5
        for baudrate in ser.BAUDRATES:
            if not 75 <= baudrate <= 19200:
                continue
            ser.baudrate = baudrate
            ser.write(packet)
            if ser.readall() == packet:
                return baudrate
        return 'Unknown'
    finally:
        # Release the port on every path so it can be reopened for plotting.
        ser.close()
def listComPorts():
    """Return the device names of the serial ports found on this machine.

    Returns:
        A dict ``{'name': 'ports', 'content': [<device name>, ...]}``.
    """
    # Use the documented ``device`` attribute instead of parsing the
    # human-readable "<device> - <description>" string of each entry.
    devices = [info.device for info in serial.tools.list_ports.comports()]
    return dict(name='ports', content=devices)
def sendToPlotter(socketio, hpglfile, port='COM3', baud=9600, plotter='7475a'):
    """Stream an HPGL file to a plotter over a serial port.

    Progress and errors are printed and also emitted on ``socketio``
    ('status_log', 'print_progress' and 'error' events). Sending stops at
    end of file or as soon as ``globals.printing`` becomes false.

    Args:
        socketio: object with an ``emit(event, payload)`` method.
        hpglfile: path of the HPGL file to send.
        port: serial device name.
        baud: serial baud rate; 9600 is used when the value cannot be
            converted to an integer.
        plotter: plotter model; 'mp4200' opens the port with explicit 8N1
            framing and XON/XOFF flow control, any other value opens it
            with only baud rate and timeout set.

    Returns:
        False when the serial port cannot be opened, otherwise None.

    Raises:
        OSError: if ``hpglfile`` cannot be opened.
        HPGLError: if the plotter reports an error while data is streamed.
    """
    print(plotter)
    globals.printing = True

    # The file size is only needed for the percentage display, so a failing
    # stat is reported but is not fatal by itself.
    input_bytes = None
    try:
        size = os.stat(hpglfile).st_size
        if size != 0:
            input_bytes = size
    except Exception as e:
        print('Error stat\'ing file', hpglfile, str(e))
        socketio.emit('error', {'error': 'Error stat\'ing file'})

    # Honour the caller's baud rate; fall back to 9600 for values that are
    # not numeric (e.g. the 'Unknown' result of baud_rate_test).
    try:
        baudrate = int(baud)
    except (TypeError, ValueError):
        baudrate = 9600

    serial_kwargs = {'baudrate': baudrate, 'timeout': 2.0}
    if plotter == 'mp4200':
        serial_kwargs.update(
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS,
            xonxoff=True,
        )

    with open(hpglfile, 'rb') as hpgl:
        try:
            tty = serial.Serial(port=port, **serial_kwargs)
        except SerialException as e:
            socketio.emit('error', {'error': repr(e)})
            print(repr(e))
            return False
        try:
            return _stream_hpgl(socketio, tty, hpgl, input_bytes)
        finally:
            # Always release the port, also when the plotter raised an error.
            tty.close()


def _stream_hpgl(socketio, tty, hpgl, input_bytes):
    """Initialise the plotter on ``tty`` and feed it the contents of ``hpgl``.

    ``input_bytes`` is the total file size used for the percentage display,
    or None when the size is unknown.
    """
    # <ESC>.@<dec>;<dec>:
    #   1st parameter is buffer size 0..1024, optional
    #   2nd parameter is bit flags for operation mode
    #     0x01 : enable HW handshaking
    #     0x02 : ignored
    #     0x04 : monitor mode 1 if set, mode 0 if unset (for terminal)
    #     0x08 : 0: disable monitor mode, 1: enable monitor mode
    #     0x10 : 0: normal mode, 1: block mode
    try:
        plotter_cmd(tty, b'\033.@;0:')  # Plotter Configuration [Manual 10-27]
        plotter_cmd(tty, b'\033.Y')  # Plotter On [Manual 10-26]
        plotter_cmd(tty, b'\033.K')  # abort graphics
        plotter_cmd(tty, b'IN;')  # HPGL initialize
        # Debug aid: plotter_cmd(tty, b'\033.0') provokes a plotter error.
        # Output Buffer Size [Manual 10-36]
        bufsz = plotter_cmd(tty, b'\033.L', True)
    except HPGLError as e:
        print('*** Error initializing the plotter!')
        print(e)
        socketio.emit('error', {'data': '*** Error initializing the plotter!'})
        socketio.emit('error', {'data': str(e)})
        return

    print('Buffer size of plotter is', bufsz, 'bytes.')
    socketio.emit('status_log', {'data': 'Buffer size of plotter is ' + str(bufsz) + ' bytes.'})

    total_bytes_written = 0
    while globals.printing:
        status = plotter_cmd(tty, b'\033.O', True)
        if status & (EXT_STATUS_VIEW | EXT_STATUS_LEVER):
            print('*** Printer is viewing plot, pausing data.')
            socketio.emit('status_log', {'data': '*** Printer is viewing plot, pausing data.'})
            time.sleep(5.0)
            continue

        # Free space in the plotter's buffer; wait until a useful amount
        # is available.
        bufsz = plotter_cmd(tty, b'\033.B', True)
        if bufsz < 256:
            sys.stdout.flush()
            time.sleep(0.25)
            continue

        # Send less than the reported free space, leaving 128 bytes unused.
        data = hpgl.read(bufsz - 128)
        bufsz_read = len(data)
        if bufsz_read == 0:
            print('*** EOF reached, exiting.')
            notification.telegram_sendNotification('*** EOF reached, exiting.')
            socketio.emit('status_log', {'data': '*** EOF reached, exiting.'})
            break

        if input_bytes is not None:
            percent = 100.0 * total_bytes_written / input_bytes
            print(f'{percent:.2f}%, {total_bytes_written} byte written.')
            socketio.emit('status_log', {'data': f'{percent:.2f}%, {total_bytes_written} byte written.'})
            socketio.emit('print_progress', {'data': f'{percent:.2f}'})
        else:
            # No percentage can be computed without the total file size.
            print(f'{bufsz_read} byte added.')
            socketio.emit('status_log', {'data': f'{bufsz_read} byte added.'})

        tty.write(data)
        total_bytes_written += bufsz_read