Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor utils #43

Merged
merged 5 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 56 additions & 191 deletions modules/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,13 @@
import logging
import os
import shutil
import traceback
import math
from glob import glob

import numpy as np
import oyaml as yaml
from PIL import Image


_IS_RASPI = False
try:
import RPi.GPIO as GPIO

GPIO.setmode(GPIO.BCM)
_IS_RASPI = True
except ImportError:
pass

from logger import CustomRotatingFileHandler, app_logger
from modules.helper.setting import Setting
from modules.button_config import Button_Config
Expand All @@ -31,10 +20,19 @@
exec_cmd_return_value,
is_running_as_service,
)
from modules.utils.map import get_maptile_filename, get_tilexy_and_xy_in_tile
from modules.utils.timer import Timer


BOOT_FILE = "/boot/config.txt"
_IS_RASPI = False
try:
import RPi.GPIO as GPIO

GPIO.setmode(GPIO.BCM)
_IS_RASPI = True
except ImportError:
pass


class Config:
Expand Down Expand Up @@ -638,33 +636,13 @@ class Config:
G_BT_ADDRESSES = {}
G_BT_USE_ADDRESS = ""

# for track
TRACK_STR = [
"N",
"NE",
"E",
"SE",
"S",
"SW",
"W",
"NW",
"N",
]

# for get_dist_on_earth
GEO_R1 = 6378.137
GEO_R2 = 6356.752314140
GEO_R1_2 = (GEO_R1 * 1000) ** 2
GEO_R2_2 = (GEO_R2 * 1000) ** 2
GEO_E2 = (GEO_R1_2 - GEO_R2_2) / GEO_R1_2
G_DISTANCE_BY_LAT1S = GEO_R2 * 1000 * 2 * np.pi / 360 / 60 / 60 # [m]

#######################
# class objects #
#######################
logger = None
display = None
network = None
api = None
bt_pan = None
ble_uart = None
setting = None
Expand Down Expand Up @@ -694,9 +672,8 @@ def __init__(self):
self.G_FULLSCREEN = True
if args.demo:
self.G_DUMMY_OUTPUT = True
if args.layout:
if os.path.exists(args.layout):
self.G_LAYOUT_FILE = args.layout
if args.layout and os.path.exists(args.layout):
self.G_LAYOUT_FILE = args.layout
if args.headless:
self.G_HEADLESS = True
# show options
Expand Down Expand Up @@ -728,6 +705,7 @@ def __init__(self):
# map list
if os.path.exists(self.G_MAP_LIST):
self.read_map_list()

