-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
62 lines (45 loc) · 1.66 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from configparser import RawConfigParser as ConfigParser
from digi.xbee import devices
def get_section(config_parser, name, default=None):
try:
return dict(config_parser[name])
except KeyError:
return None
def get_config(name):
config_parser = ConfigParser()
config_parser.read('config.ini')
return get_section(config_parser, name)
def get_device(config):
port = config.get('port', '/dev/serial0')
bauds = int(config.get('bauds', 9600))
device = config.get('device', 'digimesh')
device = {
'802': devices.Raw802Device,
'zigbee': devices.ZigBeeDevice,
'digimesh': devices.DigiMeshDevice,
}[device]
return device(port, bauds)
def get_address(remote):
"""
Return address from remote device.
"""
address = remote.get_64bit_addr()
if address == address.UNKNOWN_ADDRESS:
address = remote.get_16bit_addr()
return address
def send_data_async(device, remote, data):
# 802.15.4
if isinstance(device, devices.Raw802Device):
address = get_address(remote)
if isinstance(address, devices.XBee64BitAddress):
return device.send_data_async_64(address, data, 0)
elif isinstance(address, devices.XBee16BitAddress):
return device.send_data_async_16(address, data, 0)
raise ValueError(f'Unexpected address type {type(address)}')
# Zigbee
if isinstance(device, devices.ZigBeeDevice):
return device.send_data_async(remote, data, 0)
# XBee
if isinstance(device, devices.DigiMeshDevice):
return device.send_data_async(remote, data, 0)
raise TypeError(f'Unexpected device type {type(device)}')