Skip to content

Commit

Permalink
Merge pull request #43 from Ptosiek/refactor-utils
Browse files Browse the repository at this point in the history
Refactor utils
  • Loading branch information
hishizuka authored Oct 11, 2023
2 parents d6f0e2d + b4d92a0 commit a994fb8
Show file tree
Hide file tree
Showing 24 changed files with 1,067 additions and 1,035 deletions.
247 changes: 56 additions & 191 deletions modules/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,13 @@
import logging
import os
import shutil
import traceback
import math
from glob import glob

import numpy as np
import oyaml as yaml
from PIL import Image


_IS_RASPI = False
try:
import RPi.GPIO as GPIO

GPIO.setmode(GPIO.BCM)
_IS_RASPI = True
except ImportError:
pass

from logger import CustomRotatingFileHandler, app_logger
from modules.helper.setting import Setting
from modules.button_config import Button_Config
Expand All @@ -31,10 +20,19 @@
exec_cmd_return_value,
is_running_as_service,
)
from modules.utils.map import get_maptile_filename, get_tilexy_and_xy_in_tile
from modules.utils.timer import Timer


BOOT_FILE = "/boot/config.txt"
_IS_RASPI = False
try:
import RPi.GPIO as GPIO

GPIO.setmode(GPIO.BCM)
_IS_RASPI = True
except ImportError:
pass


class Config:
Expand Down Expand Up @@ -638,33 +636,13 @@ class Config:
G_BT_ADDRESSES = {}
G_BT_USE_ADDRESS = ""

# for track
TRACK_STR = [
"N",
"NE",
"E",
"SE",
"S",
"SW",
"W",
"NW",
"N",
]

# for get_dist_on_earth
GEO_R1 = 6378.137
GEO_R2 = 6356.752314140
GEO_R1_2 = (GEO_R1 * 1000) ** 2
GEO_R2_2 = (GEO_R2 * 1000) ** 2
GEO_E2 = (GEO_R1_2 - GEO_R2_2) / GEO_R1_2
G_DISTANCE_BY_LAT1S = GEO_R2 * 1000 * 2 * np.pi / 360 / 60 / 60 # [m]

#######################
# class objects #
#######################
logger = None
display = None
network = None
api = None
bt_pan = None
ble_uart = None
setting = None
Expand Down Expand Up @@ -694,9 +672,8 @@ def __init__(self):
self.G_FULLSCREEN = True
if args.demo:
self.G_DUMMY_OUTPUT = True
if args.layout:
if os.path.exists(args.layout):
self.G_LAYOUT_FILE = args.layout
if args.layout and os.path.exists(args.layout):
self.G_LAYOUT_FILE = args.layout
if args.headless:
self.G_HEADLESS = True
# show options
Expand Down Expand Up @@ -728,6 +705,7 @@ def __init__(self):
# map list
if os.path.exists(self.G_MAP_LIST):
self.read_map_list()

