Skip to content

Commit

Permalink
improve bluetooth/wifi on/off settings
Browse files Browse the repository at this point in the history
do not make it RASPI only
mv detect_network + no need for global for Ip address
bluetooth tethering not only on Raspi
  • Loading branch information
Ptosiek committed Oct 5, 2023
1 parent 0e53d0b commit e1236cb
Show file tree
Hide file tree
Showing 13 changed files with 287 additions and 170 deletions.
307 changes: 204 additions & 103 deletions modules/config.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import os
import datetime
import argparse
import asyncio
import datetime
import json
import logging
import socket
import asyncio
import os
import re
import shutil
import traceback
import math
Expand All @@ -26,10 +26,18 @@
from logger import CustomRotatingFileHandler, app_logger
from modules.helper.setting import Setting
from modules.button_config import Button_Config
from modules.utils.cmd import exec_cmd, exec_cmd_return_value, is_running_as_service
from modules.utils.cmd import (
exec_cmd,
exec_cmd_return_value,
is_running_as_service,
is_service_running,
)
from modules.utils.timer import Timer


BOOT_FILE = "/boot/config.txt"


class Config:
#######################
# configurable values #
Expand Down Expand Up @@ -331,9 +339,6 @@ class Config:
# for read load average in sensor_core
G_PID = os.getpid()

# IP ADDRESS
G_IP_ADDRESS = ""

# stopwatch state
G_MANUAL_STATUS = "INIT"
G_STOPWATCH_STATUS = "INIT" # with Auto Pause
Expand Down Expand Up @@ -631,7 +636,7 @@ class Config:
G_IMU_MAG_DECLINATION = 0.0

# Bluetooth tethering
G_BT_ADDRESS = {}
G_BT_ADDRESSES = {}
G_BT_USE_ADDRESS = ""

# for track
Expand Down Expand Up @@ -803,31 +808,35 @@ async def delay_init(self):
self.network = Network(self)

# bluetooth
if self.G_IS_RASPI:
await self.gui.set_boot_status("initialize bluetooth modules...")
await self.gui.set_boot_status("initialize bluetooth modules...")

from modules.helper.bt_pan import (
BTPanDbus,
BTPanDbusNext,
HAS_DBUS_NEXT,
HAS_DBUS,
)
from modules.helper.bt_pan import (
BTPanDbus,
BTPanDbusNext,
HAS_DBUS_NEXT,
HAS_DBUS,
)

if HAS_DBUS_NEXT:
self.bt_pan = BTPanDbusNext()
elif HAS_DBUS:
self.bt_pan = BTPanDbus()
if HAS_DBUS_NEXT or HAS_DBUS:
await self.bt_pan.check_dbus()
if self.bt_pan.is_available:
self.G_BT_ADDRESS = await self.bt_pan.find_bt_pan_devices()
if HAS_DBUS_NEXT:
self.bt_pan = BTPanDbusNext()
elif HAS_DBUS:
self.bt_pan = BTPanDbus()
if HAS_DBUS_NEXT or HAS_DBUS:
is_available = await self.bt_pan.check_dbus()
if is_available:
self.G_BT_ADDRESSES = await self.bt_pan.find_bt_pan_devices()

# make sure the bluebooth status is set accordingly to AutoEnable
self.onoff_bt_on_init()

# gadgetbridge
if self.G_IS_RASPI:
try:
from modules.helper.ble_gatt_server import GadgetbridgeService

self.ble_uart = GadgetbridgeService(self)
except:
pass
except Exception as e: # noqa
app_logger.info(f"Gadgetbridge service not initialized: {e}")

# logger, sensor
await self.gui.set_boot_status("initialize sensor...")
Expand All @@ -840,26 +849,18 @@ async def delay_init(self):
if self.G_HEADLESS:
asyncio.create_task(self.keyboard_check())

# resume BT and thingsboard setting
if self.G_IS_RASPI:
self.G_BT_USE_ADDRESS = self.setting.get_config_pickle(
"G_BT_USE_ADDRESS", self.G_BT_USE_ADDRESS
)
self.G_THINGSBOARD_API["STATUS"] = self.setting.get_config_pickle(
"G_THINGSBOARD_API_STATUS", self.G_THINGSBOARD_API["STATUS"]
)
self.G_THINGSBOARD_API[
"AUTO_UPLOAD_VIA_BT"
] = self.setting.get_config_pickle(
"AUTO_UPLOAD_VIA_BT", self.G_THINGSBOARD_API["AUTO_UPLOAD_VIA_BT"]
)

