-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.py
61 lines (48 loc) · 1.61 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import asyncio
import random
from types import TracebackType
from typing import Iterator, Type
from meshtastic.mesh_pb2 import ToRadio
from meshtastic.mesh_pb2 import FromRadio
from meshtastic.portnums_pb2 import PortNum
from meshtastic.telemetry_pb2 import Telemetry
class Message:
def __init__(self, from_radio):
self.from_radio = from_radio
if self.from_radio.HasField(""):
pass
class Client:
def __init__(self, connection):
self._connecion = connection
self._stop = False
def __repr__(self):
return f'Client<{self._connecion!r}>'
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(
self,
exc_type: Type[BaseException],
exc_val: BaseException,
exc_tb: TracebackType,
) -> None:
await self.disconnect()
async def connect(self):
await self._connecion.connect()
async def disconnect(self):
await self._connecion.disconnect()
async def read(self) -> Iterator[FromRadio]:
while not self._stop:
async for msg in self._connecion.read():
yield FromRadio.FromString(msg)
async def write(self, proto: ToRadio):
await self._connecion.write(proto.SerializeToString())
async def get_config(self) -> list[FromRadio]:
nonce = random.randint(0, 2**32)
config = []
await self.write(ToRadio(want_config_id=nonce))
async for proto in self.read():
if proto.config_complete_id == nonce:
break
config.append(proto)
return config