# set default values
for map_config in [
self.G_MAP_CONFIG,
Expand Down Expand Up @@ -802,8 +780,10 @@ async def delay_init(self):

# network
await self.gui.set_boot_status("initialize network modules...")
from modules.helper.api import api
from modules.helper.network import Network

self.api = api(self)
self.network = Network(self)

# bluetooth
Expand Down Expand Up @@ -870,38 +850,41 @@ async def delay_init(self):
await self.logger.resume_start_stop()

async def keyboard_check(self):
while not self.G_QUIT:
app_logger.info(
"s:start/stop, l: lap, r:reset, p: previous screen, n: next screen, q: quit"
)
key = await self.loop.run_in_executor(None, input, "> ")

if key == "s":
self.logger.start_and_stop_manual()
elif key == "l":
self.logger.count_laps()
elif key == "r":
self.logger.reset_count()
elif key == "n" and self.gui:
self.gui.scroll_next()
elif key == "p" and self.gui:
self.gui.scroll_prev()
elif key == "q" and self.gui:
await self.quit()
##### temporary #####
# test hardware key signals
elif key == "m" and self.gui:
self.gui.enter_menu()
elif key == "v" and self.gui:
self.gui.press_space()
elif key == "," and self.gui:
self.gui.press_tab()
elif key == "." and self.gui:
self.gui.press_shift_tab()
elif key == "b" and self.gui:
self.gui.back_menu()
elif key == "c" and self.gui:
self.gui.get_screenshot()
try:
while True:
app_logger.info(
"s:start/stop, l: lap, r:reset, p: previous screen, n: next screen, q: quit"
)
key = await self.loop.run_in_executor(None, input, "> ")

if key == "s":
self.logger.start_and_stop_manual()
elif key == "l":
self.logger.count_laps()
elif key == "r":
self.logger.reset_count()
elif key == "n" and self.gui:
self.gui.scroll_next()
elif key == "p" and self.gui:
self.gui.scroll_prev()
elif key == "q" and self.gui:
await self.quit()
##### temporary #####
# test hardware key signals
elif key == "m" and self.gui:
self.gui.enter_menu()
elif key == "v" and self.gui:
self.gui.press_space()
elif key == "," and self.gui:
self.gui.press_tab()
elif key == "." and self.gui:
self.gui.press_shift_tab()
elif key == "b" and self.gui:
self.gui.back_menu()
elif key == "c" and self.gui:
self.gui.get_screenshot()
except asyncio.CancelledError:
pass

def set_logger(self, logger):
self.logger = logger
Expand All @@ -919,15 +902,6 @@ def check_map_dir(self):
if self.G_LOG_ALTITUDE_FROM_DATA_SOURCE:
os.makedirs(os.path.join("maptile", self.G_DEM_MAP), exist_ok=True)

@staticmethod
def remove_maptiles(map_name):
path = os.path.join("maptile", map_name)
if os.path.exists(path):
files = os.listdir(path)
dirs = [f for f in files if os.path.isdir(os.path.join(path, f))]
for d in dirs:
shutil.rmtree(os.path.join(path, d))

def get_serial(self):
if not self.G_IS_RASPI:
return
Expand Down Expand Up @@ -996,9 +970,6 @@ async def quit(self):
self.logger.remove_handler()
app_logger.info("quit done")

if self.G_GUI_MODE != "PyQt":
self.loop.close()

def poweroff(self):
# TODO
# should be replaced by quit() with power_off option
Expand Down Expand Up @@ -1040,7 +1011,7 @@ def hardware_wifi_bt(self, status):
"sudo",
"sed",
"-i",
f"s/^dtoverlay\=disable\-{dev}/\#dtoverlay\=disable\-{dev}/",
rf"s/^dtoverlay\=disable\-{dev}/\#dtoverlay\=disable\-{dev}/",
BOOT_FILE,
],
False,
Expand All @@ -1054,7 +1025,7 @@ def hardware_wifi_bt(self, status):
"sudo",
"sed",
"-i",
f"s/^\#dtoverlay\=disable\-{dev}/dtoverlay\=disable\-{dev}/",
rf"s/^\#dtoverlay\=disable\-{dev}/dtoverlay\=disable\-{dev}/",
BOOT_FILE,
],
False,
Expand All @@ -1075,7 +1046,7 @@ def hardware_wifi_bt(self, status):
"sed",
"-i",
"-e",
's/^\#DEVICES\="\/dev\/ttyS0"/DEVICES\="\/dev\/ttyS0"/',
r's/^\#DEVICES\="\/dev\/ttyS0"/DEVICES\="\/dev\/ttyS0"/',
"/etc/default/gpsd",
],
False,
Expand All @@ -1086,7 +1057,7 @@ def hardware_wifi_bt(self, status):
"sed",
"-i",
"-e",
's/^DEVICES\="\/dev\/ttyAMA0"/\#DEVICES\="\/dev\/ttyAMA0"/',
r's/^DEVICES\="\/dev\/ttyAMA0"/\#DEVICES\="\/dev\/ttyAMA0"/',
"/etc/default/gpsd",
],
False,
Expand Down Expand Up @@ -1195,94 +1166,12 @@ def read_map_list(self):
map_list[key]["attribution"] = ""
self.G_MAP_CONFIG.update(map_list)

def get_track_str(self, drc):
track_int = int((drc + 22.5) / 45.0)
return self.TRACK_STR[track_int]

# return [m]
def get_dist_on_earth(self, p0_lon, p0_lat, p1_lon, p1_lat):
if p0_lon == p1_lon and p0_lat == p1_lat:
return 0
(r0_lon, r0_lat, r1_lon, r1_lat) = map(
math.radians, [p0_lon, p0_lat, p1_lon, p1_lat]
)
delta_x = r1_lon - r0_lon
cos_d = math.sin(r0_lat) * math.sin(r1_lat) + math.cos(r0_lat) * math.cos(
r1_lat
) * math.cos(delta_x)
try:
res = 1000 * math.acos(cos_d) * self.GEO_R1
return res
except:
# traceback.print_exc()
# print("cos_d =", cos_d)
# print("parameter:", p0_lon, p0_lat, p1_lon, p1_lat)
return 0

