-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcom.py
64 lines (50 loc) · 1.45 KB
/
com.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from sys import stdin, stdout
from debug import add_stat, inc_stat, IS_MICROPYTHON
import log
if IS_MICROPYTHON:
import eventcom_micropy as eventcom
else:
import eventcom_desktop as eventcom
def to_hex(buffer):
return " ".join([hex(v) for v in buffer])
def decode_loco_id(buffer):
if (buffer[0] & 0xC0) == 0xC0:
return ((buffer[0] & 0x3F) << 8) | buffer[1]
return buffer[1]
def calc_checksum(data):
checkSum = 0
for v in data:
checkSum ^= v
return checkSum & 0xFF
def is_valid_message(buffer, checksum):
if checksum == calc_checksum(buffer):
return True
log.error("Checksum failed")
log.warn(f"Msg: {to_hex(buffer)}")
return False
def read(length):
data = stdin.buffer.read(length)
if (isinstance(data, str)):
data = [ord(d) for d in data]
add_stat("Read", len(data))
return data
def read_byte(event_cb=None, period=None):
if event_cb:
event_cb()
b = eventcom.read_byte(event_cb, period)
inc_stat("Read")
return b
return read(1)[0]
def read_into_buffer(buffer, length=1):
data = read(length)
buffer.extend(data)
return data
def write(data):
if isinstance(data, int):
data = [data]
data = bytes(data)
stdout.buffer.write(data)
add_stat("Written", len(data))
def write_with_checksum(data):
write(data)
write(calc_checksum(data))