-
Notifications
You must be signed in to change notification settings - Fork 0
/
schemas_ping2.py
155 lines (129 loc) · 4.3 KB
/
schemas_ping2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import math
import re
import warnings
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Sequence
import construct as c
import numpy as np
from numpy.ma import log
message_id_schema = c.Enum(
c.Int16ul,
NMEA0183=9,
PROFILE6=1308,
)
ping_schema = c.Struct(
start=c.RawCopy(c.Const(b'BR')),
payload_length=c.RawCopy(c.Int16ul),
message_id=c.RawCopy(message_id_schema),
src_device_id=c.RawCopy(c.Int8ul),
dest_device_id=c.RawCopy(c.Int8ul),
payload=c.RawCopy(c.Bytes(c.this.payload_length.value)),
checksum=c.Checksum(c.Int16ul, lambda b: sum(b) % (1 << 16),
lambda cxt: b''.join(
cxt[attr].data for attr in [
'start', 'payload_length', 'message_id', 'src_device_id',
'dest_device_id', 'payload'])
),
)
profile6_schema = c.Struct(
ping_number=c.Int32ul,
start_mm=c.Int32ul,
length_mm=c.Int32ul,
start_ping_hz=c.Int32ul,
end_ping_hz=c.Int32ul,
adc_sample_hz=c.Int32ul,
timestamp_msec=c.Int32ul,
spare2=c.Padding(4),
ping_duration_sec=c.Float32l,
analog_gain=c.Float32l,
max_pwr=c.Float32l,
min_pwr=c.Float32l,
step_db=c.Float32l,
padding_2=c.Padding(8),
is_db=c.Int8ul,
gain_index=c.Int8ul,
decimation=c.Int8ul,
padding_3=c.Padding(1),
num_results=c.Int16ul,
scaled_db_pwr_results=c.Array(c.this.num_results, c.Int16ul)
)
def get_ranges_root_power(p6):
assert math.isclose(p6.max_pwr - p6.min_pwr,
p6.step_db * ((1 << 16) - 1))
pwr_or_db = (
p6.min_pwr + np.array(p6.scaled_db_pwr_results) * p6.step_db
)
pwr = np.sqrt(pwr_or_db) if not p6.is_db else np.power(10, pwr_or_db / 20)
return pwr
def get_ranges_db(p6):
assert math.isclose(p6.max_pwr - p6.min_pwr,
p6.step_db * ((1 << 16) - 1))
pwr_or_db = (
p6.min_pwr + np.array(p6.scaled_db_pwr_results) * p6.step_db
)
db = pwr_or_db if p6.is_db else 10 * np.log(pwr_or_db, 10)
return db
@dataclass
class NMEAPacket:
talker_id: str
sentence_id: str
words: Sequence[str]
def parse_nmea_datetime(datestr, timestr):
assert timestr[6] == '.'
return datetime(
day=int(datestr[0:2]),
month=int(datestr[2:4]),
year=(int(datestr[4:6]) - 50) % 100 + 1950,
hour=int(timestr[0:2]),
minute=int(timestr[2:4]),
second=int(timestr[4:6]),
microsecond=int(timestr[7:9]) * 10000,
)
def parse_nmea_degrees(degrees_str):
degrees, minutes = re.match('(\d.*)(\d{2}\.\d+)',
degrees_str).groups()
return int(degrees) + float(minutes) / 60
def parse_latitude(degrees_str, ns_str):
if degrees_str == '':
return None
return parse_nmea_degrees(degrees_str) * {'N': +1, 'S': -1}[ns_str]
def parse_longitude(degrees_str, ew_str):
if degrees_str == '':
return None
return parse_nmea_degrees(degrees_str) * {'E': +1, 'W': -1}[ew_str]
@dataclass
class NMEASentenceRMC:
timestamp: datetime = None
is_status_ok: bool = False
latitude_n: float = 0.0
longitude_e: float = 0.0
speed_over_ground_knots: float = 0.0
track_made_good_degrees_true: float = 0.0
magnetic_variation_degrees_e: Optional[float] = None
def parse_nmea_rmc(words: Sequence[str]):
return NMEASentenceRMC(
timestamp=parse_nmea_datetime(datestr=words[8], timestr=words[0]),
is_status_ok={'A': True, 'V': False}[words[1]],
latitude_n=parse_latitude(words[2], words[3]),
longitude_e=parse_longitude(words[4], words[5]),
speed_over_ground_knots=float(words[6]),
track_made_good_degrees_true=(
None if words[7] == '' else float(words[7])),
magnetic_variation_degrees_e=parse_longitude(words[9], words[10]),
)
def parse_nmea(payload: bytes):
payload_str = payload.decode('ascii')
match = re.match(r'(\$?)([A-Z]{2})([A-Z]{3}),(.*)(\r?\n?)', payload_str)
start, talker_id, sentence_id, words, end = match.groups()
if start != '$':
warnings.warn(
fr'NMEA0183 string {payload_str!r} did not start with $')
if end != '\r\n':
warnings.warn(
fr'NMEA0183 string {payload_str!r} did not end with \r\n')
return NMEAPacket(
talker_id=talker_id,
sentence_id=sentence_id,
words=tuple(words.split(','))
)