-
Notifications
You must be signed in to change notification settings - Fork 1
/
Frame.py
235 lines (176 loc) · 6.54 KB
/
Frame.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
"""
Borrowed from
https://forum.digikey.com/t/getting-started-with-the-ti-awr1642-mmwave-sensor/13445
Using this to establish the general structure of the Packets received from the mmWave radar chip
"""
import datetime
import json
import struct
class FrameError(Exception):
    """Raised when a serial frame cannot be turned into a ``Frame``.

    The argument given at construction (a message string at the raise
    sites in this module) is kept on the ``value`` attribute.
    """

    def __init__(self, value):
        # Keep the detail reachable as ``err.value`` for callers.
        self.value = value
class TLVData:
    """Base class for the object types carried in a TLV payload."""

    def __init__(self, serial_tlv):
        """Accept the raw bytes of one object; the base class decodes nothing."""

    @staticmethod
    def preparsing(serial_tlv):
        """Separate any extra leading data from the regular objects.

        Only relevant when a payload carries data outside of the regular
        objects; the base implementation has nothing to strip.

        Returns:
            A ``(descriptor, remaining_bytes)`` pair. Here the descriptor is
            always ``None`` and the bytes are returned unchanged.
        """
        descriptor = None
        return descriptor, serial_tlv
class DataObjDescriptor:
    """Two little-endian signed 16-bit fields read from a 4-byte buffer.

    Attributes set by the constructor, in wire order: ``numDetectedObj``
    then ``xyzQFormat``.
    """

    def __init__(self, serial_tlv):
        # '<2h' requires exactly 4 bytes; struct.error is raised otherwise.
        self.numDetectedObj, self.xyzQFormat = struct.unpack('<2h', serial_tlv)
class DetectedPoints(TLVData):
    """One detected point decoded from four little-endian 32-bit floats.

    The 16-byte object is read as ``x``, ``y``, ``z`` and ``doppler``, in
    that order.
    """

    NAME = 'DETECTED_POINTS'
    SIZE_BYTES = 16

    def __init__(self, serial_tlv):
        super().__init__(serial_tlv)
        # Four float32 values; struct.error is raised unless exactly 16 bytes
        # are supplied.
        self.x, self.y, self.z, self.doppler = struct.unpack('<ffff', serial_tlv)

    def __str__(self):
        """Return the point as a tab-separated, human-readable line."""
        return 'X: {}\tY: {}\tZ: {}\tSpeed: {}'.format(
            self.x, self.y, self.z, self.doppler)
class RangeProfile(TLVData):
    """TLV object type registered under the name ``RANGE_PROFILE``.

    No fields are decoded: the class only supplies the name and the
    per-object size (2 bytes) used when a TLV payload is split into objects.
    """

    NAME = 'RANGE_PROFILE'
    SIZE_BYTES = 2
class NoiseFloorProfile(TLVData):
    """TLV object type registered under the name ``NOISE_PROFILE``.

    No fields are decoded: the class only supplies the name and the
    per-object size (2 bytes) used when a TLV payload is split into objects.
    """

    NAME = 'NOISE_PROFILE'
    SIZE_BYTES = 2
class AzimuthStaticHeatmap(TLVData):
    """TLV object type registered under the name ``AZIMUTH_HEATMAP``.

    No fields are decoded: the class only supplies the name and the
    per-object size (4 bytes) used when a TLV payload is split into objects.
    """

    NAME = 'AZIMUTH_HEATMAP'
    SIZE_BYTES = 4
class RangeDopplerHeatmap(TLVData):
    """TLV object type registered under the name ``RANGE_HEATMAP``.

    No fields are decoded: the class only supplies the name and the
    per-object size (4 bytes) used when a TLV payload is split into objects.
    """

    NAME = 'RANGE_HEATMAP'
    SIZE_BYTES = 4
class Stats(TLVData):
    """Statistics object decoded from six little-endian 32-bit integers.

    Attributes set by the constructor, in wire order: ``frame_time``,
    ``output_time``, ``inter_frame_margin``, ``inter_chirp_margin``,
    ``active_cpu_load`` and ``inter_frame_cpu_load``.
    """

    NAME = 'MODULE_STATS'
    SIZE_BYTES = 24

    def __init__(self, serial_tlv):
        """Decode one 24-byte statistics object.

        Raises:
            struct.error: If ``serial_tlv`` is not exactly 24 bytes long.
        """
        super().__init__(serial_tlv)
        # Six fields are stored and SIZE_BYTES is 24, so six 4-byte integers
        # must be unpacked. The previous four-integer format could never
        # match a 24-byte object and left indices 4 and 5 out of range.
        # The signed 'i' code is kept from the original format string.
        # NOTE(review): confirm signedness against the device documentation.
        (self.frame_time,
         self.output_time,
         self.inter_frame_margin,
         self.inter_chirp_margin,
         self.active_cpu_load,
         self.inter_frame_cpu_load) = struct.unpack('<6i', serial_tlv)
