-
Notifications
You must be signed in to change notification settings - Fork 0
/
modbus.py
131 lines (116 loc) · 4.68 KB
/
modbus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import struct
# Byte-wise lookup table used by crc16(): entry ``i`` is the CRC register
# obtained by pushing the single byte ``i`` through eight shift steps with
# the reflected polynomial 0xA001 (see the commented-out generator below).
# Laid out 8 entries per row, so row ``r`` holds indices 8*r .. 8*r + 7.
CRC16_TABLE = (
    0x0000, 0xC0C1, 0xC181, 0x0140, 0xC301, 0x03C0, 0x0280, 0xC241,
    0xC601, 0x06C0, 0x0780, 0xC741, 0x0500, 0xC5C1, 0xC481, 0x0440,
    0xCC01, 0x0CC0, 0x0D80, 0xCD41, 0x0F00, 0xCFC1, 0xCE81, 0x0E40,
    0x0A00, 0xCAC1, 0xCB81, 0x0B40, 0xC901, 0x09C0, 0x0880, 0xC841,
    0xD801, 0x18C0, 0x1980, 0xD941, 0x1B00, 0xDBC1, 0xDA81, 0x1A40,
    0x1E00, 0xDEC1, 0xDF81, 0x1F40, 0xDD01, 0x1DC0, 0x1C80, 0xDC41,
    0x1400, 0xD4C1, 0xD581, 0x1540, 0xD701, 0x17C0, 0x1680, 0xD641,
    0xD201, 0x12C0, 0x1380, 0xD341, 0x1100, 0xD1C1, 0xD081, 0x1040,
    0xF001, 0x30C0, 0x3180, 0xF141, 0x3300, 0xF3C1, 0xF281, 0x3240,
    0x3600, 0xF6C1, 0xF781, 0x3740, 0xF501, 0x35C0, 0x3480, 0xF441,
    0x3C00, 0xFCC1, 0xFD81, 0x3D40, 0xFF01, 0x3FC0, 0x3E80, 0xFE41,
    0xFA01, 0x3AC0, 0x3B80, 0xFB41, 0x3900, 0xF9C1, 0xF881, 0x3840,
    0x2800, 0xE8C1, 0xE981, 0x2940, 0xEB01, 0x2BC0, 0x2A80, 0xEA41,
    0xEE01, 0x2EC0, 0x2F80, 0xEF41, 0x2D00, 0xEDC1, 0xEC81, 0x2C40,
    0xE401, 0x24C0, 0x2580, 0xE541, 0x2700, 0xE7C1, 0xE681, 0x2640,
    0x2200, 0xE2C1, 0xE381, 0x2340, 0xE101, 0x21C0, 0x2080, 0xE041,
    0xA001, 0x60C0, 0x6180, 0xA141, 0x6300, 0xA3C1, 0xA281, 0x6240,
    0x6600, 0xA6C1, 0xA781, 0x6740, 0xA501, 0x65C0, 0x6480, 0xA441,
    0x6C00, 0xACC1, 0xAD81, 0x6D40, 0xAF01, 0x6FC0, 0x6E80, 0xAE41,
    0xAA01, 0x6AC0, 0x6B80, 0xAB41, 0x6900, 0xA9C1, 0xA881, 0x6840,
    0x7800, 0xB8C1, 0xB981, 0x7940, 0xBB01, 0x7BC0, 0x7A80, 0xBA41,
    0xBE01, 0x7EC0, 0x7F80, 0xBF41, 0x7D00, 0xBDC1, 0xBC81, 0x7C40,
    0xB401, 0x74C0, 0x7580, 0xB541, 0x7700, 0xB7C1, 0xB681, 0x7640,
    0x7200, 0xB2C1, 0xB381, 0x7340, 0xB101, 0x71C0, 0x7080, 0xB041,
    0x5000, 0x90C1, 0x9181, 0x5140, 0x9301, 0x53C0, 0x5280, 0x9241,
    0x9601, 0x56C0, 0x5780, 0x9741, 0x5500, 0x95C1, 0x9481, 0x5440,
    0x9C01, 0x5CC0, 0x5D80, 0x9D41, 0x5F00, 0x9FC1, 0x9E81, 0x5E40,
    0x5A00, 0x9AC1, 0x9B81, 0x5B40, 0x9901, 0x59C0, 0x5880, 0x9841,
    0x8801, 0x48C0, 0x4980, 0x8941, 0x4B00, 0x8BC1, 0x8A81, 0x4A40,
    0x4E00, 0x8EC1, 0x8F81, 0x4F40, 0x8D01, 0x4DC0, 0x4C80, 0x8C41,
    0x4400, 0x84C1, 0x8581, 0x4540, 0x8701, 0x47C0, 0x4680, 0x8641,
    0x8201, 0x42C0, 0x4380, 0x8341, 0x4100, 0x81C1, 0x8081, 0x4040,
)
# Reference generator for CRC16_TABLE (kept as documentation only):
# def generate_crc16_table():
#     crc_table = []
#     for byte in range(256):
#         crc = 0x0000
#         for _ in range(8):
#             if (byte ^ crc) & 0x0001:
#                 crc = (crc >> 1) ^ 0xa001
#             else:
#                 crc >>= 1
#             byte >>= 1
#         crc_table.append(crc)
#     return crc_table
def crc16(data:bytes) -> bytes:
    """Return the table-driven CRC-16 of *data* as two bytes, low byte first.

    The register starts at 0xFFFF; each input byte selects a CRC16_TABLE
    entry that is folded into the register shifted right by one byte.
    """
    acc = 0xFFFF
    for octet in data:
        # The low byte of (register XOR input byte) is the table index.
        index = (acc ^ octet) & 0xFF
        acc = CRC16_TABLE[index] ^ (acc >> 8)
    # '<H' = unsigned 16-bit, little-endian
    return struct.pack('<H', acc)