# resume BT tethering
if (
self.G_BT_USE_ADDRESS != ""
and not self.G_THINGSBOARD_API["AUTO_UPLOAD_VIA_BT"]
):
await self.bluetooth_tethering()
self.G_BT_USE_ADDRESS = self.setting.get_config_pickle(
"G_BT_USE_ADDRESS", self.G_BT_USE_ADDRESS
)
self.G_THINGSBOARD_API["STATUS"] = self.setting.get_config_pickle(
"G_THINGSBOARD_API_STATUS", self.G_THINGSBOARD_API["STATUS"]
)
self.G_THINGSBOARD_API["AUTO_UPLOAD_VIA_BT"] = self.setting.get_config_pickle(
"AUTO_UPLOAD_VIA_BT", self.G_THINGSBOARD_API["AUTO_UPLOAD_VIA_BT"]
)
# resume BT tethering
if self.G_BT_USE_ADDRESS and not self.G_THINGSBOARD_API["AUTO_UPLOAD_VIA_BT"]:
await self.bluetooth_tethering()

delta = t.stop()
self.boot_time += delta
Expand Down Expand Up @@ -1013,14 +1014,6 @@ def reboot(self):
if self.G_IS_RASPI:
exec_cmd(["sudo", "reboot"])

def hardware_wifi_bt_on(self):
if self.G_IS_RASPI:
exec_cmd(["scripts/comment_out.sh"])

def hardware_wifi_bt_off(self):
if self.G_IS_RASPI:
exec_cmd(["scripts/uncomment.sh"])

def update_application(self):
if self.G_IS_RASPI:
exec_cmd(["git", "pull", "origin", "master"])
Expand All @@ -1030,69 +1023,177 @@ def restart_application(self):
if self.G_IS_RASPI:
exec_cmd(["sudo", "systemctl", "restart", "pizero_bikecomputer"])

def detect_network(self):
try:
socket.setdefaulttimeout(3)
connect_interface = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
connect_interface.connect(("8.8.8.8", 53))
self.G_IP_ADDRESS = connect_interface.getsockname()[0]
return True
except socket.error:
self.G_IP_ADDRESS = "No address"
return False

def get_wifi_bt_status(self):
if not self.G_IS_RASPI:
return False, False
def hardware_wifi_bt(self, status):
app_logger.info(f"Hardware Wifi/BT: {status}")
if self.G_IS_RASPI:
with open(BOOT_FILE, "r") as f:
data = f.read()
for dev in ["wifi", "bt"]:
disable = f"dtoverlay=disable-{dev}"
if status:
if disable in data and f"#{disable}" not in data:
# comment it
exec_cmd(
[
"sudo",
"sed",
"-i",
f"s/^dtoverlay\=disable\-{dev}/\#dtoverlay\=disable\-{dev}/",
BOOT_FILE,
],
False,
)
# else nothing to do it's not disabled then (not present or commented)
else:
if f"#{disable}" in data:
# uncomment it, so it's disabled
exec_cmd(
[
"sudo",
"sed",
"-i",
f"s/^\#dtoverlay\=disable\-{dev}/dtoverlay\=disable\-{dev}/",
BOOT_FILE,
],
False,
)
elif disable in data:
# do nothing it's already the proper state...
pass
else:
exec_cmd(
["sudo", "sed", "-i", f"$a{disable}", BOOT_FILE], False
)

def get_bluetooth_status(self):
# make sure bluetooth service is running (+ hciuart ??)
# hciuart_running = is_service_running("hciuart")
bluetooth_running = is_service_running("bluetooth")
status = False
if bluetooth_running:
# also make sure bluetooth is not powered off (through bluetoothctl)
raw_status = exec_cmd_return_value(["bluetoothctl", "show"], False)
try:
status = [
x.strip()
for x in raw_status.splitlines()
if x.strip().startswith("Powered:")
][0]
m = re.search(r"Powered: (?P<value>yes|no)", status)
status = True if m.group("value") == "yes" else False
except IndexError:
app_logger.warning(
"Could not get bluetooth power status, assume it's off then"
)
return bluetooth_running and status # and hciuart

status = {"wlan": False, "bluetooth": False}
def get_wifi_status(self):
status = False
# that won't work if we have multiple Wi-Fi devices ?
try:
# json option requires raspbian buster
raw_status = exec_cmd_return_value(["rfkill", "--json"], cmd_print=False)
raw_status = exec_cmd_return_value(
["sudo", "rfkill", "--json"], cmd_print=False
)
json_status = json.loads(raw_status)
for l in json_status[""]:
if "type" not in l or l["type"] not in ["wlan", "bluetooth"]:
for device in json_status["rfkilldevices"]:
if device.get("type") not in ["wlan"]:
continue
if l["soft"] == "unblocked":
status[l["type"]] = True
except:
pass
return status["wlan"], status["bluetooth"]
if device["soft"] == "unblocked" and device["hard"] == "unblocked":
status = True
break
except Exception as e:
app_logger.warning(f"Exception occurred trying to get wifi status: {e}")
return status

