-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPyPacket.py
288 lines (239 loc) · 8.9 KB
/
PyPacket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
'''
The PYPACKET for wrapping google buffer messages in communication streams by indicating the type of data contained within
the string. At this layer, the Packet Data Type and PacketID allows for easy analysis and forwarding of the packet to
the correct process/program running on the device or to another device.
Packet Header Format
-----------------------
PacketDataType [1 Bytes] -> 0 - 255
PacketID [3 Bytes]
PacketPlatform [2 Bytes] -> String
PacketIdentifier [1 Byte] -> Number 0 - 255 ~!!~ Fix me for easy use in mapping outputs
-----------------------
PacketData [N Bytes] -> Serialized Google buffer string (protobuf)
-----------------------
'''
import sys
from struct import *
from google.protobuf import text_format
sys.path.insert(0, './protobuf')
import PyPackets_pb2
RECVBUFF = 8192 #65536 #8192
# TODO: Set the packet to be in a specific binary format (Little Endian)
def printByteArrayInHex(array):
str = '0x'
for a in array:
s = hex(a)
List = s.split('x')
str = str + List[1]
return str
# Enumeration of the Packet Types for simplicity using Hexadecimal
# POSSIBLE NAMING COLLISION WITH CURRENT SETUP (two of the same data types being produced by 1 vehicle = same ID but different internal data)
class PacketDataType:
# GCS Messages
PKT_GCS_CMD = pack('b', 01)
# Aircraft State Msgs
PKT_AUTOPILOT_PIXHAWK = pack('b', 02)
# Network Messages
PKT_NETWORK_MANAGER_HEARTBEAT = pack('b', 10) # [0x0,0x0] #0x00
PKT_NODE_HEARTBEAT = pack('b', 11)
PKT_NETWORK_MANAGER_STATUS = pack('b', 12)
PKT_DMY_MSG = pack('b', 13)
# RF comm-aware project
PKT_RF_DATA_MSG = pack('b', 14)
PKT_RF_PL_MAP_MSG = pack('b', 15)
PKT_RF_STACKED_MAP_MSG = pack('b', 16)
PKT_RF_LEARN_CMD = pack('b',19)
# More will be added
PKT_WAYPOINT = pack('b', 17)
PKT_WAYPOINT_LIST = pack('b', 18)
# Balloon Msgs
PKT_BALLOON_SENSOR_MSG = pack('b', 30)
PKT_BALLOON_SENSOR_SET = pack('b', 31)
# IRISS Wrapper
PKT_IRISS_WRAPPER_MSG = pack('b', 32) # wrap an iriss packet with my packet
# JSON Wrapper
PKT_JSON_STRING = pack('b',33)
# Creation of a PacketID that is used for identifying where a packet source
class PacketID:
def __init__(self, platform, identifer):
self.P = platform # PacketPlatform
self.I = identifer # PacketIdentifer
def getBytes(self):
bytesOut = bytearray(self.P)
bytesOut.append(self.I)
return bytesOut
def getPlatform(self):
return self.P #TODO! is this right?
def getIdentifier(self):
return self.I #TODO! is this right?
# Enumeration of the Packet Platforms for simplicity
class PacketPlatform:
AIRCRAFT = 'AC'
GROUND_CONTROL_STATION = 'GS'
SERVER_STATION = 'SS'
COMMAND_AND_CONTROL_STATION = 'CC'
DUMMY = 'DM'
BALLOON = 'BL'
class PyPacket(object):
def __init__(self):
self.reset()
def reset(self):
self.packet = bytearray(0) #Zero the byte array
'''
Get and Set Full Packet
'''
def getPacket(self):
return self.packet
def setPacket(self, pkt):
self.packet = bytearray(pkt)
def getPacketSize(self):
p = self.getPacket()
return len(p)
'''
Get and Set the Packet Data Type
'''
def getDataType(self):
return self.packet[0:1]
def setDataType(self, byteA):
if len(self.packet) == 0:
self.packet.append(byteA)
else:
self.packet[0] = byteA
'''
Get and Set the Packet Identifier
'''
def getID(self):
return self.packet[1:4]
def setID(self, bytesIn):
if len(self.packet) == 0:
#Add an empty byte for the datatype
self.packet.append(pack('b',0))
for r in range(3):
# TODO! Fix me
self.packet.append(bytesIn[r])
elif len(self.packet) == 1:
for r in range(3):
# TODO! Fix me
self.packet.append(bytesIn[r])
else:
self.packet[1] = bytesIn[0]
self.packet[2] = bytesIn[1]
self.packet[3] = bytesIn[2]
'''
Get and Set the Packet Data
'''
def getData(self):
if len(self.packet) > 4:
return self.packet[4:len(self.packet)]
else:
return None
def clearData(self):
#copy id and datatype
copyOfId = self.getID()
copyOfDT = self.getDataType()
#clear byte array
self.reset()
#replace
self.setDataType(str(copyOfDT))
self.setID(copyOfId)
#self.displayPacket()
def setData(self, d):
if len(self.packet) > 4:
# Copy over the old parts
self.clearData()
# TODO! Change me
#del self.packet[4:len(self.packet)] Old Method
for i in range(len(d)):
self.packet.append(d[i])
def getDataSize(self):
d = self.getData()
if d:
return len(d)
else:
return 0
def printData(self):
data = self.getData()
if self.getDataType() == PacketDataType.PKT_DMY_MSG:
msg = PyPackets_pb2.dummy_msg()
msg.ParseFromString(str(data))
return text_format.MessageToString(msg)
elif self.getDataType() == PacketDataType.PKT_NODE_HEARTBEAT:
msg = PyPackets_pb2.NodeHeartBeat()
msg.ParseFromString(str(data))
return text_format.MessageToString(msg)
else:
return 'No known data type'
def displayPacket(self):
print 'Type = ', printByteArrayInHex(self.getDataType())
# printByteArrayInHex(self.getDataType())
print 'ID = ', printByteArrayInHex(self.getID())
print 'Data:', self.printData()
print 'Size = ', self.getDataSize()
print 'Total Size = ', self.getPacketSize()
'''
Dictionary Dispatch Lookup for DataType and the Protobuf msg creation
'''
def getNMStatus():
return [PyPackets_pb2.NMStatus(), 'NMStatus']
def getNMHeartBeat():
return [PyPackets_pb2.NMHeartBeat(), 'NMHeartBeat']
def getGCSCommand():
return Null
def getNodeHeartBeat():
return [PyPackets_pb2.NodeHeartBeat(), 'NodeHeartBeat']
def getDummy():
return [PyPackets_pb2.dummy_msg(), 'DummyMsg']
def getAircraftPixhawkState():
return [PyPackets_pb2.AircraftPixhawkState(), 'AircraftPixhawkState']
def getRF_PL_Map_Msg():
return [PyPackets_pb2.RF_PL_Map_Msg(), 'RF_PL_Map_Msg']
def getRF_Data_Msg():
return [PyPackets_pb2.RF_Data_Msg(), 'RF_Data_Msg']
def getRF_Learn_Cmd():
return [PyPackets_pb2.RF_Learn_Cmd(), 'RF_Learn_Cmd']
def getRF_Stacked_Map_Msg():
return [PyPackets_pb2.RF_Stacked_Map_Msg, 'RF_Stacked_Map_Msg']
def getBalloon_Sensor_Msg():
return [PyPackets_pb2.Balloon_Sensor_Msg(), 'Balloon_Sensor_Msg']
def getBalloon_Sensor_Set_Msg():
return [PyPackets_pb2.Balloon_Sensor_Set_Msg(), 'Balloon_Sensor_Set_Msg']
def getIRISS_Wrapper():
return [PyPackets_pb2.IRISS_Wrapper(), 'IRISS_Wrapper']
TypeDispatch = {
str(PacketDataType.PKT_NETWORK_MANAGER_STATUS): getNMStatus,
str(PacketDataType.PKT_NETWORK_MANAGER_HEARTBEAT): getNMHeartBeat,
str(PacketDataType.PKT_GCS_CMD): getGCSCommand,
str(PacketDataType.PKT_DMY_MSG): getDummy,
str(PacketDataType.PKT_NODE_HEARTBEAT): getNodeHeartBeat,
str(PacketDataType.PKT_AUTOPILOT_PIXHAWK): getAircraftPixhawkState,
str(PacketDataType.PKT_RF_DATA_MSG): getRF_Data_Msg,
str(PacketDataType.PKT_RF_PL_MAP_MSG): getRF_PL_Map_Msg,
str(PacketDataType.PKT_RF_LEARN_CMD): getRF_Learn_Cmd,
str(PacketDataType.PKT_RF_STACKED_MAP_MSG): getRF_Stacked_Map_Msg,
str(PacketDataType.PKT_BALLOON_SENSOR_MSG): getBalloon_Sensor_Msg,
str(PacketDataType.PKT_BALLOON_SENSOR_SET): getBalloon_Sensor_Set_Msg,
str(PacketDataType.PKT_IRISS_WRAPPER_MSG): getIRISS_Wrapper
}
def getAircraftPlatform():
return PacketPlatform.AIRCRAFT
def getBalloonPlatform():
return PacketPlatform.BALLOON
def getGCSPlatform():
return PacketPlatform.GROUND_CONTROL_STATION
def getServerPlatform():
return PacketPlatform.SERVER_STATION
def getCommandPlatform():
return PacketPlatform.COMMAND_AND_CONTROL_STATION
def getDummyPlatform():
return PacketPlatform.DUMMY
#Set the string to all lowercase for casting into the dictionary
def formatPlatformString(string):
return string.lower()
PlatformDispatch = {
str("aircraft"): getAircraftPlatform,
str("balloon"): getBalloonPlatform,
str("groundstation"): getGCSPlatform,
str("server"): getServerPlatform,
str("command"): getCommandPlatform,
str("dummy"): getDummyPlatform
}