# crc16(b'\x01\x03\x00\x00\x00\x01') == b'\x84\x0A'
# crc16(b'\x01\x03\x02\x00\x56') == b'\x38\x7A'
def read_register(uart, addr:int, register:int) -> "int | None":
    """Read one holding register (function 0x03) through *uart*.

    Builds the request (address, function, register, count=1, CRC), writes it
    to *uart*, reads up to 7 bytes back and validates the reply.

    Args:
        uart: object exposing ``write(bytes)`` and ``read(nbytes)``.
        addr: slave address placed in the first byte of the request.
        register: address of the register to read.

    Returns:
        The 16-bit register value on success, otherwise ``None`` (no reply,
        bad length, CRC mismatch, exception reply or unexpected function
        code). Every outcome is also reported with ``print``.
    """
    FN = 0x03
    tag = f"read_register({addr=},{register})"
    request = struct.pack('>BBHH', addr, FN, register, 1)
    uart.write(request + crc16(request))
    raw_resp = uart.read(7)
    if not raw_resp:
        print(f"read_register({addr=},{register})", "no response!")
        return None
    print(f'read_register({addr=},{register})', raw_resp.hex())
    # 7 bytes: normal reply (addr, fn, byte count, value, CRC).
    # 5 bytes: exception reply (addr, fn | 0x80, exception code, CRC).
    if len(raw_resp) not in (5, 7):
        print(tag, "response length error!")
        return None
    resp, crc = raw_resp[:-2], raw_resp[-2:]
    if crc != crc16(resp):
        print(tag, "response crc error!")
        return None
    if len(resp) == 3:
        if resp[1] == FN | 0x80:
            # Report the exception code the same way write_register does.
            print(tag, f"error {resp[2]}!")
        else:
            # A short frame that is not an exception reply is malformed.
            print(tag, "response length error!")
        return None
    _addr, fn, _nbr, value = struct.unpack('>BBBH', resp)
    if FN != fn:
        print(tag, f"response error {fn=}!")
        return None
    return value
#
def write_register(uart, addr:int, register:int, value:int) -> "bool | None":
    """Write one register (function 0x06) through *uart*.

    Builds the request (address, function, register, value, CRC), writes it
    to *uart*, reads up to 8 bytes back and validates the reply.

    Args:
        uart: object exposing ``write(bytes)`` and ``read(nbytes)``.
        addr: slave address placed in the first byte of the request.
        register: address of the register to write.
        value: 16-bit value to store.

    Returns:
        ``True`` when an 8-byte reply with function code 0x06 and a valid CRC
        is received, ``False`` when the reply carries a different function
        code (its third byte is printed as the error code), and ``None`` on
        framing problems (no reply, bad length, CRC mismatch).
    """
    FN = 0x06
    tag = f"write_register({addr=},{register})"
    request = struct.pack('>BBHH', addr, FN, register, value)
    uart.write(request + crc16(request))
    raw_resp = uart.read(8)
    if not raw_resp:
        print(f"write_register({addr=},{register})", "no response!")
        return None
    print(f'write_register({addr=},{register})', raw_resp.hex())
    # 8 bytes: normal reply; 5 bytes: exception reply.
    if len(raw_resp) not in (5, 8):
        print(tag, "response length error!")
        return None
    resp, crc = raw_resp[:-2], raw_resp[-2:]
    if crc != crc16(resp):
        print(tag, "response crc error!")
        return None
    if resp[1] != FN:
        print(tag, f"error {resp[2]}!")
        return False
    if len(raw_resp) != 8:
        # A 5-byte frame carrying the normal function code is not a valid
        # write acknowledgement, so it must not be reported as success.
        print(tag, "response length error!")
        return None
    return True
#
"""
import modbus
class TestUart:
    def read(_self, nbytes):
        return b'\x01\x03\x02\x00\x56\x38\x7A'
    def write(_self, data:bytes):
        print(f'write({data=})')
        return len(data)
modbus.read_register(TestUart(), 1, 0)
# =86
"""
# 3.5 chars between frames, 1.5 between bytes, not less than 1.75 ms
# FRAME_GAP_US = int(1750 if BAUD2 > 19200 else (3.5*11*1000000 / BAUD2))
# sample:
# 0x01 0x03 0x00 0x00 0x00 0x01 0x84 0x0A
# 0x01 0x03 0x02 0x00 0x56 0x38 0x7A