class DetectedPointsSide(TLVData):
    """Per-point side information: two little-endian unsigned 16-bit values.

    The 4-byte object is read as ``snr`` followed by ``noise``.
    """

    NAME = 'SIDE_INFO_FOR_POINTS'
    SIZE_BYTES = 4

    def __init__(self, serial_tlv):
        super().__init__(serial_tlv)
        # '<HH' requires exactly 4 bytes; struct.error is raised otherwise.
        self.snr, self.noise = struct.unpack('<HH', serial_tlv)
class AzimuthElevationHeatmap(TLVData):
    """TLV object type registered under the name ``AZIMUTH_ELEVATION_HEATMAP``.

    No fields are decoded: the class only supplies the name and the
    per-object size (4 bytes) used when a TLV payload is split into objects.
    """

    NAME = 'AZIMUTH_ELEVATION_HEATMAP'
    SIZE_BYTES = 4
class TemperatureStats(TLVData):
    """TLV object type registered under the name ``TEMPERATURE_STATS``.

    No fields are decoded: the class only supplies the name and the
    per-object size (28 bytes) used when a TLV payload is split into objects.
    """

    NAME = 'TEMPERATURE_STATS'
    SIZE_BYTES = 28
# Lookup table used by TLV: the integer ``type`` field read from a TLV header
# selects the class that supplies NAME, SIZE_BYTES and the per-object
# constructor. A type value missing from this table raises KeyError at lookup.
TLV_TYPES = {
    # Map type ID to the corresponding class
    1: DetectedPoints,
    2: RangeProfile,
    3: NoiseFloorProfile,
    4: AzimuthStaticHeatmap,
    5: RangeDopplerHeatmap,
    6: Stats,
    7: DetectedPointsSide,
    8: AzimuthElevationHeatmap,
    9: TemperatureStats,
}
class TLVHeader:
    """8-byte TLV header: two little-endian unsigned 32-bit integers.

    The first integer is stored as ``type`` and the second as ``length``.
    """

    SIZE_BYTES = 8

    def __init__(self, serial_tlv_header):
        # '<2I' requires exactly 8 bytes; struct.error is raised otherwise.
        self.type, self.length = struct.unpack('<2I', serial_tlv_header)
class TLV:
    """One type-length-value record: a header followed by fixed-size objects.

    Attributes:
        header: The parsed ``TLVHeader``.
        obj_class: Class looked up in ``TLV_TYPES`` from ``header.type``.
        name: ``obj_class.NAME``.
        obj_size: ``obj_class.SIZE_BYTES`` (size of ONE object, not the payload).
        descriptor: First value returned by ``obj_class.preparsing``.
        objects: Parsed objects, or ``[]`` if any object failed to unpack.
    """

    def __init__(self, serial_tlv):
        """Parse ``serial_tlv`` (header plus payload) into objects.

        Raises:
            struct.error: If fewer than ``TLVHeader.SIZE_BYTES`` bytes are given.
            KeyError: If the header's type is not present in ``TLV_TYPES``.
        """
        header_size = TLVHeader.SIZE_BYTES
        self.header = TLVHeader(serial_tlv[:header_size])

        # Lookup constructor for the specific type of object
        self.obj_class = TLV_TYPES[self.header.type]
        self.name = self.obj_class.NAME
        # Note this size is PER OBJECT
        self.obj_size = self.obj_class.SIZE_BYTES

        # Strip off excess headers before parsing objects
        payload = serial_tlv[header_size:]
        self.descriptor, processed_tlv = self.obj_class.preparsing(payload)

        try:
            self.objects = self.parse_objects(processed_tlv)
        except struct.error as e:
            # Save whole frame from failing if one TLV is bad
            print('Exception while parsing TLV objects: ', e)
            self.objects = []

    def parse_objects(self, serial_tlv):
        """Build one ``obj_class`` instance per ``obj_size`` bytes of payload.

        The object count comes from ``header.length``, not from
        ``len(serial_tlv)``; a short payload therefore hands truncated slices
        to the object constructor.

        Returns:
            List of ``header.length // obj_size`` objects, in wire order.
        """
        size = self.obj_size
        # Integer division: the count is a whole number of objects.
        num_objects = self.header.length // size
        # Slice by offset instead of repeatedly re-slicing the remainder,
        # which copied the whole tail of the payload for every object.
        return [self.obj_class(serial_tlv[start:start + size])
                for start in range(0, num_objects * size, size)]