# set default values
for map_config in [
self.G_MAP_CONFIG,
Expand Down Expand Up @@ -802,8 +780,10 @@ async def delay_init(self):

# network
await self.gui.set_boot_status("initialize network modules...")
from modules.helper.api import api
from modules.helper.network import Network

self.api = api(self)
self.network = Network(self)

# bluetooth
Expand Down Expand Up @@ -870,38 +850,41 @@ async def delay_init(self):
await self.logger.resume_start_stop()

async def keyboard_check(self):
while not self.G_QUIT:
app_logger.info(
"s:start/stop, l: lap, r:reset, p: previous screen, n: next screen, q: quit"
)
key = await self.loop.run_in_executor(None, input, "> ")

if key == "s":
self.logger.start_and_stop_manual()
elif key == "l":
self.logger.count_laps()
elif key == "r":
self.logger.reset_count()
elif key == "n" and self.gui:
self.gui.scroll_next()
elif key == "p" and self.gui:
self.gui.scroll_prev()
elif key == "q" and self.gui:
await self.quit()
##### temporary #####
# test hardware key signals
elif key == "m" and self.gui:
self.gui.enter_menu()
elif key == "v" and self.gui:
self.gui.press_space()
elif key == "," and self.gui:
self.gui.press_tab()
elif key == "." and self.gui:
self.gui.press_shift_tab()
elif key == "b" and self.gui:
self.gui.back_menu()
elif key == "c" and self.gui:
self.gui.get_screenshot()
try:
while True:
app_logger.info(
"s:start/stop, l: lap, r:reset, p: previous screen, n: next screen, q: quit"
)
key = await self.loop.run_in_executor(None, input, "> ")

if key == "s":
self.logger.start_and_stop_manual()
elif key == "l":
self.logger.count_laps()
elif key == "r":
self.logger.reset_count()
elif key == "n" and self.gui:
self.gui.scroll_next()
elif key == "p" and self.gui:
self.gui.scroll_prev()
elif key == "q" and self.gui:
await self.quit()
##### temporary #####
# test hardware key signals
elif key == "m" and self.gui:
self.gui.enter_menu()
elif key == "v" and self.gui:
self.gui.press_space()
elif key == "," and self.gui:
self.gui.press_tab()
elif key == "." and self.gui:
self.gui.press_shift_tab()
elif key == "b" and self.gui:
self.gui.back_menu()
elif key == "c" and self.gui:
self.gui.get_screenshot()
except asyncio.CancelledError:
pass

def set_logger(self, logger):
self.logger = logger
Expand All @@ -919,15 +902,6 @@ def check_map_dir(self):
if self.G_LOG_ALTITUDE_FROM_DATA_SOURCE:
os.makedirs(os.path.join("maptile", self.G_DEM_MAP), exist_ok=True)

@staticmethod
def remove_maptiles(map_name):
path = os.path.join("maptile", map_name)
if os.path.exists(path):
files = os.listdir(path)
dirs = [f for f in files if os.path.isdir(os.path.join(path, f))]
for d in dirs:
shutil.rmtree(os.path.join(path, d))

def get_serial(self):
if not self.G_IS_RASPI:
return
Expand Down Expand Up @@ -996,9 +970,6 @@ async def quit(self):
self.logger.remove_handler()
app_logger.info("quit done")

if self.G_GUI_MODE != "PyQt":
self.loop.close()

def poweroff(self):
# TODO
# should be replaced by quit() with power_off option
Expand Down Expand Up @@ -1040,7 +1011,7 @@ def hardware_wifi_bt(self, status):
"sudo",
"sed",
"-i",
f"s/^dtoverlay\=disable\-{dev}/\#dtoverlay\=disable\-{dev}/",
rf"s/^dtoverlay\=disable\-{dev}/\#dtoverlay\=disable\-{dev}/",
BOOT_FILE,
],
False,
Expand All @@ -1054,7 +1025,7 @@ def hardware_wifi_bt(self, status):
"sudo",
"sed",
"-i",
f"s/^\#dtoverlay\=disable\-{dev}/dtoverlay\=disable\-{dev}/",
rf"s/^\#dtoverlay\=disable\-{dev}/dtoverlay\=disable\-{dev}/",
BOOT_FILE,
],
False,
Expand All @@ -1075,7 +1046,7 @@ def hardware_wifi_bt(self, status):
"sed",
"-i",
"-e",
's/^\#DEVICES\="\/dev\/ttyS0"/DEVICES\="\/dev\/ttyS0"/',
r's/^\#DEVICES\="\/dev\/ttyS0"/DEVICES\="\/dev\/ttyS0"/',
"/etc/default/gpsd",
],
False,
Expand All @@ -1086,7 +1057,7 @@ def hardware_wifi_bt(self, status):
"sed",
"-i",
"-e",
's/^DEVICES\="\/dev\/ttyAMA0"/\#DEVICES\="\/dev\/ttyAMA0"/',
r's/^DEVICES\="\/dev\/ttyAMA0"/\#DEVICES\="\/dev\/ttyAMA0"/',
"/etc/default/gpsd",
],
False,
Expand Down Expand Up @@ -1195,94 +1166,12 @@ def read_map_list(self):
map_list[key]["attribution"] = ""
self.G_MAP_CONFIG.update(map_list)

def get_track_str(self, drc):
track_int = int((drc + 22.5) / 45.0)
return self.TRACK_STR[track_int]

# return [m]
def get_dist_on_earth(self, p0_lon, p0_lat, p1_lon, p1_lat):
if p0_lon == p1_lon and p0_lat == p1_lat:
return 0
(r0_lon, r0_lat, r1_lon, r1_lat) = map(
math.radians, [p0_lon, p0_lat, p1_lon, p1_lat]
)
delta_x = r1_lon - r0_lon
cos_d = math.sin(r0_lat) * math.sin(r1_lat) + math.cos(r0_lat) * math.cos(
r1_lat
) * math.cos(delta_x)
try:
res = 1000 * math.acos(cos_d) * self.GEO_R1
return res
except:
# traceback.print_exc()
# print("cos_d =", cos_d)
# print("parameter:", p0_lon, p0_lat, p1_lon, p1_lat)
return 0