def onoff_bt_on_init(self):
# read /etc/bluetooth/main.conf and make sure the current bluetooth status is set accordingly to AutoEnable
# we do this only if we run as a service
if is_running_as_service():
status = True # default to true (since bluez 5.65..)
with open("/etc/bluetooth/main.conf", "r") as f:
try:
data = [
x.strip() for x in f.read().splitlines() if "AutoEnable=" in x
][0]
# ignore if commented or missed. then it's default
if not data.startswith("#"):
m = re.search(r"^AutoEnable=(?P<value>true|false)", data)
status = True if m.group("value") == "true" else False
except IndexError as e:
pass
current_status = self.get_bluetooth_status()
if status != current_status:
app_logger.warning(
f"Incorrect status found: {current_status}. We wanted: {status}"
)
self.onoff_bt(status)

def onoff_bt(self, new_status):
# We will just use power off/on, on the adapter using bluetoothctl
# but to do this bluetooth service has to run, so check that first
# If the service is stopped/was disabled, this will re-enable and restart it.
# One other way would be to disable/enable the bluetooth service (but then it's fairly slow to start on demand
# OR having dtoverlay=disable-bt in /boot/config.txt (not sure how to re-enable the service then without reboot)
# The tradeoff is most likely a slower boot since bluetooth is always started

# unfortunately the setting AutoEnable does not seem to be respected on reboot
# (might depend on other packages installed)
# cf. https://discussion.fedoraproject.org/t/bluetooth-always-on-on-boot/65545/10
# so, we have the application check on start up the value of AutoEnable and switch it accordingly
auto_enable_status = f"s/AutoEnable={str(not new_status).lower()}/AutoEnable={str(new_status).lower()}/"

if not is_service_running("bluetooth"):
if new_status:
# make sure we have autoEnable on and reenable/start the service
exec_cmd(
[
"sudo",
"sed",
"-i",
auto_enable_status,
"/etc/bluetooth/main.conf",
],
False,
)
exec_cmd(["sudo", "systemctl", "enable", "bluetooth"], False)
exec_cmd(["sudo", "systemctl", "start", "bluetooth"], False)
else:
# Nothing to do, it's off already
pass
else:
# edit /etc/bluetooth/main.conf AutoEnable, so it keeps state after reboot
exec_cmd(
[
"sudo",
"sed",
"-i",
auto_enable_status,
"/etc/bluetooth/main.conf",
],
False,
)
exec_cmd(["bluetoothctl", "power", "on" if new_status else "off"], False)

def onoff_wifi_bt(self, key=None):
# in the future, manage with pycomman
if not self.G_IS_RASPI:
return
def onoff_wifi(self, new_status):
return_code = exec_cmd(
["rfkill", "unblock" if new_status else "block", "wifi"], False
)

onoff_cmd = {
"Wifi": {
True: ["rfkill", "block", "wifi"],
False: ["rfkill", "unblock", "wifi"],
},
"Bluetooth": {
True: ["rfkill", "block", "bluetooth"],
False: ["rfkill", "unblock", "bluetooth"],
},
}
status = {}
status["Wifi"], status["Bluetooth"] = self.get_wifi_bt_status()
exec_cmd(onoff_cmd[key][status[key]])
if return_code:
# TODO custom exception
raise OSError("Rfkill command failed")

async def bluetooth_tethering(self, disconnect=False):
if not self.G_IS_RASPI:
return
if self.G_BT_USE_ADDRESS == "":
return
if self.bt_pan is None:
if not self.G_BT_USE_ADDRESS or not self.bt_pan:
return

if not disconnect:
res = await self.bt_pan.connect_tethering(
self.G_BT_ADDRESS[self.G_BT_USE_ADDRESS]
self.G_BT_ADDRESSES[self.G_BT_USE_ADDRESS]
)
else:
res = await self.bt_pan.disconnect_tethering(
self.G_BT_ADDRESS[self.G_BT_USE_ADDRESS]
self.G_BT_ADDRESSES[self.G_BT_USE_ADDRESS]
)
return bool(res)

Expand Down
Loading

0 comments on commit e1236cb

Please sign in to comment.