From 26e70f7b3a28ce77bab6a91ff652abb84fc9e457 Mon Sep 17 00:00:00 2001 From: FahirahD Date: Tue, 18 Feb 2025 16:43:20 -0500 Subject: [PATCH 1/3] update the example to have a complete and simple usage flow --- examples/control_media_player.py | 230 ++++++++++++++++++++++++++----- 1 file changed, 195 insertions(+), 35 deletions(-) diff --git a/examples/control_media_player.py b/examples/control_media_player.py index bf94227..045d232 100644 --- a/examples/control_media_player.py +++ b/examples/control_media_player.py @@ -4,40 +4,200 @@ # e.g. the music player on your phone. # This script displays information about the current track. # Before you can run this scrip you have to pair and connect your audio -# source. For simplicity we can do this on the command line with the -# bluetoothctl tool -# pi@RPi3:~ $ bluetoothctl -# [bluetooth]# agent NoInputNoOutput -# Agent registered -# [bluetooth]# discoverable on -# Changing discoverable on succeeded -# [CHG] Controller B8:27:EB:22:57:E0 Discoverable: yes -# -# Now we have made the Raspberry Pi discoverable we can pair to it from the -# mobile phone. Once it has paired you can tell the Raspberry Pi that it is a -# trusted device -# -# [Nexus 5X]# trust 64:BC:0C:F6:22:F8 -# -# Now the phone is connected you can run this script to find which track is -# playing -# -# pi@RPi3:~ $ python3 examples/control_media_player.py +# source. +from bluezero.adapter import list_adapters, Adapter from bluezero import dbus_tools -from bluezero import media_player - -# Find the mac address of the first media player connected over Bluetooth -mac_addr = None -for dbus_path in dbus_tools.get_managed_objects(): - if dbus_path.endswith('player0'): - mac_addr = dbus_tools.get_device_address_from_dbus_path(dbus_path) - -if mac_addr: - mp = media_player.MediaPlayer(mac_addr) - - track_details = mp.track - for detail in track_details: - print(f'{detail} : {track_details[detail]}') -else: - print('Error: No media player connected') +from bluezero.device import Device +from bluezero import constants +from bluezero.media_player import MediaPlayer +import time + + +def filter_by_interface(objects, interface_name): + """ filters the objects based on their support for the specified interface """ + object_paths = [] + for path in objects.keys(): + interfaces = objects[path] + for interface in interfaces.keys(): + if interface == interface_name: + object_paths.append(path) + return object_paths + + +def filter_audio_devices(objects): + """ Filters devices that support the A2DP audio profile (AudioSink) """ + audio_interface = 'org.freedesktop.DBus.Properties' # Check this interface for supported profiles + audio_devices = filter_by_interface(objects, audio_interface) + + audio_sink_devices = [] + for path in audio_devices: + interfaces = objects[path] + if 'org.freedesktop.DBus.Properties' in interfaces: + if 'AudioSink' in interfaces['org.freedesktop.DBus.Properties']: + audio_sink_devices.append(path) + return audio_sink_devices + + +def ctrl_media_player(media_player): + print("Make sure you are using a media player on your phone and " + "have selected a track...") + print("To interact with the Media Player, choose the corresponding number:") + print("1. Play") + print("2. Pause") + print("3. Previous") + print("4. Next") + print("5. Track Info") + print("6. End") + + while True: + try: + command = int(input("Enter a number (1-6): ")) # Get number input + + if command == 1: + media_player.play() + elif command == 2: + media_player.pause() + elif command == 3: + media_player.previous() + elif command == 4: + media_player.next() + elif command == 5: + track_details = media_player.track + time.sleep(2) + for detail in track_details: + print(f'{detail} : {track_details[detail]}') + elif command == 6: + print("Ending media control.") + break + else: + print("Invalid choice. Please select a number between 1 and 6.") + + except ValueError: + print("Invalid input. Please enter a number between 1 and 6.") + + +def discover_devices(adapter_address): + """ Function to discover nearby devices and list them """ + bluetooth_adapter = Adapter(adapter_address) + bluetooth_adapter.nearby_discovery() + print("Scanning for nearby devices...") + + # Find managed devices and their addresses + dev_obj_path_list = filter_by_interface(dbus_tools.get_managed_objects(), constants.DEVICE_INTERFACE) + dev_addr_list = list(map(dbus_tools.get_mac_addr_from_dbus_path, dev_obj_path_list)) + + devices = [] + for addr in dev_addr_list: + # Get the name of the device + device = Device(adapter_address, addr) + try: + name = device.name + devices.append((name, addr)) + except Exception as e: + # If there is an error retrieving the device name, skip it + print(f"Error retrieving name for device {addr}: {e}") + + return devices + + +def connect_to_device(adapter_address, device_address, retries=3, delay=5): + """ Function to connect to the selected device with retries """ + attempt = 0 + while attempt < retries: + try: + remote_device = Device(adapter_address, device_address) + + # Verify if the device is paired, if not, pair it + if remote_device.paired != 1: + print(f"Pairing with {device_address}...") + try: + remote_device.pair() # Attempt to pair without timeout argument + print(f"Successfully paired with {device_address}") + except Exception as e: + print(f"Pairing failed: {e}. Retrying... ({attempt + 1}/{retries})") + print(f"Make sure you confirm pairing on computer and phone") + attempt += 1 + time.sleep(delay) + continue + + # Verify if the device is connected, if not, connect it + if remote_device.connected != 1: + print(f"Connecting to {device_address}...") + remote_device.connect() + + # Wait for the media player to be ready + time.sleep(10) + + # Initialize the media player and control it + mp = MediaPlayer(device_address) + ctrl_media_player(mp) + return # Successful connection + + except ValueError as e: + print(f"Error: {e}. Retrying... ({attempt + 1}/{retries})") + attempt += 1 + time.sleep(delay) + + print(f"Failed to connect to {device_address} after {retries} attempts.") + +def select_adapter(): + """ Prompt user to select a Bluetooth adapter """ + adapters = list_adapters() + if not adapters: + print("No Bluetooth adapters found.") + return None + + print("Available Bluetooth adapters:") + for i, adapter in enumerate(adapters, 1): + print(f"{i}. {adapter}") + + try: + choice = int(input("Select the adapter by number: ")) + if 1 <= choice <= len(adapters): + return adapters[choice - 1] + else: + print("Invalid choice. Please select a valid adapter.") + return None + except ValueError: + print("Invalid input. Please enter a number.") + return None + + +if __name__ == '__main__': + """ This script assumes that your remote device has its Bluetooth function on. + It will list the nearby devices and let the user choose which one to connect to. """ + + adapter_address = select_adapter() # Select a Bluetooth adapter + + if not adapter_address: + print("Exiting due to invalid adapter selection.") + exit(1) + + while True: + # Discover nearby devices + devices = discover_devices(adapter_address) + + if devices: + print("Nearby devices:") + for i, (name, addr) in enumerate(devices, 1): + print(f"{i}. {name} ({addr})") + + # Ask the user to select a device + try: + choice = int(input("Enter the number of the device you want to connect to (or 0 to refresh): ")) + if choice == 0: + continue # Refresh discovery if the user selects 0 + selected_device = devices[choice - 1] # Subtract 1 to match list index + print(f"Attempting to connect to {selected_device[0]} ({selected_device[1]})...") + connect_to_device(adapter_address, selected_device[1]) + break # Exit the loop after successful connection + except (ValueError, IndexError): + print("Invalid choice. Please enter a valid device number.") + else: + print("No nearby devices found.") + + # Ask if the user wants to refresh discovery or exit + refresh = input("Do you want to refresh device discovery? (y/n): ").strip().lower() + if refresh != 'y': + break From b21e9f4ec34ea6a4c33d5a16532c2f19b658fbe4 Mon Sep 17 00:00:00 2001 From: FahirahD Date: Tue, 18 Feb 2025 16:43:20 -0500 Subject: [PATCH 2/3] update the example to have a complete and simple usage flow --- examples/control_media_player.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/examples/control_media_player.py b/examples/control_media_player.py index 045d232..a5ba9e9 100644 --- a/examples/control_media_player.py +++ b/examples/control_media_player.py @@ -1,10 +1,9 @@ -# When using your Linux computer as a Bluetooth speaker the MediaPlayer -# interfaces allows you interact with the media player on the other end of -# the Bluetooth connection. -# e.g. the music player on your phone. -# This script displays information about the current track. -# Before you can run this scrip you have to pair and connect your audio -# source. +# This script allows you to use your Linux computer as a Bluetooth speaker. +# The MediaPlayer interface lets you interact with the media player on the +# other end of the Bluetooth connection (e.g., the music player on your phone). +# It displays information about the current track. +# Before running this script, ensure you pair and connect your audio source. + from bluezero.adapter import list_adapters, Adapter from bluezero import dbus_tools @@ -84,7 +83,8 @@ def discover_devices(adapter_address): print("Scanning for nearby devices...") # Find managed devices and their addresses - dev_obj_path_list = filter_by_interface(dbus_tools.get_managed_objects(), constants.DEVICE_INTERFACE) + dev_obj_path_list = filter_by_interface(dbus_tools.get_managed_objects(), + constants.DEVICE_INTERFACE) dev_addr_list = list(map(dbus_tools.get_mac_addr_from_dbus_path, dev_obj_path_list)) devices = [] @@ -116,7 +116,7 @@ def connect_to_device(adapter_address, device_address, retries=3, delay=5): print(f"Successfully paired with {device_address}") except Exception as e: print(f"Pairing failed: {e}. Retrying... ({attempt + 1}/{retries})") - print(f"Make sure you confirm pairing on computer and phone") + print("Make sure you confirm pairing on computer and phone") attempt += 1 time.sleep(delay) continue @@ -141,6 +141,7 @@ def connect_to_device(adapter_address, device_address, retries=3, delay=5): print(f"Failed to connect to {device_address} after {retries} attempts.") + def select_adapter(): """ Prompt user to select a Bluetooth adapter """ adapters = list_adapters() @@ -185,11 +186,13 @@ def select_adapter(): # Ask the user to select a device try: - choice = int(input("Enter the number of the device you want to connect to (or 0 to refresh): ")) + choice = int(input("Enter the number of the device you want to connect to " + "(or 0 to refresh): ")) if choice == 0: continue # Refresh discovery if the user selects 0 selected_device = devices[choice - 1] # Subtract 1 to match list index - print(f"Attempting to connect to {selected_device[0]} ({selected_device[1]})...") + print(f"Attempting to connect to {selected_device[0]} " + f"({selected_device[1]})...") connect_to_device(adapter_address, selected_device[1]) break # Exit the loop after successful connection except (ValueError, IndexError): @@ -200,4 +203,4 @@ def select_adapter(): # Ask if the user wants to refresh discovery or exit refresh = input("Do you want to refresh device discovery? (y/n): ").strip().lower() if refresh != 'y': - break + break \ No newline at end of file From 2edba66c32f3cd80efba0b35d056b2314265b089 Mon Sep 17 00:00:00 2001 From: FahirahD Date: Tue, 18 Feb 2025 17:58:37 -0500 Subject: [PATCH 3/3] corrected space errors for linting --- examples/control_media_player.py | 48 +++++++++++++++++++------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/examples/control_media_player.py b/examples/control_media_player.py index a5ba9e9..51d9950 100644 --- a/examples/control_media_player.py +++ b/examples/control_media_player.py @@ -3,8 +3,6 @@ # other end of the Bluetooth connection (e.g., the music player on your phone). # It displays information about the current track. # Before running this script, ensure you pair and connect your audio source. - - from bluezero.adapter import list_adapters, Adapter from bluezero import dbus_tools from bluezero.device import Device @@ -14,7 +12,8 @@ def filter_by_interface(objects, interface_name): - """ filters the objects based on their support for the specified interface """ + """ filters the objects based on their support for + the specified interface """ object_paths = [] for path in objects.keys(): interfaces = objects[path] @@ -26,7 +25,7 @@ def filter_by_interface(objects, interface_name): def filter_audio_devices(objects): """ Filters devices that support the A2DP audio profile (AudioSink) """ - audio_interface = 'org.freedesktop.DBus.Properties' # Check this interface for supported profiles + audio_interface = 'org.freedesktop.DBus.Properties' audio_devices = filter_by_interface(objects, audio_interface) audio_sink_devices = [] @@ -41,7 +40,8 @@ def filter_audio_devices(objects): def ctrl_media_player(media_player): print("Make sure you are using a media player on your phone and " "have selected a track...") - print("To interact with the Media Player, choose the corresponding number:") + print("To interact with the Media Player, " + "choose the corresponding number:") print("1. Play") print("2. Pause") print("3. Previous") @@ -51,7 +51,7 @@ def ctrl_media_player(media_player): while True: try: - command = int(input("Enter a number (1-6): ")) # Get number input + command = int(input("Enter a number (1-6): ")) if command == 1: media_player.play() @@ -70,7 +70,8 @@ def ctrl_media_player(media_player): print("Ending media control.") break else: - print("Invalid choice. Please select a number between 1 and 6.") + print("Invalid choice. Please select a number " + "between 1 and 6.") except ValueError: print("Invalid input. Please enter a number between 1 and 6.") @@ -85,7 +86,8 @@ def discover_devices(adapter_address): # Find managed devices and their addresses dev_obj_path_list = filter_by_interface(dbus_tools.get_managed_objects(), constants.DEVICE_INTERFACE) - dev_addr_list = list(map(dbus_tools.get_mac_addr_from_dbus_path, dev_obj_path_list)) + dev_addr_list = list( + map(dbus_tools.get_mac_addr_from_dbus_path, dev_obj_path_list)) devices = [] for addr in dev_addr_list: @@ -112,11 +114,14 @@ def connect_to_device(adapter_address, device_address, retries=3, delay=5): if remote_device.paired != 1: print(f"Pairing with {device_address}...") try: - remote_device.pair() # Attempt to pair without timeout argument + remote_device.pair() print(f"Successfully paired with {device_address}") except Exception as e: - print(f"Pairing failed: {e}. Retrying... ({attempt + 1}/{retries})") - print("Make sure you confirm pairing on computer and phone") + print( + f"Pairing failed: {e}. Retrying... " + f"({attempt + 1}/{retries})") + print("Make sure you confirm pairing " + "on computer and phone") attempt += 1 time.sleep(delay) continue @@ -128,7 +133,6 @@ def connect_to_device(adapter_address, device_address, retries=3, delay=5): # Wait for the media player to be ready time.sleep(10) - # Initialize the media player and control it mp = MediaPlayer(device_address) ctrl_media_player(mp) @@ -166,8 +170,10 @@ def select_adapter(): if __name__ == '__main__': - """ This script assumes that your remote device has its Bluetooth function on. - It will list the nearby devices and let the user choose which one to connect to. """ + """ This script assumes that your remote + device has its Bluetooth function on. + It will list the nearby devices and let the user + choose which one to connect to. """ adapter_address = select_adapter() # Select a Bluetooth adapter @@ -186,11 +192,14 @@ def select_adapter(): # Ask the user to select a device try: - choice = int(input("Enter the number of the device you want to connect to " - "(or 0 to refresh): ")) + choice = int( + input( + "Enter the number of the device you " + "want to connect to (or 0 to refresh): ")) if choice == 0: continue # Refresh discovery if the user selects 0 - selected_device = devices[choice - 1] # Subtract 1 to match list index + # Subtract 1 to match list index + selected_device = devices[choice - 1] print(f"Attempting to connect to {selected_device[0]} " f"({selected_device[1]})...") connect_to_device(adapter_address, selected_device[1]) @@ -201,6 +210,7 @@ def select_adapter(): print("No nearby devices found.") # Ask if the user wants to refresh discovery or exit - refresh = input("Do you want to refresh device discovery? (y/n): ").strip().lower() + refresh = input( + "Do you want to refresh device discovery? (y/n): ").strip().lower() if refresh != 'y': - break \ No newline at end of file + break