diff --git a/AUTHORS.rst b/AUTHORS.rst index dc9cff4c..b13d8c79 100644 --- a/AUTHORS.rst +++ b/AUTHORS.rst @@ -18,3 +18,4 @@ Contributors * Chad Spensky * Bernie Conrad * Jonathan Soto +* Bojan Potočnik diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 50d8fc9e..d944972e 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -16,6 +16,7 @@ Added * WinRT backend added * Added ``BleakScanner.discovered_devices`` property. +* Added BlueZ Agent for pin and passkey pairing on Linux. Changed ~~~~~~~ diff --git a/bleak/backends/bluezdbus/agent.py b/bleak/backends/bluezdbus/agent.py new file mode 100644 index 00000000..08bf7884 --- /dev/null +++ b/bleak/backends/bluezdbus/agent.py @@ -0,0 +1,382 @@ +# -*- coding: utf-8 -*- +""" +BLE Pairing Agent for BlueZ on Linux +""" +import asyncio +import enum +import logging +import time +from typing import Optional, Dict, Union + +from dbus_next import DBusError +from dbus_next.aio import MessageBus +from dbus_next.constants import BusType +from dbus_next.message import Message +from dbus_next.service import ServiceInterface, method + +from bleak.backends.bluezdbus import defs +from bleak.backends.bluezdbus.utils import assert_reply +from bleak.backends.client import PairingCallback + +logger = logging.getLogger(__name__) + +# https://python-dbus-next.readthedocs.io/en/latest/type-system/index.html +DBusObject = "o" +DBusString = "s" +DBusUInt16 = "q" +DBusUInt32 = "u" + + +class IOCapability(enum.Enum): + """I/O capabilities of this device, used for determining pairing method""" + + DISPLAY_ONLY = "DisplayOnly" + DISPLAY_YES_NO = "DisplayYesNo" + KEYBOARD_ONLY = "KeyboardOnly" + NO_IO = "NoInputNoOutput" + KEYBOARD_DISPLAY = "KeyboardDisplay" + + +class PairingAgentBlueZDBus(ServiceInterface): + """Agent for BlueZ pairing requests + + Implemented by using the `BlueZ DBUS Agent API `_. + + Args: + io_capabilities (`Capability`): I/O capabilities of this device, used for determining pairing method. + """ + + def __init__(self, io_capabilities: IOCapability = IOCapability.KEYBOARD_DISPLAY): + super().__init__(defs.AGENT_INTERFACE) + + # D-Bus message bus + self._bus: Optional[MessageBus] = None + # Path can be anything as long as it is unique + self._path = f"/org/bluez/agent{time.time() * 1000:.0f}" + # IO capabilities are required when the agent is registered + self._io_capabilities = io_capabilities + + # Callback for every device (single agent handles all pairing requests) + self._callbacks: Dict[DBusObject, PairingCallback] = {} + + def __del__(self) -> None: + if self._bus: + asyncio.ensure_future(self.unregister()) + + async def __aenter__(self) -> "PairingAgentBlueZDBus": + return await self.register() + + async def __aexit__(self, *exc_info) -> None: + await self.unregister() + + async def register(self) -> "PairingAgentBlueZDBus": + """ + Register the Agent for handling pairing requests + + Every application can register its own (and only one) agent and for all + actions triggered by that application its agent is used. If an application + chooses to not register an agent, the default system agent is used. + """ + if self._bus: + # An application can only register one agent. Multiple agents per application is not + # supported and would raise error if not handled here. + return self + # Create system bus + bus = await MessageBus(bus_type=BusType.SYSTEM).connect() + # Make this Agent interface available on the given object path + bus.export(self._path, self) + # Register the Agent + reply = await bus.call( + Message( + destination=defs.BLUEZ_SERVICE, + path="/org/bluez", + interface=defs.AGENT_MANAGER_INTERFACE, + member="RegisterAgent", + signature="os", + body=[self._path, self._io_capabilities.value], + ) + ) + assert_reply(reply) + # There is no need to register this agent as the default pairing agent using RequestDefaultAgent, + # because it will be used for all pairing requests for this application after RegisterAgent. + logger.debug(f"Pairing Agent registered on {self._path}") + self._bus = bus + return self + + async def unregister(self) -> None: + """ + Unregister the agent that has been previously registered + """ + # This method can be called multiple times, it is OK if agent is not registered anymore. + if not self._bus: + return + + reply = await self._bus.call( + Message( + destination=defs.BLUEZ_SERVICE, + path="/org/bluez", + interface=defs.AGENT_MANAGER_INTERFACE, + member="UnregisterAgent", + signature="o", + body=[self._path], + ) + ) + assert_reply(reply) + + self._bus.unexport(self._path) + self._bus.disconnect() + self._bus = None + logger.debug(f"Pairing Agent {self._path} unregistered") + + def set_callback( + self, device: DBusObject, callback: Union[PairingCallback, None] + ) -> None: + """ + Add pairing callback for specific device. + + Single pairing agent handles all pairing requests for this application, + so every device shall have it's own callback. + + Args: + device (`DBusObject`): D-Bus path to the device which will be paired and therefore requires callback. + callback (`PairingCallback` or `None`): Pairing callback invoked when this device is paired. + Any old callback registered for this device is replaced with this one. + If `None` then callback for this device is removed. + """ + if callback: + self._callbacks[device] = callback + elif device in self._callbacks: + del self._callbacks[device] + + @method(name="Release") + def _release(self): + """ + This method gets called when the service daemon + unregisters the agent. An agent can use it to do + cleanup tasks. There is no need to unregister the + agent, because when this method gets called it has + already been unregistered. + """ + logger.debug(f"{self._path}::Release()") + + @method(name="RequestPinCode") + def _request_pin_code(self, device: DBusObject) -> DBusString: + """ + This method gets called when the service daemon + needs to get the passkey for an authentication. + + The return value should be a string of 1-16 characters + length. The string can be alphanumeric. + + Possible errors: org.bluez.Error.Rejected + org.bluez.Error.Canceled + """ + cb = self._callbacks.get(device) + if not cb: + logger.debug(f"{self._path}::RequestPinCode({device})->Cancel") + raise DBusError( + f"{defs.BLUEZ_SERVICE}.Error.Canceled", + "Pin pairing for this device not supported", + ) + pin = cb(device, None, None) + + logger.debug(f"{self._path}::RequestPinCode({device})->{pin}") + + if pin is None: + raise DBusError( + f"{defs.BLUEZ_SERVICE}.Error.Rejected", "Pin pairing rejected" + ) + + return str(pin) + + @method(name="DisplayPinCode") + def _display_pin_code(self, device: DBusObject, pincode: DBusString): + """ + This method gets called when the service daemon + needs to display a pincode for an authentication. + + An empty reply should be returned. When the pincode + needs no longer to be displayed, the Cancel method + of the agent will be called. + + This is used during the pairing process of keyboards + that don't support Bluetooth 2.1 Secure Simple Pairing, + in contrast to DisplayPasskey which is used for those + that do. + + This method will only ever be called once since + older keyboards do not support typing notification. + + Note that the PIN will always be a 6-digit number, + zero-padded to 6 digits. This is for harmony with + the later specification. + + Possible errors: org.bluez.Error.Rejected + org.bluez.Error.Canceled + """ + cb = self._callbacks.get(device) + if cb: + accept = cb(device, pincode, None) + info = "Accept" if accept else "Reject" + else: + accept = True + info = "" + + logger.debug(f"{self._path}::DisplayPinCode({device}, {pincode})->{info}") + + if not accept: + raise DBusError(f"{defs.BLUEZ_SERVICE}.Error.Rejected", "Pin rejected") + + @method(name="RequestPasskey") + def _request_passkey(self, device: DBusObject) -> DBusUInt32: + """ + This method gets called when the service daemon + needs to get the passkey for an authentication. + + The return value should be a numeric value + between 0-999999. + + Possible errors: org.bluez.Error.Rejected + org.bluez.Error.Canceled + """ + cb = self._callbacks.get(device) + if not cb: + logger.debug(f"{self._path}::RequestPasskey({device})->Cancel") + raise DBusError( + f"{defs.BLUEZ_SERVICE}.Error.Canceled", + "Passkey pairing for this device not supported", + ) + passkey = cb(device, None, None) + + logger.debug(f"{self._path}::RequestPasskey({device})->{passkey}") + + if passkey is None: + raise DBusError( + f"{defs.BLUEZ_SERVICE}.Error.Rejected", "Passkey pairing rejected" + ) + + return int(passkey) + + @method(name="DisplayPasskey") + def _display_passkey( + self, device: DBusObject, passkey: DBusUInt32, entered: DBusUInt16 + ): + """ + This method gets called when the service daemon + needs to display a passkey for an authentication. + + The entered parameter indicates the number of already + typed keys on the remote side. + + An empty reply should be returned. When the passkey + needs no longer to be displayed, the Cancel method + of the agent will be called. + + During the pairing process this method might be + called multiple times to update the entered value. + + Note that the passkey will always be a 6-digit number, + so the display should be zero-padded at the start if + the value contains less than 6 digits. + """ + cb = self._callbacks.get(device) + if cb: + accept = cb(device, None, passkey) + info = "Accept" if accept else "Reject" + else: + accept = True + info = "" + + logger.debug( + f"{self._path}::DisplayPasskey({device}, {passkey:06d}, {entered})->{info}" + ) + + if not accept: + raise DBusError(f"{defs.BLUEZ_SERVICE}.Error.Rejected", "Passkey rejected") + + @method(name="RequestConfirmation") + def _request_confirmation(self, device: DBusObject, passkey: DBusUInt32): + """ + This method gets called when the service daemon + needs to confirm a passkey for an authentication. + + To confirm the value it should return an empty reply + or an error in case the passkey is invalid. + + Note that the passkey will always be a 6-digit number, + so the display should be zero-padded at the start if + the value contains less than 6 digits. + + Possible errors: org.bluez.Error.Rejected + org.bluez.Error.Canceled + """ + cb = self._callbacks.get(device) + if cb: + confirm = cb(device, None, passkey) + info = confirm + else: + confirm = True + info = "" + + logger.debug( + f"{self._path}::RequestConfirmation({device}, {passkey:06d})->{info}" + ) + + if not confirm: + raise DBusError(f"{defs.BLUEZ_SERVICE}.Error.Rejected", "Passkey rejected") + + @method(name="RequestAuthorization") + def _request_authorization(self, device: DBusObject): + """ + This method gets called to request the user to + authorize an incoming pairing attempt which + would in other circumstances trigger the just-works + model, or when the user plugged in a device that + implements cable pairing. In the latter case, the + device would not be connected to the adapter via + Bluetooth yet. + + Possible errors: org.bluez.Error.Rejected + org.bluez.Error.Canceled + """ + authorize = True + + logger.debug(f"{self._path}::RequestAuthorization({device})->{authorize}") + + if not authorize: + raise DBusError( + f"{defs.BLUEZ_SERVICE}.Error.Rejected", "Device unauthorized" + ) + + @method(name="AuthorizeService") + def _authorize_service(self, device: DBusObject, uuid: DBusString): + """ + This method gets called when the service daemon + needs to authorize a connection/service request. + + Possible errors: org.bluez.Error.Rejected + org.bluez.Error.Canceled + """ + authorize = True + + logger.debug(f"{self._path}::AuthorizeService({device}, {uuid})->{authorize}") + + if not authorize: + raise DBusError( + f"{defs.BLUEZ_SERVICE}.Error.Rejected", "Connection rejected" + ) + + @method(name="Cancel") + def _cancel(self): + """ + This method gets called to indicate that the agent + request failed before a reply was returned. + """ + logger.debug(f"{self._path}::Cancel()") + + +__all__ = ("logger", "IOCapability", "PairingAgentBlueZDBus") + +# If this file is run as __main__ or imported without ever starting the event loop, the following warning will occur: +# Start the event loop if not yet running to prevent +# sys:1: RuntimeWarning: coroutine 'PairingAgentBlueZDBus.unregister' was never awaited diff --git a/bleak/backends/bluezdbus/client.py b/bleak/backends/bluezdbus/client.py index d2c55e4b..4f49d0c1 100644 --- a/bleak/backends/bluezdbus/client.py +++ b/bleak/backends/bluezdbus/client.py @@ -18,6 +18,7 @@ from dbus_next.signature import Variant from bleak.backends.bluezdbus import defs +from bleak.backends.bluezdbus.agent import PairingAgentBlueZDBus from bleak.backends.bluezdbus.characteristic import BleakGATTCharacteristicBlueZDBus from bleak.backends.bluezdbus.descriptor import BleakGATTDescriptorBlueZDBus from bleak.backends.bluezdbus.scanner import BleakScannerBlueZDBus @@ -28,7 +29,7 @@ extract_service_handle_from_path, unpack_variants, ) -from bleak.backends.client import BaseBleakClient +from bleak.backends.client import BaseBleakClient, PairingCallback from bleak.backends.device import BLEDevice from bleak.backends.service import BleakGATTServiceCollection from bleak.exc import BleakDBusError, BleakError @@ -51,8 +52,14 @@ class BleakClientBlueZDBus(BaseBleakClient): event loop when the client is disconnected. The callable must take one argument, which will be this client object. adapter (str): Bluetooth adapter to use for discovery. + handle_pairing (bool): If set to `True`, this application is responsible for handling the pairing events + (displaying, asking for, or confirming pins and passkeys) instead of the system/OS pairing wizard. + Defaults to `False`. """ + # Instantiate pairing agent (single agent per app) but don't register it yet + pairingAgent = PairingAgentBlueZDBus() + def __init__(self, address_or_ble_device: Union[BLEDevice, str], **kwargs): super(BleakClientBlueZDBus, self).__init__(address_or_ble_device, **kwargs) # kwarg "device" is for backwards compatibility @@ -98,6 +105,58 @@ def __init__(self, address_or_ble_device: Union[BLEDevice, str], **kwargs): bluez_version[0] == 5 and bluez_version[1] >= 48 ) + # Optionally register pairing agent if this application shall handle pairing instead of OS agent + if kwargs.get("handle_pairing", False): + asyncio.ensure_future(self.pairingAgent.register()) + + @classmethod + async def remove_device( + cls, device: Union[BLEDevice, "BleakClientBlueZDBus", str], adapter: str = None + ) -> bool: + """Remove the remote device object and its pairing information + + Args: + device (`BLEDevice`, `BleakClientBlueZDBus` or str): Device MAC address or class from which this + information can be extracted as `.address`. + adapter (str): Adapter to use instead of default hci0. + + Returns: + Boolean representing if device was present and removed. + """ + # Device path and adapter are required for invoking the API call + + # Try to get .address attribute if object is provided, otherwise assume string and use it as it + address: str = getattr(device, "address", device) + if len(address) != 17 or address.count(":") != 5: + raise ValueError( + f"Device address shall be MAC address or Device/Client class, not '{address}'" + ) + # Use provided value, try to extract adapter property from device or use default hci0 + adapter = f"/org/bluez/{adapter or getattr(device, '_adapter', 'hci0')}" + + # Create system bus + bus = await MessageBus(bus_type=BusType.SYSTEM).connect() + # Remove device + reply = await bus.call( + Message( + destination=defs.BLUEZ_SERVICE, + path=adapter, + interface=defs.ADAPTER_INTERFACE, + member="RemoveDevice", + signature="o", + body=[f"{adapter}/dev_{address.replace(':', '_').upper()}"], + ) + ) + bus.disconnect() + # This method can be called multiple times or even "just to be sure", so it's normal that device doesn't exist + try: + assert_reply(reply) + return True + except BleakDBusError as e: + if e.dbus_error == f"{defs.BLUEZ_SERVICE}.Error.DoesNotExist": + return False + raise + # Connectivity methods async def connect(self, **kwargs) -> bool: @@ -477,12 +536,21 @@ async def disconnect(self) -> bool: return True - async def pair(self, *args, **kwargs) -> bool: + async def pair( + self, *args, callback: Optional[PairingCallback] = None, **kwargs + ) -> bool: """Pair with the peripheral. You can use ConnectDevice method if you already know the MAC address of the device. Else you need to StartDiscovery, Trust, Pair and Connect in sequence. + Args: + callback (`PairingCallback`): callback to be called to provide or confirm pairing pin + or passkey. If not provided and Bleak is registered as a pairing agent/manager + instead of system pairing manager, then all display- and confirm-based pairing + requests will be accepted, and requests requiring pin or passkey input will be + canceled. + Returns: Boolean regarding success of pairing. @@ -505,6 +573,15 @@ async def pair(self, *args, **kwargs) -> bool: ) return True + if callback: + self.pairingAgent.set_callback( + self._device_path, + # "/org/bluez/hci0/dev_A1_B2_C3_D4_E5_F6" -> "A1:B2:C3:D4:E5:F6" + lambda dp, pin, psk: callback( + ":".join(dp.rsplit("_", maxsplit=6)[-6:]), pin, psk + ), + ) + # Set device as trusted. reply = await self._bus.call( Message( @@ -530,7 +607,13 @@ async def pair(self, *args, **kwargs) -> bool: member="Pair", ) ) - assert_reply(reply) + try: + assert_reply(reply) + except BleakDBusError as e: + logger.error( + f"Pairing {self._device_path} with {self._adapter} failed ({e.dbus_error})" + ) + # False could be returned here, but check again just to be sure reply = await self._bus.call( Message( @@ -544,6 +627,10 @@ async def pair(self, *args, **kwargs) -> bool: ) assert_reply(reply) + if callback: + # Remove callback set above + self.pairingAgent.set_callback(self._device_path, None) + return reply.body[0].value async def unpair(self) -> bool: diff --git a/bleak/backends/bluezdbus/defs.py b/bleak/backends/bluezdbus/defs.py index 63fb704e..c176e0f8 100644 --- a/bleak/backends/bluezdbus/defs.py +++ b/bleak/backends/bluezdbus/defs.py @@ -7,6 +7,8 @@ # Bluez specific DBUS BLUEZ_SERVICE = "org.bluez" ADAPTER_INTERFACE = "org.bluez.Adapter1" +AGENT_MANAGER_INTERFACE = "org.bluez.AgentManager1" +AGENT_INTERFACE = "org.bluez.Agent1" DEVICE_INTERFACE = "org.bluez.Device1" BATTERY_INTERFACE = "org.bluez.Battery1" diff --git a/bleak/backends/client.py b/bleak/backends/client.py index 539eb486..bbe3b6c7 100644 --- a/bleak/backends/client.py +++ b/bleak/backends/client.py @@ -8,7 +8,7 @@ import abc import asyncio import uuid -from typing import Callable, Union +from typing import Callable, Union, Optional from warnings import warn from bleak.backends.service import BleakGATTServiceCollection @@ -16,6 +16,42 @@ from bleak.backends.device import BLEDevice +PairingCallback = Callable[ + [str, Union[None, str], Union[None, int]], Union[bool, int, str, None] +] +"""Type of the pairing callback function + +Args: + device (str): Address of the peer device participating in the pairing process. + pin (None or str): If this parameter is not `None`, then this callback is invoked + because the service daemon needs to display a pincode for an authentication. + The application must display the given PIN to the user, who will then need to + either enter this PIN on the peer device that is being paired (if such device + has input but no output capabilities), or confirm that the PIN matches the one + shown on the peer device. + Return `True` to proceed with the pairing or `False` to reject/cancel it. Note, + however, that canceling the pairing at this point might still result in device + being paired, because for some pairing methods, the system and the target + device don't need any confirmation. + passkey (None or int): Very similar to `pin` argument, just that this value is used + on some systems when Passkey Entry pairing method is initiated. On some systems + this will always be `None` and passkey is provided as `pin` argument. + If not `None`, the passkey will always represent a 6-digit number, so the + display should be zero-padded at the start if the value contains less than 6 + digits. + Return value logic is the same as for `pin` - return `True` to proceed with the + pairing or `False` to reject/cancel it. + +Returns: + `True` to confirm pairing with provided `pin` or `passkey`, if any of them is not `None`. + If `pin` and `passkey` are both `None`, it means that this callback got invoked because + the service daemon needs to get the pincode or passkey for an authentication. The return + value should be an alphanumeric string of 1-16 characters length (pincode) or a numeric + value between 0-999999 (passkey). + Pairing is rejected if `None` is returned. +""" + + class BaseBleakClient(abc.ABC): """The Client Interface for Bleak Backend implementations to implement. @@ -111,8 +147,21 @@ async def disconnect(self) -> bool: raise NotImplementedError() @abc.abstractmethod - async def pair(self, *args, **kwargs) -> bool: - """Pair with the peripheral.""" + async def pair( + self, *args, callback: Optional[PairingCallback] = None, **kwargs + ) -> bool: + """Pair with the peripheral. + + Args: + callback (`PairingCallback`): callback to be called to provide or confirm pairing pin + or passkey. If not provided and Bleak is registered as a pairing agent/manager + instead of system pairing manager, then all display- and confirm-based pairing + requests will be accepted, and requests requiring pin or passkey input will be + canceled. + + Returns: + Boolean representing success of pairing. + """ raise NotImplementedError() @abc.abstractmethod diff --git a/bleak/backends/dotnet/client.py b/bleak/backends/dotnet/client.py index 4767fe6e..14579617 100644 --- a/bleak/backends/dotnet/client.py +++ b/bleak/backends/dotnet/client.py @@ -9,12 +9,12 @@ import asyncio import uuid from functools import wraps -from typing import Callable, Any, List, Union +from typing import Callable, Any, List, Union, Optional from bleak.backends.device import BLEDevice from bleak.backends.dotnet.scanner import BleakScannerDotNet from bleak.exc import BleakError, BleakDotNetTaskError, CONTROLLER_ERROR_CODES -from bleak.backends.client import BaseBleakClient +from bleak.backends.client import BaseBleakClient, PairingCallback from bleak.backends.dotnet.utils import ( BleakDataReader, BleakDataWriter, @@ -305,7 +305,13 @@ def is_connected(self) -> bool: else self._requester.ConnectionStatus == BluetoothConnectionStatus.Connected ) - async def pair(self, protection_level=None, **kwargs) -> bool: + async def pair( + self, + protection_level=None, + *args, + callback: Optional[PairingCallback] = None, + **kwargs, + ) -> bool: """Attempts to pair with the device. Keyword Args: @@ -315,6 +321,11 @@ async def pair(self, protection_level=None, **kwargs) -> bool: 2: Encryption - Pair the device using encryption. 3: EncryptionAndAuthentication - Pair the device using encryption and authentication. (This will not work in Bleak...) + callback (`PairingCallback`): callback to be called to provide or confirm pairing pin + or passkey. If not provided and Bleak is registered as a pairing agent/manager + instead of system pairing manager, then all display- and confirm-based pairing + requests will be accepted, and requests requiring pin or passkey input will be + canceled. Returns: Boolean regarding success of pairing. @@ -325,12 +336,35 @@ async def pair(self, protection_level=None, **kwargs) -> bool: and not self._requester.DeviceInformation.Pairing.IsPaired ): - # Currently only supporting Just Works solutions... - ceremony = DevicePairingKinds.ConfirmOnly + if callback: + # TODO: All BLE pairing methods are supported + ceremony = ( + DevicePairingKinds.ConfirmOnly + # + DevicePairingKinds.ConfirmPinMatch + # + DevicePairingKinds.DisplayPin + # + DevicePairingKinds.ProvidePin + ) + else: + ceremony = DevicePairingKinds.ConfirmOnly custom_pairing = self._requester.DeviceInformation.Pairing.Custom def handler(sender, args): - args.Accept() + if callback: + if args.PairingKind == DevicePairingKinds.ConfirmOnly: + args.Accept() + # TODO: Get device MAC for first argument, test conversion, flags can have multiple values set (mask) + # elif ( + # args.PairingKind == DevicePairingKinds.ConfirmPinMatch + # or args.PairingKind == DevicePairingKinds.DisplayPin + # ): + # if callback("", args.Pin, None) is True: + # args.Accept() + # elif args.PairingKind == DevicePairingKinds.ProvidePin: + # pin = callback("", None, None) + # if pin: + # args.Accept(pin) + else: + args.Accept() pairing_requested_token = custom_pairing.add_PairingRequested( TypedEventHandler[ diff --git a/examples/passkey_pairing.py b/examples/passkey_pairing.py new file mode 100644 index 00000000..741a8656 --- /dev/null +++ b/examples/passkey_pairing.py @@ -0,0 +1,68 @@ +""" +Services +---------------- + +An example showing how to pair using Passkey Entry pairing method using pre-shared passkey + +Created on 2021-04-20 by Bojan Potočnik + +""" + +import asyncio +import os +import platform +from typing import Union + +# os.environ["BLEAK_LOGGING"] = "1" + +from bleak import BleakClient + + +def get_passkey( + device: str, pin: Union[None, str], passkey: Union[None, int] +) -> Union[bool, int, str, None]: + if pin: + print(f"Device {device} is displaying pin '{pin}'") + return True + + if passkey: + print(f"Device {device} is displaying passkey '{passkey:06d}'") + return True + + # Retrieve passkey using custom algorithm, web API or just ask the user like OS pairing wizard would do + psk = input( + f"Provide pin (1-16 characters) or passkey (0-999999) for {device}, or nothing to reject pairing: " + ) + + # Return None if psk is empty string (pincode 0 is valid pin, but "0" is True) + return psk or None + + +async def main(mac_addr: str): + # Remove this device if it is already paired (from previous runs) + if await BleakClient.remove_device(mac_addr): + print(f"Device {mac_addr} was unpaired") + + # Pairing agent shall be registered before initiating the connection + async with BleakClient(mac_addr, handle_pairing=True) as client: + print("Pairing...") + print(await client.pair(callback=get_passkey)) + print("Paired") + + services = await client.get_services() + print(services) + for service in services: + print(service) + for char in service.characteristics: + print("\t", char) + print("\t\tValue: ", await client.read_gatt_char(char)) + + +if platform.system() != "Linux": + raise EnvironmentError( + "Pairing methods other than Just Works are currently implemented only on BlueZ backend" + ) + +loop = asyncio.get_event_loop() +loop.set_debug(True) +loop.run_until_complete(main("24:71:89:cc:09:05"))