-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathZigTools.py
308 lines (269 loc) · 10.1 KB
/
ZigTools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
'''
Created on May 28, 2013
@author: Mike Warner
ZigTools.py is a framework for the freakduino which provides comparable
functionality to the killerbee framework
Copyright (C) 2013 Mike Warner
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
'''
import serial
import time
import os
import struct
import math
import threading
''' Global Variables '''
__zigOutFile__ = None
__zigInFile__ = None
__zigSerial__ = None
__zigListen__ = False
''' Helper Methods '''
def rssiToPercent(aByte):
return round(byteToInt(aByte) / 84.0 * 100.0, 0)
def byteToInt(aByte):
return int(aByte.encode('hex'), 16)
def __isCallable__(aCallback):
return aCallback != None and callable(aCallback)
def prettyHex(aString, newLine = 16):
size = len(aString)
output = ""
for i in range(size):
output += aString[i:i+1].encode("hex") + " "
if (i + 1) % newLine == 0 and i != 0 and newLine > 0:
output += "\n"
return str.strip(output)
''' Class Definitions '''
class Frame:
def __init__(self, aFrame, aRssi = 0):
self.frame = aFrame
self.rssi = aRssi
def getSize(self):
return byteToInt(self.frame[0, 1])
class RadioResponse:
def __init__(self, aCommandCode, aResponseCode):
self.commandCode = aCommandCode
self.responseCode = aResponseCode
class __RadioListener__(threading.Thread):
def __init__(self, aFrameCallback, aCommandCallback):
threading.Thread.__init__(self)
self.frameCallback = aFrameCallback
self.commandCallback = aCommandCallback
def run(self):
global __zigListen__
global __zigSerial__
while __zigListen__:
# Not sure if I need this, but doesn't hurt
time.sleep(.005)
size = __zigSerial__.inWaiting()
if size > 0:
response = __zigSerial__.read(1)
frameSize = __zigSerial__.read(1)
# New data from the radio
if response == '\xb0':
frame = __zigSerial__.read(byteToInt(frameSize))
if __isCallable__(self.frameCallback):
self.frameCallback(Frame(frameSize + frame[:-1], frame[-1:]))
# Send data response or change channel response or system response
elif response == '\xb1' or response == '\xb2' or response == '\xbf':
data = __zigSerial__.read(1)
if __isCallable__(self.commandCallback):
self.commandCallback(RadioResponse(response, data))
else:
print "Unknown Response: " + prettyHex(response)
print "Frame Size: ", byteToInt(frameSize) , " (" + prettyHex(frameSize) + ")"
print "Size of buffer: ", size
print "Data in buffer: "
data = __zigSerial__.read(size - 2)
print prettyHex(data)
print "Raw Data in Buf: "
print data
print "Quitting Radio Listener thread!"
__zigListen__ = False
''' Output PCAP file functions '''
def initOutPcapFile(aFile):
global __zigOutFile__
exists = os.path.isfile(aFile)
try:
__zigOutFile__ = open(aFile, "ab", 0)
if exists:
return
except Exception, e:
print "Failed to open", aFile
print e
exit()
# Write magic number
__zigOutFile__.write('\xd4\xc3\xb2\xa1')
# Write major version
__zigOutFile__.write('\x02\x00')
# Write minor version
__zigOutFile__.write('\x04\x00')
# Write time sec since epoch
__zigOutFile__.write(struct.pack('<I', round(time.time())))
# Write accuracy of time
__zigOutFile__.write('\x00\x00\x00\x00')
# Write max len of data
__zigOutFile__.write('\xff\xff\x00\x00')
# Write data link type (802.15.4)
__zigOutFile__.write('\xc3\x00\x00\x00')
def writeFrameToPcap(aFrame):
global __zigOutFile__
if __zigOutFile__ == None:
return False
frame = aFrame.frame
aTime = time.time()
sec = math.floor(aTime)
mic = round((aTime - sec) * 1000000)
# Write out sec and microsec in 4 byte little endian
__zigOutFile__.write(struct.pack('<I', sec))
__zigOutFile__.write(struct.pack('<I', mic))
frameLen = int(frame[0:1].encode('hex'), 16)
# Write frame length - len headed
__zigOutFile__.write(struct.pack('<I', frameLen - 1))
# Write out the actual frame len
__zigOutFile__.write(struct.pack('<I', frameLen))
# Write out frame without len
__zigOutFile__.write(frame[1:])
''' Initialize In PCAP file '''
def initInPcapFile(aFile):
global __zigInFile__
try:
__zigInFile__ = open(aFile, "rb")
except Exception, e:
print "Failed to open input file: " + e
def getFrameFromPcap(index):
global __zigInFile__
if __zigInFile__ == None:
return False
frameCount = 1
frame = ""
try:
# move the file pointer back to the top of the file
__zigInFile__.seek(0)
# we dont care about the pcap header
temp = __zigInFile__.read(20)
# lets just check to make sure the pcap file is the correct
# data link type (802.15.4)
temp = __zigInFile__.read(4)
if(temp == '\xc3\x00\x00\x00'):
while temp:
# dont care about the frame time
temp = __zigInFile__.read(8)
# get size saved in file
# read the first byte since its in little endian and the size is
# never > 127
size = int(__zigInFile__.read(1).encode("hex"), 16)
# the last 3 bytes of the size (unused)
temp = __zigInFile__.read(3)
# get actual size of frame
actualSize = int(__zigInFile__.read(1).encode("hex"), 16)
# last 3 bytes of the size (unused)
temp = __zigInFile__.read(3)
# get the payload
data = __zigInFile__.read(size)
if(index == frameCount):
frame = chr(actualSize) + data
break
# increment the frameCount to know what pcap record we're on
frameCount += 1
else:
print "PCAP file is of the wrong data link type (not 802.15.4)"
frame = False
except:
print "Frame at index " + str(index) + " could not be found"
frame = False
return Frame(frame)
''' Change channel functions '''
def getNextChannel(currChannel, upDown):
if upDown == "+":
if currChannel == 26:
return 11
else:
return (currChannel + 1)
if upDown == "-":
if(currChannel == 11):
return 26
else:
return (currChannel - 1)
return (currChannel, 11)[currChannel >= 11 and currChannel <= 26]
def changeChannel(aChannel):
global __zigSerial__
if __zigSerial__ == None:
return False
if aChannel >= 11 and aChannel <= 26:
ba = "\xa2" + chr(aChannel)
__zigSerial__.write(ba)
return True
else:
False
''' Send data to radio functions '''
def sendRawData(aFrame):
global __zigSerial__
if __zigSerial__ == None:
return False
# if the aFrame is false from not being found in getFrameFromPcap
if not aFrame.frame:
return False
# send data request
ba = "\xa1" + aFrame.frame
__zigSerial__.write(ba)
''' Start and Stop functions '''
def initialize(comPort, channel = 11, frameCallback = None, commandCallback = None, comSpeed = 57600):
global __zigSerial__
global __zigListen__
if __zigListen__:
print "Listener already initialized."
return
# if the frameCallback and radioCommandCall are callable
try:
__zigSerial__ = serial.Serial(comPort, comSpeed, 8, 'N', 1)
except serial.serialutil.SerialException, e:
print e
exit()
now = time.clock()
while now > time.clock() - 5:
time.sleep(.05)
size = __zigSerial__.inWaiting()
if size > 1:
# if the response is 0xbf
if __zigSerial__.read(1) == "\xbf":
# we dont care about the size so get it off of the serial queue
__zigSerial__.read(1)
# and if the next char is 0x00
if __zigSerial__.read(1) == "\x00":
__zigListen__ = True
thread1 = __RadioListener__(frameCallback, commandCallback)
thread1.start()
# and if there is a frameCallback
if __isCallable__(frameCallback):
# tell the radio to start sending frames
__zigSerial__.write("\xa0")
changeChannel(channel)
break
# never got a response back from the radio. No sense in continuing
if not __zigListen__:
print "Failed to connect to device!"
terminate()
exit()
def terminate():
global __zigListen__
global __zigSerial__
global __zigOutFile__
global __zigInFile__
__zigListen__ = False
# Give some time for the tread to end gracefully
time.sleep(.05)
if __zigSerial__ != None:
__zigSerial__.close()
if __zigOutFile__ != None:
__zigOutFile__.close()
if __zigInFile__ != None:
__zigInFile__.close()