class FrameHeader:
    """Base frame header that simply retains the raw header bytes."""

    def __init__(self, serial_header):
        # Kept verbatim on ``full_header``.
        self.full_header = serial_header

    def verify_checksum(self):
        """Placeholder: performs no verification and returns ``None``."""
        return None
class ShortRangeRadarFrameHeader(FrameHeader):
    """40-byte frame header, little-endian.

    Byte layout as read by the constructor:
        0-7    ``sync``          (raw bytes)
        8-11   ``version``       (raw bytes)
        12-15  ``packetLength``  (unsigned 32-bit)
        16-19  ``platform``      (raw bytes)
        20-39  ``frameNumber``, ``timeCpuCycles``, ``numDetectedObj``,
               ``numTLVs``, ``subFrameNumber`` (five unsigned 32-bit values)
    """

    SIZE_BYTES = 40
    PACKET_LENGTH_END = 16

    def __init__(self, serial_header):
        super().__init__(serial_header)

        # Fields kept as raw byte slices.
        self.sync = serial_header[:8]
        self.version = serial_header[8:12]
        self.platform = serial_header[16:20]

        # Integer fields; struct.error is raised if the header is too short.
        (self.packetLength,) = struct.unpack('<I', serial_header[12:16])
        (self.frameNumber,
         self.timeCpuCycles,
         self.numDetectedObj,
         self.numTLVs,
         self.subFrameNumber) = struct.unpack('<5I', serial_header[20:40])
class Frame:
    """A parsed frame: creation timestamp, header object and list of TLVs.

    Note that frames are LITTLE ENDIAN.

    Attributes:
        time: ``datetime.datetime.now()`` taken when parsing starts.
        header: Instance of the ``frame_type`` passed to the constructor.
        tlvs: ``TLV`` instances in the order they appear after the header.
    """

    FRAME_START = b'\x02\x01\x04\x03\x06\x05\x08\x07'

    def __init__(self, serial_frame, frame_type):
        """Parse ``serial_frame`` into a header and its TLVs.

        Args:
            serial_frame: Raw bytes of one frame, beginning with the header.
            frame_type: Header class. It must expose ``SIZE_BYTES`` and, when
                called with the header bytes, return an object that has
                ``packetLength`` and ``numTLVs`` attributes.

        Raises:
            FrameError: If the buffer is shorter than the header, shorter
                than the header's ``packetLength``, or ends before every
                announced TLV header has been read.
        """
        self.time = datetime.datetime.now()

        header_size = frame_type.SIZE_BYTES
        tlv_header_size = TLVHeader.SIZE_BYTES

        # Length should be > header_size
        frame_length = len(serial_frame)
        if frame_length < header_size:
            raise FrameError('Frame is smaller than required header size. '
                             'Expected length {}. Measured length {}.'.format(header_size, frame_length))

        # Initialize the header
        self.header = frame_type(serial_frame[:header_size])

        # Second sanity check
        if frame_length < self.header.packetLength:
            raise FrameError('Frame is too small. Expected {} bytes, '
                             'received {} bytes.'.format(self.header.packetLength, frame_length))

        # Convert remaining data into TLVs, walking the buffer by offset.
        self.tlvs = []
        offset = header_size
        for index in range(self.header.numTLVs):
            remaining = frame_length - offset
            # Without this check a truncated or corrupt frame would surface
            # as a raw struct.error from the TLV header unpack instead of the
            # FrameError used for every other malformed-frame condition.
            if remaining < tlv_header_size:
                raise FrameError('Frame ended before TLV {} of {}. Expected {} header bytes, '
                                 'found {} bytes.'.format(index + 1, self.header.numTLVs,
                                                          tlv_header_size, remaining))

            # Check header to get length of each TLV
            payload_length = TLVHeader(serial_frame[offset:offset + tlv_header_size]).length
            length = tlv_header_size + payload_length

            # Create a 'length' bytes TLV instance. A payload cut short by the
            # end of the buffer is still handed over; TLV deals with objects
            # that fail to unpack.
            self.tlvs.append(TLV(serial_frame[offset:offset + length]))

            # Step past the consumed TLV data
            offset += length