Skip to content

Commit

Permalink
Add the bluetooth-autoconnect service
Browse files Browse the repository at this point in the history
  • Loading branch information
civerachb-cpr committed Oct 25, 2024
1 parent 975f2c7 commit 92957c3
Show file tree
Hide file tree
Showing 3 changed files with 303 additions and 0 deletions.
1 change: 1 addition & 0 deletions debian/postinst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ if [ -f /etc/turtlebot4/chrony.conf ]; then
fi
sudo service chrony restart
sudo systemctl enable webserver.service
sudo systemctl enable bluetooth-autoconnect.service
sudo systemctl disable systemd-networkd-wait-online.service

# Remove old/new dpkg configs
Expand Down
13 changes: 13 additions & 0 deletions etc/systemd/system/bluetooth-autoconnect.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# https://github.com/jrouleau/bluetooth-autoconnect
# MIT License

[Unit]
Description=Bluetooth autoconnect service
Before=bluetooth.service

[Service]
Type=simple
ExecStart=/usr/bin/bluetooth-autoconnect -d

[Install]
WantedBy=bluetooth.service
289 changes: 289 additions & 0 deletions scripts/bluetooth-autoconnect
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
#!/usr/bin/env python3
"""
SOURCE
https://github.com/jrouleau/bluetooth-autoconnect
MIT License
Copyright (c) 2019 Jonathan Rouleau
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""

import getopt
import os
import signal
import sys

import dbus

from dbus.mainloop.glib import DBusGMainLoop
from functools import partial
from gi.repository import GLib
from xml.etree import ElementTree


SCRIPT_NAME = os.path.basename(sys.argv[0])


def dbus_get_child_object_paths(object_path):
object_paths = []
obj = bus.get_object('org.bluez', object_path, introspect=False)
xml_string = obj.Introspect(dbus_interface='org.freedesktop.DBus.Introspectable')

if object_path == '/':
object_path = ''

for child in ElementTree.fromstring(xml_string):
if child.tag == 'node':
object_paths.append(object_path + '/' + child.attrib['name'])

return object_paths


def connect_devices_for_adapter(adapter_path):
if verbose:
print(f'scanning for devices on {adapter_path}', flush=True)

# Get list of devices
device_paths = dbus_get_child_object_paths(adapter_path)

# Manage pending connections for non-daemon mode
def add_pending_connection(device_path):
if not daemon:
pending_connections.add(device_path)

def remove_pending_connection(device_path):
if not daemon:
pending_connections.discard(device_path)
# If this was the last pending connection, terminate the
# mainloop
if len(pending_connections) == 0:
loop.quit()

# Handle the async replies for a connection attempt
def reply_handler(device_path):
print(f'successfully connected to device {device_path}', flush=True)
remove_pending_connection(device_path)

def error_handler(device_path, e):
print(f'error connecting to device {device_path}: {e.get_dbus_message()}', file=sys.stderr, flush=True)
remove_pending_connection(device_path)

for device_path in device_paths:
if verbose:
print(f'found device {device_path}', flush=True)
try:
# Read the device's properties
obj = bus.get_object('org.bluez', device_path)
props = obj.GetAll(
'org.bluez.Device1',
dbus_interface='org.freedesktop.DBus.Properties')

if props.get('Connected', False):
if verbose:
print(f'device {device_path} is already connected', flush=True)
elif not props.get('Trusted', False):
if verbose:
print(f'device {device_path} is not trusted', flush=True)
else:
print(f'connecting to device {device_path}', flush=True)
add_pending_connection(device_path)

# Attempt to connect to the device
obj.Connect(
dbus_interface='org.bluez.Device1',
reply_handler=partial(reply_handler, device_path),
error_handler=partial(error_handler, device_path))
except dbus.exceptions.DBusException as e:
error_handler(device_path, e)


def connect_devices_for_all_adapters():
if verbose:
print('scanning for adapters...', flush=True)

# Get a list of adapters
adapter_paths = dbus_get_child_object_paths('/org/bluez')

for adapter_path in adapter_paths:
if verbose:
print(f'found adapter {adapter_path}', flush=True)
try:
# Read the adapter's properties
obj = bus.get_object('org.bluez', adapter_path)
props = obj.GetAll(
'org.bluez.Adapter1',
dbus_interface='org.freedesktop.DBus.Properties')

# We're only interested in adapters that are powered on
if props.get('Powered', False):
if verbose:
print(f'adapter {adapter_path} is powered on', flush=True)

# Try to connect to devices on this adapter
connect_devices_for_adapter(adapter_path)
except dbus.exceptions.DBusException as e:
print(f'error reading properties for adapter {adapter_path}: {e.get_dbus_message()}', file=sys.stderr, flush=True)


def properties_changed_handler(interface_name, changed_properties, invalidated_properties, path):
# We're only interested in adapters that have been powered on
if interface_name == 'org.bluez.Adapter1' and changed_properties.get('Powered', False):
if verbose:
print(f'adapter {path} has powered on', flush=True)

# Try to connect to devices on this adapter
connect_devices_for_adapter(path)


def usage():
print('\n'.join([
f'Usage: {SCRIPT_NAME} [OPTIONS]...',
f'',
f'Automatically connect to trusted bluetooth devices',
f'',
f'OPTIONS:',
f' -d, --daemon Monitor bluetooth adapters and automatically connect to',
f' trusted devices when an adapter is powered on',
f' -h, --help Print this help message',
f' -v, --verbose Show more detailed log messages',
f'',
]), flush=True)
sys.exit(0)


def main():
global daemon
global verbose

# Parse command line arguments
try:
opts, cmds = getopt.getopt(sys.argv[1:], 'dhv', ['daemon', 'help', 'verbose'])
except getopt.GetoptError as e:
print(f'{SCRIPT_NAME}:', e, file=sys.stderr, flush=True)
print(f"Try '{SCRIPT_NAME} --help' for more information", file=sys.stderr, flush=True)
sys.exit(1)

# Process options (e.g. -h, --verbose)
for o, v in opts:
if o in ('-d', '--daemon'):
daemon = True
elif o in ('-h', '--help'):
usage()
elif o in ('-v', '--verbose'):
verbose = True
else:
# This shouldn't ever happen unless we forget to handle an
# option we've added
print(f'{SCRIPT_NAME}: internal error: unhandled option {o}', file=sys.stderr, flush=True)
sys.exit(1)

# Process commands
# This script does not use any commands so we will exit if one is
# incorrectly provided
if len(cmds) > 0:
print(f"{SCRIPT_NAME}: command '{c}' not recognized", file=sys.stderr, flush=True)
print(f"Try '{SCRIPT_NAME} --help' for more information", file=sys.stderr, flush=True)
sys.exit(1)

# Set process name and title
# This allows commands like `killall SCRIPT_NAME` to function
# correctly
try:
import prctl
if verbose:
print(f'setting process name to \'{SCRIPT_NAME}\'', flush=True)
prctl.set_name(SCRIPT_NAME)
prctl.set_proctitle(' '.join(sys.argv))
except ImportError:
if verbose:
print(f'failed to load module \'prctl\'', flush=True)
print(f'process name not set', flush=True)

if daemon:
# Listen for changes on the BlueZ dbus interface
# This is a catch all listener (no path specified) because we
# want to get notified for all adapters without keeping a list
# of them and managing signal handlers independantly
bus.add_signal_receiver(
properties_changed_handler,
signal_name='PropertiesChanged',
dbus_interface='org.freedesktop.DBus.Properties',
bus_name='org.bluez',
path=None,
path_keyword='path')

# Attempt to connect to devices on all existing adapters
connect_devices_for_all_adapters()

# Start the mainloop
loop.run()
else:
# Attempt to connect to devices on all existing adapters
connect_devices_for_all_adapters()

# If we're waiting for connection attemps to finish, start the
# mainloop. We will automatically exit the loop once everything
# is finished
if len(pending_connections) > 0:
loop.run()


def signal_handler(sig, frame):
if sig == signal.SIGHUP:
# Rescan adapters and attempt to connect to devices if we're in
# daemon mode
if daemon:
connect_devices_for_all_adapters()
elif sig in (signal.SIGINT, signal.SIGTERM):
# Gracefully exit
sys.exit(0)
else:
# This shouldn't ever happen unless we forget to handle a signal
# we've added
print(f'internal error: unhandled signal {sig}', file=sys.stderr, flush=True)
sys.exit(2)


if __name__ == '__main__':
# Register signal handlers
signal.signal(signal.SIGHUP, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGUSR1, signal.SIG_IGN)
signal.signal(signal.SIGUSR2, signal.SIG_IGN)
signal.signal(signal.SIGALRM, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal_handler)

# Connect to the system dbus
DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()

# Initialize globals
daemon = False
verbose = False
pending_connections = set()

# Initialize the mainloop, but don't start it yet
loop = GLib.MainLoop()

main()


# vim: ft=python ts=8 et sw=4 sts=4

0 comments on commit 92957c3

Please sign in to comment.