-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtx_message_handler.py
122 lines (103 loc) · 4.66 KB
/
tx_message_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import time
import re
import random
from meshtastic import portnums_pb2, mesh_pb2, mqtt_pb2, telemetry_pb2
from utils import generate_hash, get_message_id
from encryption import encrypt_packet
from load_config import ConfigLoader
from mqtt_handler import get_mqtt_client
message_id = random.getrandbits(32)
def publish_message(payload_function, portnum, **kwargs):
"""Send a message of any type."""
try:
config = ConfigLoader.get_config()
client = get_mqtt_client()
payload = payload_function(portnum=portnum, **kwargs)
topic = f"{config.mqtt.root_topic}{config.channel.preset}/{config.node.id}"
client.publish(topic, payload)
except Exception as e:
print(f"Error while sending message: {e}")
def create_payload(data, portnum, want_response=False, bitfield=1):
"""Generalized function to create a payload."""
encoded_message = mesh_pb2.Data()
encoded_message.portnum = portnum
encoded_message.payload = data.SerializeToString() if hasattr(data, "SerializeToString") else data
encoded_message.want_response = want_response
encoded_message.bitfield = bitfield
return generate_mesh_packet(encoded_message)
def generate_mesh_packet(encoded_message):
"""Generate the final mesh packet."""
config = ConfigLoader.get_config()
global message_id
message_id = get_message_id(message_id)
mesh_packet = mesh_pb2.MeshPacket()
mesh_packet.id = message_id
setattr(mesh_packet, "from", config.node.number)
mesh_packet.to = config.destination_id
mesh_packet.want_ack = False
mesh_packet.channel = generate_hash(config.channel.preset, config.channel.key)
mesh_packet.hop_limit = 3
mesh_packet.hop_start = 3
if config.channel.key == "":
mesh_packet.decoded.CopyFrom(encoded_message)
else:
mesh_packet.encrypted = encrypt_packet(config.channel.preset, config.channel.key, mesh_packet, encoded_message)
service_envelope = mqtt_pb2.ServiceEnvelope()
service_envelope.packet.CopyFrom(mesh_packet)
service_envelope.channel_id = config.channel.preset
service_envelope.gateway_id = config.node.id
return service_envelope.SerializeToString()
########## Specific Message Handlers ##########
def send_text_message(message):
"""Send a text message."""
def create_text_payload(portnum, message_text):
data = message_text.encode("utf-8")
return create_payload(data, portnum)
publish_message(create_text_payload, portnums_pb2.TEXT_MESSAGE_APP, message_text=message)
def send_nodeinfo(long_name, short_name, hw_model):
"""Send node information."""
def create_nodeinfo_payload(portnum, node_long_name, node_short_name, node_hw_model):
config = ConfigLoader.get_config()
data = mesh_pb2.User(
id=config.node.id,
long_name=node_long_name,
short_name=node_short_name,
hw_model=node_hw_model
)
return create_payload(data, portnum)
publish_message(create_nodeinfo_payload, portnums_pb2.NODEINFO_APP,
node_long_name=long_name, node_short_name=short_name, node_hw_model=hw_model)
def send_position(lat, lon, alt, pre):
"""Send position details."""
def create_position_payload(portnum, lat, lon, alt, pre):
pos_time = int(time.time())
latitude = int(float(lat) * 1e7)
longitude = int(float(lon) * 1e7)
altitude_units = 1 / 3.28084 if 'ft' in str(alt) else 1.0
altitude = int(altitude_units * float(re.sub('[^0-9.]', '', str(alt))))
data = mesh_pb2.Position(
latitude_i=latitude,
longitude_i=longitude,
altitude=altitude,
time=pos_time,
location_source="LOC_MANUAL",
precision_bits=pre
)
return create_payload(data, portnum)
publish_message(create_position_payload, portnums_pb2.POSITION_APP, lat=lat, lon=lon, alt=alt, pre=pre)
def send_device_telemetry(battery_level, voltage, chutil, airtxutil, uptime):
"""Send telemetry data."""
def create_telemetry_payload(portnum, battery_level, voltage, chutil, airtxutil, uptime):
data = telemetry_pb2.Telemetry(
time=int(time.time()),
device_metrics=telemetry_pb2.DeviceMetrics(
battery_level=battery_level,
voltage=voltage,
channel_utilization=chutil,
air_util_tx=airtxutil,
uptime_seconds=uptime
)
)
return create_payload(data, portnum)
publish_message(create_telemetry_payload, portnums_pb2.TELEMETRY_APP,
battery_level=battery_level, voltage=voltage, chutil=chutil, airtxutil=airtxutil, uptime=uptime)