forked from giejay/nukiPyBridge
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnuki.py
362 lines (346 loc) · 20.6 KB
/
nuki.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
import time
import nacl.utils
import pygatt.backends
import array
from nacl.public import PrivateKey, Box
from byteswap import ByteSwapper
from crc import CrcCalculator
import nuki_messages
import sys
import configparser
import blescan
import bluetooth._bluetooth as bluez
from pathlib import Path
from retry import retry
class Nuki():
    """Client for a Nuki Smart Lock reached over Bluetooth Low Energy.

    The pairing data (keys, authorization ID, ...) is kept in an INI file with
    one section per lock, named after the lock's mac-address.  Responses from
    the lock arrive as indications and are accumulated, hex-encoded, in
    ``self._charWriteResponse``.
    """

    # Characteristic used for the pairing handshake.
    PAIRING_UUID = 'a92ee101-5501-11e4-916c-0800200c9a66'
    # Characteristic used for encrypted commands once paired.
    KEYTURNER_USDIO_UUID = 'a92ee202-5501-11e4-916c-0800200c9a66'
    # Second field of a blescan beacon line that identifies a Nuki beacon.
    BEACON_SIGNATURE = 'a92ee200550111e4916c0800200c9a66'

    def __init__(self, macAddress, cfg):
        """Prepare the client; the BLE connection itself is opened lazily.

        -macAddress: bluetooth mac-address of your Nuki Lock
        -cfg: path of the INI file holding the pairing data (read now,
              rewritten by authenticateUser)
        """
        self._charWriteResponse = ""
        self.parser = nuki_messages.NukiCommandParser()
        self.crcCalculator = CrcCalculator()
        self.byteSwapper = ByteSwapper()
        self.macAddress = macAddress
        self.config = configparser.RawConfigParser()
        self.config.read(cfg)
        self.configfile = cfg
        self.device = None

    def _makeBLEConnection(self, retries=3):
        """Connect to the lock unless a connection is already held.

        -retries: maximum number of failed connection attempts

        On failure ``self.device`` stays None; callers must check for that.
        """
        if self.device is not None:
            return
        currentTries = 0
        adapter = pygatt.backends.GATTToolBackend()
        nukiBleConnectionReady = False
        while not nukiBleConnectionReady and currentTries < retries:
            print("Starting BLE adapter...")
            adapter.start()
            print("Init Nuki BLE connection...")
            try:
                self.device = adapter.connect(self.macAddress)
                nukiBleConnectionReady = True
            except Exception:
                # Only connection errors are retried; Ctrl-C still aborts.
                currentTries += 1
                print("Unable to connect, retrying..., retry count: " + str(currentTries))
        if self.device is None:
            print("Could not connect after " + str(currentTries) + " tries")
        else:
            print("Nuki BLE connection established")

    def isNewNukiStateAvailable(self):
        """Scan BLE beacons to see whether the lock announces a new state.

        Any open connection to the lock is dropped first.  Exits the process
        with status 1 when the local bluetooth device cannot be opened.

        Returns -1 when no beacon of this lock was seen, 0 when the beacon's
        fifth field is '-60', and 1 for any other value of that field.
        """
        if self.device is not None:
            self.device.disconnect()
            self.device = None
        dev_id = 0
        try:
            sock = bluez.hci_open_dev(dev_id)
        except Exception:
            print("error accessing bluetooth device...")
            sys.exit(1)
        # NOTE(review): the HCI socket is never closed and the scan is never
        # disabled again - confirm whether blescan/bluez expect explicit cleanup.
        blescan.hci_le_set_scan_parameters(sock)
        blescan.hci_enable_le_scan(sock)
        returnedList = blescan.parse_events(sock, 10)
        newStateAvailable = -1
        print("isNewNukiStateAvailable() -> search through %d received beacons..." % len(returnedList))
        ownMac = self.macAddress.lower()
        for beacon in returnedList:
            beaconElements = beacon.split(',')
            if beaconElements[0] == ownMac and beaconElements[1] == self.BEACON_SIGNATURE:
                print("Nuki beacon found, new state element: %s" % beaconElements[4])
                newStateAvailable = 0 if beaconElements[4] == '-60' else 1
                # The first beacon of our lock decides; ignore the rest.
                break
            print("non-Nuki beacon found: mac=%s, signature=%s" % (beaconElements[0], beaconElements[1]))
        print("isNewNukiStateAvailable() -> result=%d" % newStateAvailable)
        return newStateAvailable

    def _handleCharWriteResponse(self, handle, value):
        """Indication callback: append the received bytes as lowercase hex.

        -handle: characteristic handle the data arrived on (unused)
        -value : iterable of byte values sent by the lock
        """
        self._charWriteResponse += "".join(format(x, '02x') for x in value)

    def _encryptCommand(self, nukiCommand):
        """Wrap nukiCommand in an encrypted command using the stored pairing data."""
        return nuki_messages.Nuki_EncryptedCommand(
            authID=self.config.get(self.macAddress, 'authorizationID'),
            nukiCommand=nukiCommand,
            publicKey=self.config.get(self.macAddress, 'publicKeyNuki'),
            privateKey=self.config.get(self.macAddress, 'privateKeyHex'))

    def _decryptResponse(self, encryptedHex):
        """Decrypt a response and drop its first 8 characters."""
        decrypted = self.parser.decrypt(encryptedHex,
                                        self.config.get(self.macAddress, 'publicKeyNuki'),
                                        self.config.get(self.macAddress, 'privateKeyHex'))
        return decrypted[8:]

    def _pairingExchange(self, pairingHandle, command, timeout, sentMessage,
                         failedAction, expectedCommand, expectedName):
        """Send one pairing command and return the validated, parsed answer.

        Exits the process (sys.exit) when the answer is not a Nuki command or
        is not the expected one.
        """
        self._charWriteResponse = ""
        # Positional arguments 3 and 4 are presumably pygatt's
        # wait_for_response flag and timeout - TODO confirm.
        self.device.char_write_handle(pairingHandle, command, True, timeout)
        print(sentMessage)
        # Give the lock time to deliver its indication(s).
        time.sleep(2)
        response = self._charWriteResponse
        commandParsed = self.parser.parse(response)
        if not self.parser.isNukiCommand(response):
            sys.exit("Error while %s: %s" % (failedAction, commandParsed))
        if commandParsed.command != expectedCommand:
            sys.exit("Nuki returned unexpected response (expecting %s): %s"
                     % (expectedName, commandParsed.show()))
        return commandParsed

    def authenticateUser(self, publicKeyHex, privateKeyHex, ID, IDType, name):
        """Authenticate yourself (only needed the very first time) to the Nuki Lock.

        -publicKeyHex : a public key (as hex string) you created to talk with the Nuki Lock
        -privateKeyHex: a private key (complementing the public key) you created to talk with the Nuki Lock
        -ID           : a unique number to identify yourself to the Nuki Lock
        -IDType       : '00' for 'app', '01' for 'bridge' and '02' for 'fob'
        -name         : a unique name to identify yourself to the Nuki Lock (will also appear in the logs of the Nuki Lock)

        Returns the status reported by the lock, or None when no BLE
        connection could be made.  The config file is only rewritten after the
        whole handshake succeeded; any protocol error exits the process.
        """
        self._makeBLEConnection()
        if self.device is None:
            return None
        # Start from a clean section so stale pairing data cannot survive.
        self.config.remove_section(self.macAddress)
        self.config.add_section(self.macAddress)
        pairingHandle = self.device.get_handle(self.PAIRING_UUID)
        print("Nuki Pairing UUID handle created: %04x" % pairingHandle)
        self.device.subscribe(self.PAIRING_UUID, self._handleCharWriteResponse, indication=True)

        # Step 1: ask the lock for its public key.
        publicKeyReq = nuki_messages.Nuki_REQ('0003')
        publicKeyReqCommand = publicKeyReq.generate()
        print("Requesting Nuki Public Key using command: %s" % publicKeyReq.show())
        commandParsed = self._pairingExchange(
            pairingHandle, publicKeyReqCommand, 2,
            sentMessage="Nuki Public key requested",
            failedAction="requesting public key",
            expectedCommand='0003', expectedName="PUBLIC_KEY")
        publicKeyNuki = commandParsed.publicKey
        for option, value in (('publicKeyNuki', publicKeyNuki),
                              ('publicKeyHex', publicKeyHex),
                              ('privateKeyHex', privateKeyHex),
                              ('ID', ID),
                              ('IDType', IDType),
                              ('Name', name)):
            self.config.set(self.macAddress, option, value)
        print("Public key received: %s" % commandParsed.publicKey)

        # Step 2: push our public key, the lock answers with a challenge.
        publicKeyPush = nuki_messages.Nuki_PUBLIC_KEY(publicKeyHex)
        publicKeyPushCommand = publicKeyPush.generate()
        print("Pushing Public Key using command: %s" % publicKeyPush.show())
        commandParsed = self._pairingExchange(
            pairingHandle, publicKeyPushCommand, 5,
            sentMessage="Public key pushed",
            failedAction="pushing public key",
            expectedCommand='0004', expectedName="CHALLENGE")
        print("Challenge received: %s" % commandParsed.nonce)
        nonceNuki = commandParsed.nonce

        # Step 3: send the authenticator, the lock answers with a new challenge.
        authAuthenticator = nuki_messages.Nuki_AUTH_AUTHENTICATOR()
        authAuthenticator.createPayload(nonceNuki, privateKeyHex, publicKeyHex, publicKeyNuki)
        authAuthenticatorCommand = authAuthenticator.generate()
        commandParsed = self._pairingExchange(
            pairingHandle, authAuthenticatorCommand, 5,
            sentMessage="Authorization Authenticator sent: %s" % authAuthenticator.show(),
            failedAction="sending Authorization Authenticator",
            expectedCommand='0004', expectedName="CHALLENGE")
        print("Challenge received: %s" % commandParsed.nonce)
        nonceNuki = commandParsed.nonce

        # Step 4: send our identity, the lock answers with an authorization ID.
        authData = nuki_messages.Nuki_AUTH_DATA()
        authData.createPayload(publicKeyNuki, privateKeyHex, publicKeyHex, nonceNuki, ID, IDType, name)
        authDataCommand = authData.generate()
        commandParsed = self._pairingExchange(
            pairingHandle, authDataCommand, 7,
            sentMessage="Authorization Data sent: %s" % authData.show(),
            failedAction="sending Authorization Data",
            expectedCommand='0007', expectedName="AUTH_ID")
        print("Authorization ID received: %s" % commandParsed.show())
        nonceNuki = commandParsed.nonce
        authorizationID = commandParsed.authID
        self.config.set(self.macAddress, 'authorizationID', authorizationID)
        authId = int(commandParsed.authID, 16)

        # Step 5: confirm the authorization ID, the lock answers with a status.
        authIDConfirm = nuki_messages.Nuki_AUTH_ID_CONFIRM()
        authIDConfirm.createPayload(publicKeyNuki, privateKeyHex, publicKeyHex, nonceNuki, authId)
        authIDConfirmCommand = authIDConfirm.generate()
        commandParsed = self._pairingExchange(
            pairingHandle, authIDConfirmCommand, 7,
            sentMessage="Authorization ID Confirmation sent: %s" % authIDConfirm.show(),
            failedAction="sending Authorization ID Confirmation",
            expectedCommand='000E', expectedName="STATUS")
        print("STATUS received: %s" % commandParsed.status)

        with open(self.configfile, 'w') as configfile:
            self.config.write(configfile)
        return commandParsed.status

    def readLockState(self):
        """Read the current lock state of the Nuki Lock.

        Returns the parsed '000C' response, or None when no BLE connection
        could be made.
        """
        self._makeBLEConnection()
        if self.device is None:
            return None
        keyturnerUSDIOHandle = self.getHandle()
        self.executeChallenge('000C', keyturnerUSDIOHandle)
        return self.parseChallengeResponse('000C')

    def lockAction(self, lockAction):
        """Perform a lock action on the Nuki Lock.

        -lockAction: 'UNLOCK', 'LOCK', 'UNLATCH', 'LOCKNGO', 'LOCKNGO_UNLATCH', 'FOB_ACTION_1', 'FOB_ACTION_2' or 'FOB_ACTION_3'

        Returns the parsed response to the action, or None when no BLE
        connection could be made.
        """
        epoch_time = int(time.time())
        self._makeBLEConnection()
        if self.device is None:
            return None
        keyturnerUSDIOHandle = self.getHandle()
        # A fresh challenge (nonce) is required before the action itself.
        self.executeChallenge('0004', keyturnerUSDIOHandle)
        commandParsed = self.parseChallengeResponse('0004')
        self.executeLockAction(keyturnerUSDIOHandle, lockAction, commandParsed)
        response = self.checkLockActionResponse()
        print("Done in {} seconds".format((int(time.time()) - epoch_time)))
        return response

    @retry(Exception, tries=8, delay=0.5)
    def getHandle(self):
        """Look up the keyturner USDIO handle and subscribe to its indications."""
        print("Retrieving handle")
        keyturnerUSDIOHandle = self.device.get_handle(self.KEYTURNER_USDIO_UUID)
        print("Handle retrieved")
        self.device.subscribe(self.KEYTURNER_USDIO_UUID, self._handleCharWriteResponse, indication=True)
        print("Subscribed to device")
        return keyturnerUSDIOHandle

    @retry(Exception, tries=8, delay=0.5)
    def executeChallenge(self, request, keyturnerUSDIOHandle):
        """Send an encrypted request for the given command code.

        -request             : command code to request, e.g. '0004' or '000C'
        -keyturnerUSDIOHandle: handle returned by getHandle
        """
        print("Going to execute challenge")
        challengeReq = nuki_messages.Nuki_REQ(request)
        challengeReqEncryptedCommand = self._encryptCommand(challengeReq).generate()
        # Clear the buffer so only the answer to this request is collected.
        self._charWriteResponse = ""
        self.device.char_write_handle(keyturnerUSDIOHandle, challengeReqEncryptedCommand, True, 4)
        print("Nuki CHALLENGE Request sent: %s" % challengeReq.show())

    @retry(Exception, tries=8, delay=0.5)
    def parseChallengeResponse(self, request):
        """Decrypt and parse the buffered response to executeChallenge.

        -request: command code the response must carry

        Raises Exception while the buffer does not (yet) hold a valid response
        with that command code, which makes the retry decorator try again.
        """
        commandParsed = self._decryptResponse(self._charWriteResponse)
        if not self.parser.isNukiCommand(commandParsed):
            raise Exception("Error while checking challenge response")
        commandParsed = self.parser.parse(commandParsed)
        if commandParsed.command != request:
            raise Exception("Parsed command is not equal to the request")
        return commandParsed

    @retry(Exception, tries=8, delay=0.5)
    def executeLockAction(self, keyturnerUSDIOHandle, lockAction, commandParsed):
        """Send the encrypted lock action, signed with the challenge nonce.

        -keyturnerUSDIOHandle: handle returned by getHandle
        -lockAction          : action name, see lockAction()
        -commandParsed       : parsed challenge whose nonce is used
        """
        lockActionReq = nuki_messages.Nuki_LOCK_ACTION()
        lockActionReq.createPayload(self.config.getint(self.macAddress, 'ID'), lockAction, commandParsed.nonce)
        lockActionReqEncryptedCommand = self._encryptCommand(lockActionReq).generate()
        self._charWriteResponse = ""
        self.device.char_write_handle(keyturnerUSDIOHandle, lockActionReqEncryptedCommand, True, 4)
        print("Nuki Lock Action Request sent: %s" % lockActionReq.show())

    @retry(Exception, tries=8, delay=0.5)
    def checkLockActionResponse(self):
        """Decrypt and parse the buffered response to executeLockAction.

        Raises Exception while the buffer does not hold a valid Nuki command.
        """
        commandParsed = self._decryptResponse(self._charWriteResponse)
        if not self.parser.isNukiCommand(commandParsed):
            raise Exception("Error while request lock action")
        return self.parser.parse(commandParsed)

    def _requestLogChallenge(self, keyturnerUSDIOHandle):
        """Request a challenge for a log query and return it parsed.

        Exits the process (sys.exit) when the answer is not a valid challenge.
        """
        challengeReq = nuki_messages.Nuki_REQ('0004')
        challengeReqEncrypted = self._encryptCommand(challengeReq)
        challengeReqEncryptedCommand = challengeReqEncrypted.generate()
        self._charWriteResponse = ""
        print("Requesting CHALLENGE: %s" % challengeReqEncrypted.generate("HEX"))
        self.device.char_write_handle(keyturnerUSDIOHandle, challengeReqEncryptedCommand, True, 5)
        print("Nuki CHALLENGE Request sent: %s" % challengeReq.show())
        # NOTE(review): an earlier revision slept 2s here before reading the
        # buffer; confirm the response has always arrived by this point.
        commandParsed = self._decryptResponse(self._charWriteResponse)
        if not self.parser.isNukiCommand(commandParsed):
            sys.exit("Error while requesting Nuki CHALLENGE: %s" % commandParsed)
        commandParsed = self.parser.parse(commandParsed)
        if commandParsed.command != '0004':
            sys.exit("Nuki returned unexpected response (expecting Nuki CHALLENGE): %s" % commandParsed.show())
        print("Challenge received: %s" % commandParsed.nonce)
        return commandParsed

    def _sendLogEntriesRequest(self, keyturnerUSDIOHandle, count, nonce, pinHex, timeout):
        """Send an encrypted log-entries request for count entries."""
        logEntriesReq = nuki_messages.Nuki_LOG_ENTRIES_REQUEST()
        logEntriesReq.createPayload(count, nonce, self.byteSwapper.swap(pinHex))
        logEntriesReqEncryptedCommand = self._encryptCommand(logEntriesReq).generate()
        self._charWriteResponse = ""
        self.device.char_write_handle(keyturnerUSDIOHandle, logEntriesReqEncryptedCommand, True, timeout)
        print("Nuki Log Entries Request sent: %s" % logEntriesReq.show())

    def getLogEntriesCount(self, pinHex):
        """Fetch the number of log entries from your Nuki Lock.

        -pinHex : a 2-byte hex string representation of the PIN code you have set on your Nuki Lock (default is 0000)

        Returns the count as int, or None when no BLE connection could be
        made.  Protocol errors exit the process.
        """
        self._makeBLEConnection()
        if self.device is None:
            return None
        keyturnerUSDIOHandle = self.device.get_handle(self.KEYTURNER_USDIO_UUID)
        self.device.subscribe(self.KEYTURNER_USDIO_UUID, self._handleCharWriteResponse, indication=True)
        challenge = self._requestLogChallenge(keyturnerUSDIOHandle)
        # Asking for 0 entries makes the lock answer with the count only.
        self._sendLogEntriesRequest(keyturnerUSDIOHandle, 0, challenge.nonce, pinHex, 4)
        commandParsed = self._decryptResponse(self._charWriteResponse)
        if not self.parser.isNukiCommand(commandParsed):
            sys.exit("Error while requesting Nuki Log Entries: %s" % commandParsed)
        commandParsed = self.parser.parse(commandParsed)
        if commandParsed.command != '0026':
            sys.exit("Nuki returned unexpected response (expecting Nuki LOG ENTRY): %s" % commandParsed.show())
        print("%s" % commandParsed.show())
        return int(commandParsed.logCount, 16)

    def getLogEntries(self, count, pinHex):
        """Fetch the most recent log entries from your Nuki Lock.

        -count  : the number of entries you would like to fetch (if available)
        -pinHex : a 2-byte hex string representation of the PIN code you have set on your Nuki Lock (default is 0000)

        Returns the list of parsed '0024' messages, or None when no BLE
        connection could be made.  Messages that cannot be decrypted or are of
        an unexpected kind are reported and skipped.
        """
        self._makeBLEConnection()
        if self.device is None:
            return None
        keyturnerUSDIOHandle = self.device.get_handle(self.KEYTURNER_USDIO_UUID)
        self.device.subscribe(self.KEYTURNER_USDIO_UUID, self._handleCharWriteResponse, indication=True)
        challenge = self._requestLogChallenge(keyturnerUSDIOHandle)
        self._sendLogEntriesRequest(keyturnerUSDIOHandle, count, challenge.nonce, pinHex, 6)
        messages = self.parser.splitEncryptedMessages(self._charWriteResponse)
        print("Received %d messages" % len(messages))
        logMessages = []
        for message in messages:
            print("Decrypting message %s" % message)
            try:
                decrypted = self._decryptResponse(message)
                if not self.parser.isNukiCommand(decrypted):
                    print("Error while requesting Nuki Log Entries: %s" % decrypted)
                    continue
                commandParsed = self.parser.parse(decrypted)
                if commandParsed.command not in ('0024', '0026', '000E'):
                    print("Nuki returned unexpected response (expecting Nuki LOG ENTRY): %s" % commandParsed.show())
                    continue
                print("%s" % commandParsed.show())
            except Exception:
                # Best effort: one bad message must not lose the others.
                print("Unable to decrypt message")
                continue
            if commandParsed.command == '0024':
                logMessages.append(commandParsed)
        return logMessages