# return [m]
def get_dist_on_earth_array(self, p0_lon, p0_lat, p1_lon, p1_lat):
# if p0_lon == p1_lon and p0_lat == p1_lat:
# return 0
r0_lon = np.radians(p0_lon)
r0_lat = np.radians(p0_lat)
r1_lon = np.radians(p1_lon)
r1_lat = np.radians(p1_lat)
# (r0_lon, r0_lat, r1_lon, r1_lat) = map(radians, [p0_lon, p0_lat, p1_lon, p1_lat])
delta_x = r1_lon - r0_lon
cos_d = np.sin(r0_lat) * np.sin(r1_lat) + np.cos(r0_lat) * np.cos(
r1_lat
) * np.cos(delta_x)
try:
res = 1000 * np.arccos(cos_d) * self.GEO_R1
return res
except:
traceback.print_exc()
# #print("cos_d =", cos_d)
# #print("parameter:", p0_lon, p0_lat, p1_lon, p1_lat)
return np.array([])

# return [m]
def get_dist_on_earth_hubeny(self, p0_lon, p0_lat, p1_lon, p1_lat):
if p0_lon == p1_lon and p0_lat == p1_lat:
return 0
(r0_lon, r0_lat, r1_lon, r1_lat) = map(
math.radians, [p0_lon, p0_lat, p1_lon, p1_lat]
)
lat_t = (r0_lat + r1_lat) / 2
w = 1 - self.GEO_E2 * math.sin(lat_t) ** 2
c2 = math.cos(lat_t) ** 2
return math.sqrt(
(self.GEO_R2_2 / w**3) * (r0_lat - r1_lat) ** 2
+ (self.GEO_R1_2 / w) * c2 * (r0_lon - r1_lon) ** 2
)

@staticmethod
def calc_azimuth(lat, lon):
rad_latitude = np.radians(lat)
rad_longitude = np.radians(lon)
rad_longitude_delta = rad_longitude[1:] - rad_longitude[0:-1]
azimuth = np.mod(
np.degrees(
np.arctan2(
np.sin(rad_longitude_delta),
np.cos(rad_latitude[0:-1]) * np.tan(rad_latitude[1:])
- np.sin(rad_latitude[0:-1]) * np.cos(rad_longitude_delta),
)
),
360,
).astype(dtype="int16")
return azimuth

@staticmethod
def get_maptile_filename(map_name, z, x, y):
return "maptile/" + map_name + "/{0}/{1}/{2}.png".format(z, x, y)

async def get_altitude_from_tile(self, pos):
if np.isnan(pos[0]) or np.isnan(pos[1]):
return np.nan
z = self.G_DEM_MAP_CONFIG[self.G_DEM_MAP]["fix_zoomlevel"]
f_x, f_y, p_x, p_y = self.get_tilexy_and_xy_in_tile(z, pos[0], pos[1], 256)
filename = self.get_maptile_filename(self.G_DEM_MAP, z, f_x, f_y)
f_x, f_y, p_x, p_y = get_tilexy_and_xy_in_tile(z, pos[0], pos[1], 256)
filename = get_maptile_filename(self.G_DEM_MAP, z, f_x, f_y)

if not os.path.exists(filename):
await self.network.download_demtile(z, f_x, f_y)
Expand All @@ -1305,30 +1194,6 @@ async def get_altitude_from_tile(self, pos):
# print(altitude, filename, p_x, p_y, pos[1], pos[0])
return altitude

@staticmethod
def get_tilexy_and_xy_in_tile(z, x, y, tile_size):
n = 2.0**z
_y = math.radians(y)
x_in_tile, tile_x = math.modf((x + 180.0) / 360.0 * n)
y_in_tile, tile_y = math.modf(
(1.0 - math.log(math.tan(_y) + (1.0 / math.cos(_y))) / math.pi) / 2.0 * n
)

return (
int(tile_x),
int(tile_y),
int(x_in_tile * tile_size),
int(y_in_tile * tile_size),
)

@staticmethod
def get_lon_lat_from_tile_xy(z, x, y):
n = 2.0**z
lon = x / n * 360.0 - 180.0
lat = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * y / n))))

return lon, lat

def get_courses(self):
dirs = sorted(
glob(os.path.join(self.G_COURSE_DIR, "*.tcx")),
Expand Down
Loading

0 comments on commit a994fb8

Please sign in to comment.