# return [m]
def get_dist_on_earth_array(self, p0_lon, p0_lat, p1_lon, p1_lat):
# if p0_lon == p1_lon and p0_lat == p1_lat:
# return 0
r0_lon = np.radians(p0_lon)
r0_lat = np.radians(p0_lat)
r1_lon = np.radians(p1_lon)
r1_lat = np.radians(p1_lat)
# (r0_lon, r0_lat, r1_lon, r1_lat) = map(radians, [p0_lon, p0_lat, p1_lon, p1_lat])
delta_x = r1_lon - r0_lon
cos_d = np.sin(r0_lat) * np.sin(r1_lat) + np.cos(r0_lat) * np.cos(
r1_lat
) * np.cos(delta_x)
try:
res = 1000 * np.arccos(cos_d) * self.GEO_R1
return res
except:
traceback.print_exc()
# #print("cos_d =", cos_d)
# #print("parameter:", p0_lon, p0_lat, p1_lon, p1_lat)
return np.array([])

# return [m]
def get_dist_on_earth_hubeny(self, p0_lon, p0_lat, p1_lon, p1_lat):
if p0_lon == p1_lon and p0_lat == p1_lat:
return 0
(r0_lon, r0_lat, r1_lon, r1_lat) = map(
math.radians, [p0_lon, p0_lat, p1_lon, p1_lat]
)
lat_t = (r0_lat + r1_lat) / 2
w = 1 - self.GEO_E2 * math.sin(lat_t) ** 2
c2 = math.cos(lat_t) ** 2
return math.sqrt(
(self.GEO_R2_2 / w**3) * (r0_lat - r1_lat) ** 2
+ (self.GEO_R1_2 / w) * c2 * (r0_lon - r1_lon) ** 2
)

@staticmethod
def calc_azimuth(lat, lon):
rad_latitude = np.radians(lat)
rad_longitude = np.radians(lon)
rad_longitude_delta = rad_longitude[1:] - rad_longitude[0:-1]
azimuth = np.mod(
np.degrees(
np.arctan2(
np.sin(rad_longitude_delta),
np.cos(rad_latitude[0:-1]) * np.tan(rad_latitude[1:])
- np.sin(rad_latitude[0:-1]) * np.cos(rad_longitude_delta),
)
),
360,
).astype(dtype="int16")
return azimuth

@staticmethod
def get_maptile_filename(map_name, z, x, y):
return "maptile/" + map_name + "/{0}/{1}/{2}.png".format(z, x, y)

async def get_altitude_from_tile(self, pos):
if np.isnan(pos[0]) or np.isnan(pos[1]):
return np.nan
z = self.G_DEM_MAP_CONFIG[self.G_DEM_MAP]["fix_zoomlevel"]
f_x, f_y, p_x, p_y = self.get_tilexy_and_xy_in_tile(z, pos[0], pos[1], 256)
filename = self.get_maptile_filename(self.G_DEM_MAP, z, f_x, f_y)
f_x, f_y, p_x, p_y = get_tilexy_and_xy_in_tile(z, pos[0], pos[1], 256)
filename = get_maptile_filename(self.G_DEM_MAP, z, f_x, f_y)

if not os.path.exists(filename):
await self.network.download_demtile(z, f_x, f_y)
Expand All @@ -1305,30 +1194,6 @@ async def get_altitude_from_tile(self, pos):
# print(altitude, filename, p_x, p_y, pos[1], pos[0])
return altitude

@staticmethod
def get_tilexy_and_xy_in_tile(z, x, y, tile_size):
n = 2.0**z
_y = math.radians(y)
x_in_tile, tile_x = math.modf((x + 180.0) / 360.0 * n)
y_in_tile, tile_y = math.modf(
(1.0 - math.log(math.tan(_y) + (1.0 / math.cos(_y))) / math.pi) / 2.0 * n
)

return (
int(tile_x),
int(tile_y),
int(x_in_tile * tile_size),
int(y_in_tile * tile_size),
)

@staticmethod
def get_lon_lat_from_tile_xy(z, x, y):
n = 2.0**z
lon = x / n * 360.0 - 180.0
lat = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * y / n))))

return lon, lat

def get_courses(self):
dirs = sorted(
glob(os.path.join(self.G_COURSE_DIR, "*.tcx")),
Expand Down
Loading