forked from btemperli/LoRaPy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlorasender.py
77 lines (62 loc) · 2.37 KB
/
lorasender.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from SX127x.LoRa import *
from SX127x.LoRaArgumentParser import LoRaArgumentParser
from SX127x.board_config_ada import BOARD
import LoRaPy.counter as counter
import LoRaWAN
from LoRaWAN.MHDR import MHDR
import LoRaPy.reset_ada as reset_ada
# Module-level side effects: the three statements below run at import time,
# before any LoRaSender instance can be created.

# Call the project's reset helper first.
# NOTE(review): presumably this pulses the radio's reset line - confirm in LoRaPy.reset_ada.
reset_ada.reset()
# Run the board setup provided by SX127x.board_config_ada.
BOARD.setup()
# Argument parser titled "LoRaWAN sender"; it is not referenced again in the visible source.
parser = LoRaArgumentParser("LoRaWAN sender")
class LoRaSender(LoRa):
    """Send LoRaWAN uplinks and listen for a reply after each transmission.

    Attributes:
        verbose: When true, progress messages are printed from the callbacks.
        devaddr: Device address, passed as ``devaddr`` when building uplinks.
        nwkey: Key passed as the first argument to ``LoRaWAN.new``.
        appkey: Key passed as the second argument to ``LoRaWAN.new``.
        rx_callback: Callable invoked with the payload of each received frame.
    """

    def __init__(self, devaddr=None, nwkey=None, appkey=None, verbose=False,
                 callback=lambda *_, **__: None):
        """Store the session parameters and initialise the ``LoRa`` base class.

        Args:
            devaddr: Device address; defaults to a new empty list.
            nwkey: Network key; defaults to a new empty list.
            appkey: Application key; defaults to a new empty list.
            verbose: Forwarded to the base class and kept on the instance.
            callback: Called with ``lorawan.get_payload()`` for every received
                frame. The default accepts any arguments and does nothing.
        """
        super(LoRaSender, self).__init__(verbose)
        self.verbose = verbose
        # ``None`` sentinels replace the former ``[]`` defaults so that
        # instances created without arguments no longer share one list object.
        self.devaddr = [] if devaddr is None else devaddr
        self.nwkey = [] if nwkey is None else nwkey
        self.appkey = [] if appkey is None else appkey
        self.rx_callback = callback

    def on_rx_done(self):
        """Decode a received frame, hand its payload to the callback, re-arm RX.

        NOTE(review): presumably invoked by the SX127x base class when the
        RxDone interrupt fires - confirm against the library.
        """
        if self.verbose:
            print("RxDone")

        self.clear_irq_flags(RxDone=1)
        payload = self.read_payload(nocheck=True)

        lorawan = LoRaWAN.new(self.nwkey, self.appkey)
        lorawan.read(payload)

        # NOTE: the MIC is not validated here (``lorawan.valid_mic()`` is never
        # called); the payload is passed to the callback as decoded.
        self.rx_callback(lorawan.get_payload())

        # Cycle through SLEEP, reset the RX pointer, and end in standby.
        self.set_mode(MODE.SLEEP)
        self.reset_ptr_rx()
        self.set_mode(MODE.STDBY)

    def on_tx_done(self):
        """Acknowledge the finished transmission and switch to continuous RX.

        NOTE(review): presumably invoked by the SX127x base class when the
        TxDone interrupt fires - confirm against the library.
        """
        self.set_mode(MODE.STDBY)
        self.clear_irq_flags(TxDone=1)
        if self.verbose:
            print("TxDone")

        # NOTE(review): this second STDBY request and the second verbose
        # message look redundant; kept so the radio call sequence and the
        # output stay exactly as before.
        self.set_mode(MODE.STDBY)
        if self.verbose:
            print("TxDone. Receiving LoRaWAN message\n")

        # Reconfigure the radio for reception.
        self.set_dio_mapping([0] * 6)  # all six DIO entries set to mapping 0
        # NOTE(review): presumably downlinks use inverted IQ - confirm.
        self.set_invert_iq(1)
        self.reset_ptr_rx()
        self.set_mode(MODE.RXCONT)

        # Printed regardless of ``verbose``.
        print('check rx-state:')
        print(self.rx_is_good())

    def send_tx(self, message):
        """Build an unconfirmed data uplink from ``message`` and transmit it.

        Args:
            message: Payload to send. A ``str`` is converted character by
                character with ``ord()``; ``bytes``, ``bytearray`` or any
                iterable of ints is used as-is.
                NOTE(review): code points above 255 are passed through
                unchanged - confirm how ``LoRaWAN.create`` treats them.
        """
        # Integers pass through untouched so byte payloads no longer fail in
        # ``ord()``; text input behaves exactly as before.
        data = [item if isinstance(item, int) else ord(item) for item in message]

        lorawan = LoRaWAN.new(self.nwkey, self.appkey)
        lorawan.create(
            MHDR.UNCONF_DATA_UP,
            {'devaddr': self.devaddr, 'fcnt': counter.get_current(), 'data': data},
        )

        self.write_payload(lorawan.to_raw())
        self.set_mode(MODE.TX)