diff --git a/AUTHORS.rst b/AUTHORS.rst index c2140593..fc92aa13 100644 --- a/AUTHORS.rst +++ b/AUTHORS.rst @@ -19,3 +19,5 @@ Contributors * Bernie Conrad * Jonathan Soto * Kyle J. Williams +* Bojan Potočnik +* Jeff Peters diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 398216fc..07e04bdd 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,6 +14,8 @@ Added ~~~~~ * Allow 16-bit UUID string arguments to ``get_service()`` and ``get_characteristic()``. * Added ``register_uuids()`` to augment the uuid-to-description mapping. +* Added BlueZ Agent for pin and passkey pairing on Linux. +* Added pairing support for .NET and WinRT. Fixed ~~~~~ @@ -50,6 +52,7 @@ Added * Added ``BleakScanner.find_device_by_filter`` static method. * Added ``scanner_byname.py`` example. * Added optional command line argument to specify device to all applicable examples. +* Added BlueZ Agent for pin and passkey pairing on Linux. Changed ~~~~~~~ diff --git a/bleak/__version__.py b/bleak/__version__.py index b53dbb55..8d8c976f 100644 --- a/bleak/__version__.py +++ b/bleak/__version__.py @@ -1,3 +1,3 @@ # -*- coding: utf-8 -*- -__version__ = "0.13.0a1" +__version__ = "0.13.1a1" diff --git a/bleak/backends/bluezdbus/agent.py b/bleak/backends/bluezdbus/agent.py new file mode 100644 index 00000000..08bf7884 --- /dev/null +++ b/bleak/backends/bluezdbus/agent.py @@ -0,0 +1,382 @@ +# -*- coding: utf-8 -*- +""" +BLE Pairing Agent for BlueZ on Linux +""" +import asyncio +import enum +import logging +import time +from typing import Optional, Dict, Union + +from dbus_next import DBusError +from dbus_next.aio import MessageBus +from dbus_next.constants import BusType +from dbus_next.message import Message +from dbus_next.service import ServiceInterface, method + +from bleak.backends.bluezdbus import defs +from bleak.backends.bluezdbus.utils import assert_reply +from bleak.backends.client import PairingCallback + +logger = logging.getLogger(__name__) + +# https://python-dbus-next.readthedocs.io/en/latest/type-system/index.html +DBusObject = "o" +DBusString = "s" +DBusUInt16 = "q" +DBusUInt32 = "u" + + +class IOCapability(enum.Enum): + """I/O capabilities of this device, used for determining pairing method""" + + DISPLAY_ONLY = "DisplayOnly" + DISPLAY_YES_NO = "DisplayYesNo" + KEYBOARD_ONLY = "KeyboardOnly" + NO_IO = "NoInputNoOutput" + KEYBOARD_DISPLAY = "KeyboardDisplay" + + +class PairingAgentBlueZDBus(ServiceInterface): + """Agent for BlueZ pairing requests + + Implemented by using the `BlueZ DBUS Agent API `_. + + Args: + io_capabilities (`Capability`): I/O capabilities of this device, used for determining pairing method. + """ + + def __init__(self, io_capabilities: IOCapability = IOCapability.KEYBOARD_DISPLAY): + super().__init__(defs.AGENT_INTERFACE) + + # D-Bus message bus + self._bus: Optional[MessageBus] = None + # Path can be anything as long as it is unique + self._path = f"/org/bluez/agent{time.time() * 1000:.0f}" + # IO capabilities are required when the agent is registered + self._io_capabilities = io_capabilities + + # Callback for every device (single agent handles all pairing requests) + self._callbacks: Dict[DBusObject, PairingCallback] = {} + + def __del__(self) -> None: + if self._bus: + asyncio.ensure_future(self.unregister()) + + async def __aenter__(self) -> "PairingAgentBlueZDBus": + return await self.register() + + async def __aexit__(self, *exc_info) -> None: + await self.unregister() + + async def register(self) -> "PairingAgentBlueZDBus": + """ + Register the Agent for handling pairing requests + + Every application can register its own (and only one) agent and for all + actions triggered by that application its agent is used. If an application + chooses to not register an agent, the default system agent is used. + """ + if self._bus: + # An application can only register one agent. Multiple agents per application is not + # supported and would raise error if not handled here. + return self + # Create system bus + bus = await MessageBus(bus_type=BusType.SYSTEM).connect() + # Make this Agent interface available on the given object path + bus.export(self._path, self) + # Register the Agent + reply = await bus.call( + Message( + destination=defs.BLUEZ_SERVICE, + path="/org/bluez", + interface=defs.AGENT_MANAGER_INTERFACE, + member="RegisterAgent", + signature="os", + body=[self._path, self._io_capabilities.value], + ) + ) + assert_reply(reply) + # There is no need to register this agent as the default pairing agent using RequestDefaultAgent, + # because it will be used for all pairing requests for this application after RegisterAgent. + logger.debug(f"Pairing Agent registered on {self._path}") + self._bus = bus + return self + + async def unregister(self) -> None: + """ + Unregister the agent that has been previously registered + """ + # This method can be called multiple times, it is OK if agent is not registered anymore. + if not self._bus: + return + + reply = await self._bus.call( + Message( + destination=defs.BLUEZ_SERVICE, + path="/org/bluez", + interface=defs.AGENT_MANAGER_INTERFACE, + member="UnregisterAgent", + signature="o", + body=[self._path], + ) + ) + assert_reply(reply) + + self._bus.unexport(self._path) + self._bus.disconnect() + self._bus = None + logger.debug(f"Pairing Agent {self._path} unregistered") + + def set_callback( + self, device: DBusObject, callback: Union[PairingCallback, None] + ) -> None: + """ + Add pairing callback for specific device. + + Single pairing agent handles all pairing requests for this application, + so every device shall have it's own callback. + + Args: + device (`DBusObject`): D-Bus path to the device which will be paired and therefore requires callback. + callback (`PairingCallback` or `None`): Pairing callback invoked when this device is paired. + Any old callback registered for this device is replaced with this one. + If `None` then callback for this device is removed. + """ + if callback: + self._callbacks[device] = callback + elif device in self._callbacks: + del self._callbacks[device] + + @method(name="Release") + def _release(self): + """ + This method gets called when the service daemon + unregisters the agent. An agent can use it to do + cleanup tasks. There is no need to unregister the + agent, because when this method gets called it has + already been unregistered. + """ + logger.debug(f"{self._path}::Release()") + + @method(name="RequestPinCode") + def _request_pin_code(self, device: DBusObject) -> DBusString: + """ + This method gets called when the service daemon + needs to get the passkey for an authentication. + + The return value should be a string of 1-16 characters + length. The string can be alphanumeric. + + Possible errors: org.bluez.Error.Rejected + org.bluez.Error.Canceled + """ + cb = self._callbacks.get(device) + if not cb: + logger.debug(f"{self._path}::RequestPinCode({device})->Cancel") + raise DBusError( + f"{defs.BLUEZ_SERVICE}.Error.Canceled", + "Pin pairing for this device not supported", + ) + pin = cb(device, None, None) + + logger.debug(f"{self._path}::RequestPinCode({device})->{pin}") + + if pin is None: + raise DBusError( + f"{defs.BLUEZ_SERVICE}.Error.Rejected", "Pin pairing rejected" + ) + + return str(pin) + + @method(name="DisplayPinCode") + def _display_pin_code(self, device: DBusObject, pincode: DBusString): + """ + This method gets called when the service daemon + needs to display a pincode for an authentication. + + An empty reply should be returned. When the pincode + needs no longer to be displayed, the Cancel method + of the agent will be called. + + This is used during the pairing process of keyboards + that don't support Bluetooth 2.1 Secure Simple Pairing, + in contrast to DisplayPasskey which is used for those + that do. + + This method will only ever be called once since + older keyboards do not support typing notification. + + Note that the PIN will always be a 6-digit number, + zero-padded to 6 digits. This is for harmony with + the later specification. + + Possible errors: org.bluez.Error.Rejected + org.bluez.Error.Canceled + """ + cb = self._callbacks.get(device) + if cb: + accept = cb(device, pincode, None) + info = "Accept" if accept else "Reject" + else: + accept = True + info = "" + + logger.debug(f"{self._path}::DisplayPinCode({device}, {pincode})->{info}") + + if not accept: + raise DBusError(f"{defs.BLUEZ_SERVICE}.Error.Rejected", "Pin rejected") + + @method(name="RequestPasskey") + def _request_passkey(self, device: DBusObject) -> DBusUInt32: + """ + This method gets called when the service daemon + needs to get the passkey for an authentication. + + The return value should be a numeric value + between 0-999999. + + Possible errors: org.bluez.Error.Rejected + org.bluez.Error.Canceled + """ + cb = self._callbacks.get(device) + if not cb: + logger.debug(f"{self._path}::RequestPasskey({device})->Cancel") + raise DBusError( + f"{defs.BLUEZ_SERVICE}.Error.Canceled", + "Passkey pairing for this device not supported", + ) + passkey = cb(device, None, None) + + logger.debug(f"{self._path}::RequestPasskey({device})->{passkey}") + + if passkey is None: + raise DBusError( + f"{defs.BLUEZ_SERVICE}.Error.Rejected", "Passkey pairing rejected" + ) + + return int(passkey) + + @method(name="DisplayPasskey") + def _display_passkey( + self, device: DBusObject, passkey: DBusUInt32, entered: DBusUInt16 + ): + """ + This method gets called when the service daemon + needs to display a passkey for an authentication. + + The entered parameter indicates the number of already + typed keys on the remote side. + + An empty reply should be returned. When the passkey + needs no longer to be displayed, the Cancel method + of the agent will be called. + + During the pairing process this method might be + called multiple times to update the entered value. + + Note that the passkey will always be a 6-digit number, + so the display should be zero-padded at the start if + the value contains less than 6 digits. + """ + cb = self._callbacks.get(device) + if cb: + accept = cb(device, None, passkey) + info = "Accept" if accept else "Reject" + else: + accept = True + info = "" + + logger.debug( + f"{self._path}::DisplayPasskey({device}, {passkey:06d}, {entered})->{info}" + ) + + if not accept: + raise DBusError(f"{defs.BLUEZ_SERVICE}.Error.Rejected", "Passkey rejected") + + @method(name="RequestConfirmation") + def _request_confirmation(self, device: DBusObject, passkey: DBusUInt32): + """ + This method gets called when the service daemon + needs to confirm a passkey for an authentication. + + To confirm the value it should return an empty reply + or an error in case the passkey is invalid. + + Note that the passkey will always be a 6-digit number, + so the display should be zero-padded at the start if + the value contains less than 6 digits. + + Possible errors: org.bluez.Error.Rejected + org.bluez.Error.Canceled + """ + cb = self._callbacks.get(device) + if cb: + confirm = cb(device, None, passkey) + info = confirm + else: + confirm = True + info = "" + + logger.debug( + f"{self._path}::RequestConfirmation({device}, {passkey:06d})->{info}" + ) + + if not confirm: + raise DBusError(f"{defs.BLUEZ_SERVICE}.Error.Rejected", "Passkey rejected") + + @method(name="RequestAuthorization") + def _request_authorization(self, device: DBusObject): + """ + This method gets called to request the user to + authorize an incoming pairing attempt which + would in other circumstances trigger the just-works + model, or when the user plugged in a device that + implements cable pairing. In the latter case, the + device would not be connected to the adapter via + Bluetooth yet. + + Possible errors: org.bluez.Error.Rejected + org.bluez.Error.Canceled + """ + authorize = True + + logger.debug(f"{self._path}::RequestAuthorization({device})->{authorize}") + + if not authorize: + raise DBusError( + f"{defs.BLUEZ_SERVICE}.Error.Rejected", "Device unauthorized" + ) + + @method(name="AuthorizeService") + def _authorize_service(self, device: DBusObject, uuid: DBusString): + """ + This method gets called when the service daemon + needs to authorize a connection/service request. + + Possible errors: org.bluez.Error.Rejected + org.bluez.Error.Canceled + """ + authorize = True + + logger.debug(f"{self._path}::AuthorizeService({device}, {uuid})->{authorize}") + + if not authorize: + raise DBusError( + f"{defs.BLUEZ_SERVICE}.Error.Rejected", "Connection rejected" + ) + + @method(name="Cancel") + def _cancel(self): + """ + This method gets called to indicate that the agent + request failed before a reply was returned. + """ + logger.debug(f"{self._path}::Cancel()") + + +__all__ = ("logger", "IOCapability", "PairingAgentBlueZDBus") + +# If this file is run as __main__ or imported without ever starting the event loop, the following warning will occur: +# Start the event loop if not yet running to prevent +# sys:1: RuntimeWarning: coroutine 'PairingAgentBlueZDBus.unregister' was never awaited diff --git a/bleak/backends/bluezdbus/client.py b/bleak/backends/bluezdbus/client.py index 88fdf3aa..4f99d5df 100644 --- a/bleak/backends/bluezdbus/client.py +++ b/bleak/backends/bluezdbus/client.py @@ -18,6 +18,7 @@ from dbus_next.signature import Variant from bleak.backends.bluezdbus import defs +from bleak.backends.bluezdbus.agent import PairingAgentBlueZDBus from bleak.backends.bluezdbus.characteristic import BleakGATTCharacteristicBlueZDBus from bleak.backends.bluezdbus.descriptor import BleakGATTDescriptorBlueZDBus from bleak.backends.bluezdbus.scanner import BleakScannerBlueZDBus @@ -28,7 +29,7 @@ extract_service_handle_from_path, unpack_variants, ) -from bleak.backends.client import BaseBleakClient +from bleak.backends.client import BaseBleakClient, PairingCallback from bleak.backends.device import BLEDevice from bleak.backends.service import BleakGATTServiceCollection from bleak.exc import BleakDBusError, BleakError @@ -51,8 +52,14 @@ class BleakClientBlueZDBus(BaseBleakClient): event loop when the client is disconnected. The callable must take one argument, which will be this client object. adapter (str): Bluetooth adapter to use for discovery. + handle_pairing (bool): If set to `True`, this application is responsible for handling the pairing events + (displaying, asking for, or confirming pins and passkeys) instead of the system/OS pairing wizard. + Defaults to `False`. """ + # Instantiate pairing agent (single agent per app) but don't register it yet + pairingAgent = PairingAgentBlueZDBus() + def __init__(self, address_or_ble_device: Union[BLEDevice, str], **kwargs): super(BleakClientBlueZDBus, self).__init__(address_or_ble_device, **kwargs) # kwarg "device" is for backwards compatibility @@ -103,6 +110,58 @@ def __init__(self, address_or_ble_device: Union[BLEDevice, str], **kwargs): bluez_version[0] == 5 and bluez_version[1] >= 48 ) + # Optionally register pairing agent if this application shall handle pairing instead of OS agent + if kwargs.get("handle_pairing", False): + asyncio.ensure_future(self.pairingAgent.register()) + + @classmethod + async def remove_device( + cls, device: Union[BLEDevice, "BleakClientBlueZDBus", str], adapter: str = None + ) -> bool: + """Remove the remote device object and its pairing information + + Args: + device (`BLEDevice`, `BleakClientBlueZDBus` or str): Device MAC address or class from which this + information can be extracted as `.address`. + adapter (str): Adapter to use instead of default hci0. + + Returns: + Boolean representing if device was present and removed. + """ + # Device path and adapter are required for invoking the API call + + # Try to get .address attribute if object is provided, otherwise assume string and use it as it + address: str = getattr(device, "address", device) + if len(address) != 17 or address.count(":") != 5: + raise ValueError( + f"Device address shall be MAC address or Device/Client class, not '{address}'" + ) + # Use provided value, try to extract adapter property from device or use default hci0 + adapter = f"/org/bluez/{adapter or getattr(device, '_adapter', 'hci0')}" + + # Create system bus + bus = await MessageBus(bus_type=BusType.SYSTEM).connect() + # Remove device + reply = await bus.call( + Message( + destination=defs.BLUEZ_SERVICE, + path=adapter, + interface=defs.ADAPTER_INTERFACE, + member="RemoveDevice", + signature="o", + body=[f"{adapter}/dev_{address.replace(':', '_').upper()}"], + ) + ) + bus.disconnect() + # This method can be called multiple times or even "just to be sure", so it's normal that device doesn't exist + try: + assert_reply(reply) + return True + except BleakDBusError as e: + if e.dbus_error == f"{defs.BLUEZ_SERVICE}.Error.DoesNotExist": + return False + raise + # Connectivity methods async def connect(self, **kwargs) -> bool: @@ -481,12 +540,21 @@ async def disconnect(self) -> bool: return True - async def pair(self, *args, **kwargs) -> bool: + async def pair( + self, *args, callback: Optional[PairingCallback] = None, **kwargs + ) -> bool: """Pair with the peripheral. You can use ConnectDevice method if you already know the MAC address of the device. Else you need to StartDiscovery, Trust, Pair and Connect in sequence. + Args: + callback (`PairingCallback`): callback to be called to provide or confirm pairing pin + or passkey. If not provided and Bleak is registered as a pairing agent/manager + instead of system pairing manager, then all display- and confirm-based pairing + requests will be accepted, and requests requiring pin or passkey input will be + canceled. + Returns: Boolean regarding success of pairing. @@ -509,6 +577,15 @@ async def pair(self, *args, **kwargs) -> bool: ) return True + if callback: + self.pairingAgent.set_callback( + self._device_path, + # "/org/bluez/hci0/dev_A1_B2_C3_D4_E5_F6" -> "A1:B2:C3:D4:E5:F6" + lambda dp, pin, psk: callback( + ":".join(dp.rsplit("_", maxsplit=6)[-6:]), pin, psk + ), + ) + # Set device as trusted. reply = await self._bus.call( Message( @@ -534,7 +611,13 @@ async def pair(self, *args, **kwargs) -> bool: member="Pair", ) ) - assert_reply(reply) + try: + assert_reply(reply) + except BleakDBusError as e: + logger.error( + f"Pairing {self._device_path} with {self._adapter} failed ({e.dbus_error})" + ) + # False could be returned here, but check again just to be sure reply = await self._bus.call( Message( @@ -548,6 +631,10 @@ async def pair(self, *args, **kwargs) -> bool: ) assert_reply(reply) + if callback: + # Remove callback set above + self.pairingAgent.set_callback(self._device_path, None) + return reply.body[0].value async def unpair(self) -> bool: diff --git a/bleak/backends/bluezdbus/defs.py b/bleak/backends/bluezdbus/defs.py index 63fb704e..c176e0f8 100644 --- a/bleak/backends/bluezdbus/defs.py +++ b/bleak/backends/bluezdbus/defs.py @@ -7,6 +7,8 @@ # Bluez specific DBUS BLUEZ_SERVICE = "org.bluez" ADAPTER_INTERFACE = "org.bluez.Adapter1" +AGENT_MANAGER_INTERFACE = "org.bluez.AgentManager1" +AGENT_INTERFACE = "org.bluez.Agent1" DEVICE_INTERFACE = "org.bluez.Device1" BATTERY_INTERFACE = "org.bluez.Battery1" diff --git a/bleak/backends/client.py b/bleak/backends/client.py index 5a1b0880..245b4a81 100644 --- a/bleak/backends/client.py +++ b/bleak/backends/client.py @@ -8,7 +8,7 @@ import abc import asyncio import uuid -from typing import Callable, Optional, Union +from typing import Callable, Union, Optional from warnings import warn from bleak.backends.service import BleakGATTServiceCollection @@ -16,6 +16,42 @@ from bleak.backends.device import BLEDevice +PairingCallback = Callable[ + [str, Union[None, str], Union[None, int]], Union[bool, int, str, None] +] +"""Type of the pairing callback function + +Args: + device (str): Address of the peer device participating in the pairing process. + pin (None or str): If this parameter is not `None`, then this callback is invoked + because the service daemon needs to display a pincode for an authentication. + The application must display the given PIN to the user, who will then need to + either enter this PIN on the peer device that is being paired (if such device + has input but no output capabilities), or confirm that the PIN matches the one + shown on the peer device. + Return `True` to proceed with the pairing or `False` to reject/cancel it. Note, + however, that canceling the pairing at this point might still result in device + being paired, because for some pairing methods, the system and the target + device don't need any confirmation. + passkey (None or int): Very similar to `pin` argument, just that this value is used + on some systems when Passkey Entry pairing method is initiated. On some systems + this will always be `None` and passkey is provided as `pin` argument. + If not `None`, the passkey will always represent a 6-digit number, so the + display should be zero-padded at the start if the value contains less than 6 + digits. + Return value logic is the same as for `pin` - return `True` to proceed with the + pairing or `False` to reject/cancel it. + +Returns: + `True` to confirm pairing with provided `pin` or `passkey`, if any of them is not `None`. + If `pin` and `passkey` are both `None`, it means that this callback got invoked because + the service daemon needs to get the pincode or passkey for an authentication. The return + value should be an alphanumeric string of 1-16 characters length (pincode) or a numeric + value between 0-999999 (passkey). + Pairing is rejected if `None` is returned. +""" + + class BaseBleakClient(abc.ABC): """The Client Interface for Bleak Backend implementations to implement. @@ -111,8 +147,21 @@ async def disconnect(self) -> bool: raise NotImplementedError() @abc.abstractmethod - async def pair(self, *args, **kwargs) -> bool: - """Pair with the peripheral.""" + async def pair( + self, *args, callback: Optional[PairingCallback] = None, **kwargs + ) -> bool: + """Pair with the peripheral. + + Args: + callback (`PairingCallback`): callback to be called to provide or confirm pairing pin + or passkey. If not provided and Bleak is registered as a pairing agent/manager + instead of system pairing manager, then all display- and confirm-based pairing + requests will be accepted, and requests requiring pin or passkey input will be + canceled. + + Returns: + Boolean representing success of pairing. + """ raise NotImplementedError() @abc.abstractmethod @@ -141,7 +190,8 @@ def __bool__(self): def __call__(self) -> bool: warn( - "is_connected has been changed to a property. Calling it as an async method will be removed in a future version", + "is_connected has been changed to a property. Calling it as an async method will be removed in a " + "future version", FutureWarning, stacklevel=2, ) diff --git a/bleak/backends/dotnet/BleakUWPBridge.dll b/bleak/backends/dotnet/BleakUWPBridge.dll deleted file mode 100644 index f6fe2179..00000000 Binary files a/bleak/backends/dotnet/BleakUWPBridge.dll and /dev/null differ diff --git a/bleak/backends/dotnet/__init__.py b/bleak/backends/dotnet/__init__.py deleted file mode 100644 index 78e49640..00000000 --- a/bleak/backends/dotnet/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -# -*- coding: utf-8 -*- -""" -__init__.py - -Created on 2017-11-19 by hbldh - -""" -import sys -import pathlib -import logging - -import clr - -logger = logging.getLogger(__name__) -_here = pathlib.Path(__file__).parent - -# BleakUWPBridge -sys.path.append(str(pathlib.Path(__file__).parent)) -clr.AddReference("BleakUWPBridge") diff --git a/bleak/backends/dotnet/characteristic.py b/bleak/backends/dotnet/characteristic.py deleted file mode 100644 index 9a4c6a79..00000000 --- a/bleak/backends/dotnet/characteristic.py +++ /dev/null @@ -1,106 +0,0 @@ -# -*- coding: utf-8 -*- -from uuid import UUID -from typing import List, Union - -from bleak.backends.characteristic import BleakGATTCharacteristic -from bleak.backends.descriptor import BleakGATTDescriptor -from bleak.backends.dotnet.descriptor import BleakGATTDescriptorDotNet - -# Import of BleakBridge to enable loading of winrt bindings -from BleakBridge import Bridge # noqa: F401 - -from Windows.Devices.Bluetooth.GenericAttributeProfile import GattCharacteristic - -# Python representation of -# TODO: Formalize this to Enum for all backends. -_GattCharacteristicsPropertiesEnum = { - None: ("None", "The characteristic doesn’t have any properties that apply"), - 1: ("Broadcast".lower(), "The characteristic supports broadcasting"), - 2: ("Read".lower(), "The characteristic is readable"), - 4: ( - "Write-Without-Response".lower(), - "The characteristic supports Write Without Response", - ), - 8: ("Write".lower(), "The characteristic is writable"), - 16: ("Notify".lower(), "The characteristic is notifiable"), - 32: ("Indicate".lower(), "The characteristic is indicatable"), - 64: ( - "Authenticated-Signed-Writes".lower(), - "The characteristic supports signed writes", - ), - 128: ( - "Extended-Properties".lower(), - "The ExtendedProperties Descriptor is present", - ), - 256: ("Reliable-Writes".lower(), "The characteristic supports reliable writes"), - 512: ( - "Writable-Auxiliaries".lower(), - "The characteristic has writable auxiliaries", - ), -} - - -class BleakGATTCharacteristicDotNet(BleakGATTCharacteristic): - """GATT Characteristic implementation for the .NET backend""" - - def __init__(self, obj: GattCharacteristic): - super().__init__(obj) - self.__descriptors = [ - # BleakGATTDescriptorDotNet(d, self.uuid) for d in obj.GetAllDescriptors() - ] - self.__props = [ - _GattCharacteristicsPropertiesEnum[v][0] - for v in [2 ** n for n in range(10)] - if (self.obj.CharacteristicProperties & v) - ] - - @property - def service_uuid(self) -> str: - """The uuid of the Service containing this characteristic""" - return self.obj.Service.Uuid.ToString() - - @property - def service_handle(self) -> int: - """The integer handle of the Service containing this characteristic""" - return int(self.obj.Service.AttributeHandle) - - @property - def handle(self) -> int: - """The handle of this characteristic""" - return int(self.obj.AttributeHandle) - - @property - def uuid(self) -> str: - """The uuid of this characteristic""" - return self.obj.Uuid.ToString() - - @property - def properties(self) -> List: - """Properties of this characteristic""" - return self.__props - - @property - def descriptors(self) -> List[BleakGATTDescriptorDotNet]: - """List of descriptors for this service""" - return self.__descriptors - - def get_descriptor( - self, specifier: Union[int, str, UUID] - ) -> Union[BleakGATTDescriptorDotNet, None]: - """Get a descriptor by handle (int) or UUID (str or uuid.UUID)""" - try: - if isinstance(specifier, int): - return next(filter(lambda x: x.handle == specifier, self.descriptors)) - else: - return next( - filter(lambda x: x.uuid == str(specifier), self.descriptors) - ) - except StopIteration: - return None - - def add_descriptor(self, descriptor: BleakGATTDescriptor): - """Add a :py:class:`~BleakGATTDescriptor` to the characteristic. - - Should not be used by end user, but rather by `bleak` itself. - """ - self.__descriptors.append(descriptor) diff --git a/bleak/backends/dotnet/client.py b/bleak/backends/dotnet/client.py deleted file mode 100644 index 41179d5b..00000000 --- a/bleak/backends/dotnet/client.py +++ /dev/null @@ -1,937 +0,0 @@ -# -*- coding: utf-8 -*- -""" -BLE Client for Windows 10 systems. - -Created on 2017-12-05 by hbldh -""" -import inspect -import logging -import asyncio -import uuid -from functools import wraps -from typing import Callable, Any, List, Union - -from bleak.backends.device import BLEDevice -from bleak.backends.dotnet.scanner import BleakScannerDotNet -from bleak.exc import BleakError, BleakDotNetTaskError, CONTROLLER_ERROR_CODES -from bleak.backends.client import BaseBleakClient -from bleak.backends.dotnet.utils import ( - BleakDataReader, - BleakDataWriter, - wrap_IAsyncOperation, -) - -from bleak.backends.characteristic import BleakGATTCharacteristic -from bleak.backends.service import BleakGATTServiceCollection -from bleak.backends.dotnet.service import BleakGATTServiceDotNet -from bleak.backends.dotnet.characteristic import BleakGATTCharacteristicDotNet -from bleak.backends.dotnet.descriptor import BleakGATTDescriptorDotNet - - -# CLR imports - -# Import of BleakBridge to enable loading of winrt bindings -from BleakBridge import Bridge # noqa: F401 - -# Import of other CLR components needed. -from System import UInt64, Object -from System.Runtime.InteropServices.WindowsRuntime import EventRegistrationToken -from Windows.Foundation import IAsyncOperation, TypedEventHandler -from Windows.Devices.Enumeration import ( - DevicePairingResult, - DevicePairingResultStatus, - DeviceUnpairingResult, - DeviceUnpairingResultStatus, - DevicePairingKinds, - DevicePairingProtectionLevel, - DeviceInformationCustomPairing, - DevicePairingRequestedEventArgs, -) -from Windows.Devices.Bluetooth import ( - BluetoothLEDevice, - BluetoothConnectionStatus, - BluetoothCacheMode, - BluetoothAddressType, -) -from Windows.Devices.Bluetooth.GenericAttributeProfile import ( - GattDeviceServicesResult, - GattCharacteristic, - GattCharacteristicsResult, - GattDescriptorsResult, - GattCommunicationStatus, - GattReadResult, - GattWriteOption, - GattWriteResult, - GattValueChangedEventArgs, - GattCharacteristicProperties, - GattClientCharacteristicConfigurationDescriptorValue, - GattSession, -) - -logger = logging.getLogger(__name__) - -_communication_statues = { - getattr(GattCommunicationStatus, k): k - for k in ["Success", "Unreachable", "ProtocolError", "AccessDenied"] -} - -_pairing_statuses = { - getattr(DevicePairingResultStatus, v): v - for v in dir(DevicePairingResultStatus) - if "_" not in v and isinstance(getattr(DevicePairingResultStatus, v), int) -} - -_unpairing_statuses = { - getattr(DeviceUnpairingResultStatus, v): v - for v in dir(DeviceUnpairingResultStatus) - if "_" not in v and isinstance(getattr(DeviceUnpairingResultStatus, v), int) -} - - -class BleakClientDotNet(BaseBleakClient): - """The native Windows Bleak Client. - - Implemented using `pythonnet `_, a package that provides an integration to the .NET - Common Language Runtime (CLR). Therefore, much of the code below has a distinct C# feel. - - Args: - address_or_ble_device (`BLEDevice` or str): The Bluetooth address of the BLE peripheral to connect to or the `BLEDevice` object representing it. - - Keyword Args: - use_cached (bool): If set to `True`, then the OS level BLE cache is used for - getting services, characteristics and descriptors. Defaults to ``True``. - timeout (float): Timeout for required ``BleakScanner.find_device_by_address`` call. Defaults to 10.0. - - """ - - def __init__(self, address_or_ble_device: Union[BLEDevice, str], **kwargs): - super(BleakClientDotNet, self).__init__(address_or_ble_device, **kwargs) - - # Backend specific. Python.NET objects. - if isinstance(address_or_ble_device, BLEDevice): - self._device_info = address_or_ble_device.details.BluetoothAddress - else: - self._device_info = None - self._requester = None - self._connect_events: List[asyncio.Event] = [] - self._disconnect_events: List[asyncio.Event] = [] - self._connection_status_changed_token: EventRegistrationToken = None - self._session: GattSession = None - - self._address_type = ( - kwargs["address_type"] - if "address_type" in kwargs - and kwargs["address_type"] in ("public", "random") - else None - ) - self._use_cached = kwargs.get("use_cached", True) - - def __str__(self): - return "BleakClientDotNet ({0})".format(self.address) - - # Connectivity methods - - async def connect(self, **kwargs) -> bool: - """Connect to the specified GATT server. - - Keyword Args: - timeout (float): Timeout for required ``BleakScanner.find_device_by_address`` call. Defaults to 10.0. - use_cached (bool): If set to `True`, then the OS level BLE cache is used for - getting services, characteristics and descriptors. Defaults to ``True``. - - Returns: - Boolean representing connection status. - - Raises: - BleakError: When device is not found. - TimeoutError: When connecting to the device takes too long. - """ - # Try to find the desired device. - timeout = kwargs.get("timeout", self._timeout) - use_cached = kwargs.get("use_cached", self._use_cached) - if self._device_info is None: - device = await BleakScannerDotNet.find_device_by_address( - self.address, timeout=timeout - ) - - if device: - self._device_info = device.details.BluetoothAddress - else: - raise BleakError( - "Device with address {0} was not found.".format(self.address) - ) - - logger.debug("Connecting to BLE device @ {0}".format(self.address)) - - args = [UInt64(self._device_info)] - if self._address_type is not None: - args.append( - BluetoothAddressType.Public - if self._address_type == "public" - else BluetoothAddressType.Random - ) - self._requester = await wrap_IAsyncOperation( - IAsyncOperation[BluetoothLEDevice]( - BluetoothLEDevice.FromBluetoothAddressAsync(*args) - ), - return_type=BluetoothLEDevice, - ) - - # Called on disconnect event or on failure to connect. - def handle_disconnect(): - if self._connection_status_changed_token: - self._requester.remove_ConnectionStatusChanged( - self._connection_status_changed_token - ) - self._connection_status_changed_token = None - - if self._requester: - self._requester.Dispose() - self._requester = None - - if self._session: - self._session.Dispose() - self._session = None - - def handle_connection_status_changed( - connection_status: BluetoothConnectionStatus, - ): - if connection_status == BluetoothConnectionStatus.Connected: - for e in self._connect_events: - e.set() - - elif connection_status == BluetoothConnectionStatus.Disconnected: - if self._disconnected_callback: - self._disconnected_callback(self) - - for e in self._disconnect_events: - e.set() - - handle_disconnect() - - loop = asyncio.get_event_loop() - - def _ConnectionStatusChanged_Handler(sender, args): - logger.debug( - "_ConnectionStatusChanged_Handler: %d", sender.ConnectionStatus - ) - loop.call_soon_threadsafe( - handle_connection_status_changed, sender.ConnectionStatus - ) - - self._connection_status_changed_token = ( - self._requester.add_ConnectionStatusChanged( - TypedEventHandler[BluetoothLEDevice, Object]( - _ConnectionStatusChanged_Handler - ) - ) - ) - - # Start a GATT Session to connect - event = asyncio.Event() - self._connect_events.append(event) - try: - self._session = await wrap_IAsyncOperation( - IAsyncOperation[GattSession]( - GattSession.FromDeviceIdAsync(self._requester.BluetoothDeviceId) - ), - return_type=GattSession, - ) - # This keeps the device connected until we dispose the session or - # until we set MaintainConnection = False. - self._session.MaintainConnection = True - await asyncio.wait_for(event.wait(), timeout=timeout) - except BaseException: - handle_disconnect() - raise - finally: - self._connect_events.remove(event) - - await self.get_services(use_cached=use_cached) - - return True - - async def disconnect(self) -> bool: - """Disconnect from the specified GATT server. - - Returns: - Boolean representing if device is disconnected. - - Raises: - asyncio.TimeoutError: If device did not disconnect with 10 seconds. - - """ - logger.debug("Disconnecting from BLE device...") - # Remove notifications. - for characteristic in self.services.characteristics.values(): - token = self._notification_callbacks.pop(characteristic.handle, None) - if token: - characteristic.obj.remove_ValueChanged(token) - - # Dispose all service components that we have requested and created. - for service in self.services: - service.obj.Dispose() - self.services = BleakGATTServiceCollection() - self._services_resolved = False - - # Without this, disposing the BluetoothLEDevice won't disconnect it - if self._session: - self._session.Dispose() - - # Dispose of the BluetoothLEDevice and see that the connection - # status is now Disconnected. - if self._requester: - event = asyncio.Event() - self._disconnect_events.append(event) - try: - self._requester.Dispose() - await asyncio.wait_for(event.wait(), timeout=10) - finally: - self._disconnect_events.remove(event) - - return True - - @property - def is_connected(self) -> bool: - """Check connection status between this client and the server. - - Returns: - Boolean representing connection status. - - """ - return self._DeprecatedIsConnectedReturn( - False - if self._requester is None - else self._requester.ConnectionStatus == BluetoothConnectionStatus.Connected - ) - - @property - def mtu_size(self) -> int: - """Get ATT MTU size for active connection""" - return self._session.MaxPduSize - - async def pair(self, protection_level=None, **kwargs) -> bool: - """Attempts to pair with the device. - - Keyword Args: - protection_level: - Windows.Devices.Enumeration.DevicePairingProtectionLevel - 1: None - Pair the device using no levels of protection. - 2: Encryption - Pair the device using encryption. - 3: EncryptionAndAuthentication - Pair the device using - encryption and authentication. (This will not work in Bleak...) - - Returns: - Boolean regarding success of pairing. - - """ - if ( - self._requester.DeviceInformation.Pairing.CanPair - and not self._requester.DeviceInformation.Pairing.IsPaired - ): - - # Currently only supporting Just Works solutions... - ceremony = DevicePairingKinds.ConfirmOnly - custom_pairing = self._requester.DeviceInformation.Pairing.Custom - - def handler(sender, args): - args.Accept() - - pairing_requested_token = custom_pairing.add_PairingRequested( - TypedEventHandler[ - DeviceInformationCustomPairing, DevicePairingRequestedEventArgs - ](handler) - ) - try: - if protection_level: - pairing_result = await wrap_IAsyncOperation( - IAsyncOperation[DevicePairingResult]( - custom_pairing.PairAsync.Overloads[ - DevicePairingKinds, DevicePairingProtectionLevel - ](ceremony, protection_level) - ), - return_type=DevicePairingResult, - ) - else: - pairing_result = await wrap_IAsyncOperation( - IAsyncOperation[DevicePairingResult]( - custom_pairing.PairAsync.Overloads[DevicePairingKinds]( - ceremony - ) - ), - return_type=DevicePairingResult, - ) - except Exception as e: - raise BleakError("Failure trying to pair with device!") from e - finally: - custom_pairing.remove_PairingRequested(pairing_requested_token) - - if pairing_result.Status not in ( - DevicePairingResultStatus.Paired, - DevicePairingResultStatus.AlreadyPaired, - ): - raise BleakError( - "Could not pair with device: {0}: {1}".format( - pairing_result.Status, - _pairing_statuses.get(pairing_result.Status), - ) - ) - else: - logger.info( - "Paired to device with protection level {0}.".format( - pairing_result.ProtectionLevelUsed - ) - ) - return True - else: - return self._requester.DeviceInformation.Pairing.IsPaired - - async def unpair(self) -> bool: - """Attempts to unpair from the device. - - N.B. unpairing also leads to disconnection in the Windows backend. - - Returns: - Boolean on whether the unparing was successful. - - """ - - if self._requester.DeviceInformation.Pairing.IsPaired: - unpairing_result = await wrap_IAsyncOperation( - IAsyncOperation[DeviceUnpairingResult]( - self._requester.DeviceInformation.Pairing.UnpairAsync() - ), - return_type=DeviceUnpairingResult, - ) - - if unpairing_result.Status not in ( - DevicePairingResultStatus.Paired, - DevicePairingResultStatus.AlreadyPaired, - ): - raise BleakError( - "Could not unpair with device: {0}: {1}".format( - unpairing_result.Status, - _unpairing_statuses.get(unpairing_result.Status), - ) - ) - else: - logger.info("Unpaired with device.") - return True - - return not self._requester.DeviceInformation.Pairing.IsPaired - - # GATT services methods - - async def get_services(self, **kwargs) -> BleakGATTServiceCollection: - """Get all services registered for this GATT server. - - Keyword Args: - - use_cached (bool): If set to `True`, then the OS level BLE cache is used for - getting services, characteristics and descriptors. - - Returns: - A :py:class:`bleak.backends.service.BleakGATTServiceCollection` with this device's services tree. - - """ - use_cached = kwargs.get("use_cached", self._use_cached) - # Return the Service Collection. - if self._services_resolved: - return self.services - else: - logger.debug("Get Services...") - services_result = await wrap_IAsyncOperation( - IAsyncOperation[GattDeviceServicesResult]( - self._requester.GetGattServicesAsync( - BluetoothCacheMode.Cached - if use_cached - else BluetoothCacheMode.Uncached - ) - ), - return_type=GattDeviceServicesResult, - ) - - if services_result.Status != GattCommunicationStatus.Success: - if services_result.Status == GattCommunicationStatus.ProtocolError: - raise BleakDotNetTaskError( - "Could not get GATT services: {0} (Error: 0x{1:02X}: {2})".format( - _communication_statues.get(services_result.Status, ""), - services_result.ProtocolError, - CONTROLLER_ERROR_CODES.get( - services_result.ProtocolError, "Unknown" - ), - ) - ) - else: - raise BleakDotNetTaskError( - "Could not get GATT services: {0}".format( - _communication_statues.get(services_result.Status, "") - ) - ) - - for service in services_result.Services: - characteristics_result = await wrap_IAsyncOperation( - IAsyncOperation[GattCharacteristicsResult]( - service.GetCharacteristicsAsync( - BluetoothCacheMode.Cached - if use_cached - else BluetoothCacheMode.Uncached - ) - ), - return_type=GattCharacteristicsResult, - ) - self.services.add_service(BleakGATTServiceDotNet(service)) - if characteristics_result.Status != GattCommunicationStatus.Success: - if ( - characteristics_result.Status - == GattCommunicationStatus.ProtocolError - ): - raise BleakDotNetTaskError( - "Could not get GATT characteristics for {0}: {1} (Error: 0x{2:02X}: {3})".format( - service, - _communication_statues.get( - characteristics_result.Status, "" - ), - characteristics_result.ProtocolError, - CONTROLLER_ERROR_CODES.get( - characteristics_result.ProtocolError, "Unknown" - ), - ) - ) - else: - raise BleakDotNetTaskError( - "Could not get GATT characteristics for {0}: {1}".format( - service, - _communication_statues.get( - characteristics_result.Status, "" - ), - ) - ) - for characteristic in characteristics_result.Characteristics: - descriptors_result = await wrap_IAsyncOperation( - IAsyncOperation[GattDescriptorsResult]( - characteristic.GetDescriptorsAsync( - BluetoothCacheMode.Cached - if use_cached - else BluetoothCacheMode.Uncached - ) - ), - return_type=GattDescriptorsResult, - ) - self.services.add_characteristic( - BleakGATTCharacteristicDotNet(characteristic) - ) - if descriptors_result.Status != GattCommunicationStatus.Success: - if ( - characteristics_result.Status - == GattCommunicationStatus.ProtocolError - ): - raise BleakDotNetTaskError( - "Could not get GATT descriptors for {0}: {1} (Error: 0x{2:02X}: {3})".format( - service, - _communication_statues.get( - descriptors_result.Status, "" - ), - descriptors_result.ProtocolError, - CONTROLLER_ERROR_CODES.get( - descriptors_result.ProtocolError, "Unknown" - ), - ) - ) - else: - raise BleakDotNetTaskError( - "Could not get GATT descriptors for {0}: {1}".format( - characteristic, - _communication_statues.get( - descriptors_result.Status, "" - ), - ) - ) - for descriptor in list(descriptors_result.Descriptors): - self.services.add_descriptor( - BleakGATTDescriptorDotNet( - descriptor, - characteristic.Uuid.ToString(), - int(characteristic.AttributeHandle), - ) - ) - - logger.info("Services resolved for %s", str(self)) - self._services_resolved = True - return self.services - - # I/O methods - - async def read_gatt_char( - self, - char_specifier: Union[BleakGATTCharacteristic, int, str, uuid.UUID], - **kwargs, - ) -> bytearray: - """Perform read operation on the specified GATT characteristic. - - Args: - char_specifier (BleakGATTCharacteristic, int, str or UUID): The characteristic to read from, - specified by either integer handle, UUID or directly by the - BleakGATTCharacteristic object representing it. - - Keyword Args: - use_cached (bool): ``False`` forces Windows to read the value from the - device again and not use its own cached value. Defaults to ``False``. - - Returns: - (bytearray) The read data. - - """ - - use_cached = kwargs.get("use_cached", False) - - if not isinstance(char_specifier, BleakGATTCharacteristic): - characteristic = self.services.get_characteristic(char_specifier) - else: - characteristic = char_specifier - if not characteristic: - raise BleakError("Characteristic {0} was not found!".format(char_specifier)) - - read_result = await wrap_IAsyncOperation( - IAsyncOperation[GattReadResult]( - characteristic.obj.ReadValueAsync( - BluetoothCacheMode.Cached - if use_cached - else BluetoothCacheMode.Uncached - ) - ), - return_type=GattReadResult, - ) - if read_result.Status == GattCommunicationStatus.Success: - with BleakDataReader(read_result.Value) as reader: - value = bytearray(reader.read()) - logger.debug( - "Read Characteristic {0} : {1}".format(characteristic.uuid, value) - ) - else: - if read_result.Status == GattCommunicationStatus.ProtocolError: - raise BleakDotNetTaskError( - "Could not get GATT characteristics for {0}: {1} (Error: 0x{2:02X}: {3})".format( - characteristic.uuid, - _communication_statues.get(read_result.Status, ""), - read_result.ProtocolError, - CONTROLLER_ERROR_CODES.get( - read_result.ProtocolError, "Unknown" - ), - ) - ) - else: - raise BleakError( - "Could not read characteristic value for {0}: {1}".format( - characteristic.uuid, - _communication_statues.get(read_result.Status, ""), - ) - ) - return value - - async def read_gatt_descriptor(self, handle: int, **kwargs) -> bytearray: - """Perform read operation on the specified GATT descriptor. - - Args: - handle (int): The handle of the descriptor to read from. - - Keyword Args: - use_cached (bool): ``False`` forces Windows to read the value from the - device again and not use its own cached value. Defaults to ``False``. - - Returns: - (bytearray) The read data. - - """ - use_cached = kwargs.get("use_cached", False) - - descriptor = self.services.get_descriptor(handle) - if not descriptor: - raise BleakError("Descriptor with handle {0} was not found!".format(handle)) - - read_result = await wrap_IAsyncOperation( - IAsyncOperation[GattReadResult]( - descriptor.obj.ReadValueAsync( - BluetoothCacheMode.Cached - if use_cached - else BluetoothCacheMode.Uncached - ) - ), - return_type=GattReadResult, - ) - if read_result.Status == GattCommunicationStatus.Success: - with BleakDataReader(read_result.Value) as reader: - value = bytearray(reader.read()) - logger.debug("Read Descriptor {0} : {1}".format(handle, value)) - else: - if read_result.Status == GattCommunicationStatus.ProtocolError: - raise BleakDotNetTaskError( - "Could not get GATT characteristics for {0}: {1} (Error: 0x{2:02X}: {3})".format( - descriptor.uuid, - _communication_statues.get(read_result.Status, ""), - read_result.ProtocolError, - CONTROLLER_ERROR_CODES.get( - read_result.ProtocolError, "Unknown" - ), - ) - ) - else: - raise BleakError( - "Could not read Descriptor value for {0}: {1}".format( - descriptor.uuid, - _communication_statues.get(read_result.Status, ""), - ) - ) - - return value - - async def write_gatt_char( - self, - char_specifier: Union[BleakGATTCharacteristic, int, str, uuid.UUID], - data: Union[bytes, bytearray, memoryview], - response: bool = False, - ) -> None: - """Perform a write operation of the specified GATT characteristic. - - Args: - char_specifier (BleakGATTCharacteristic, int, str or UUID): The characteristic to write - to, specified by either integer handle, UUID or directly by the - BleakGATTCharacteristic object representing it. - data (bytes or bytearray): The data to send. - response (bool): If write-with-response operation should be done. Defaults to `False`. - - """ - if not isinstance(char_specifier, BleakGATTCharacteristic): - characteristic = self.services.get_characteristic(char_specifier) - else: - characteristic = char_specifier - if not characteristic: - raise BleakError("Characteristic {} was not found!".format(char_specifier)) - - with BleakDataWriter(data) as writer: - response = ( - GattWriteOption.WriteWithResponse - if response - else GattWriteOption.WriteWithoutResponse - ) - write_result = await wrap_IAsyncOperation( - IAsyncOperation[GattWriteResult]( - characteristic.obj.WriteValueWithResultAsync( - writer.detach_buffer(), response - ) - ), - return_type=GattWriteResult, - ) - - if write_result.Status == GattCommunicationStatus.Success: - logger.debug( - "Write Characteristic {0} : {1}".format(characteristic.uuid, data) - ) - else: - if write_result.Status == GattCommunicationStatus.ProtocolError: - raise BleakError( - "Could not write value {0} to characteristic {1}: {2} (Error: 0x{3:02X}: {4})".format( - data, - characteristic.uuid, - _communication_statues.get(write_result.Status, ""), - write_result.ProtocolError, - CONTROLLER_ERROR_CODES.get( - write_result.ProtocolError, "Unknown" - ), - ) - ) - else: - raise BleakError( - "Could not write value {0} to characteristic {1}: {2}".format( - data, - characteristic.uuid, - _communication_statues.get(write_result.Status, ""), - ) - ) - - async def write_gatt_descriptor( - self, handle: int, data: Union[bytes, bytearray, memoryview] - ) -> None: - """Perform a write operation on the specified GATT descriptor. - - Args: - handle (int): The handle of the descriptor to read from. - data (bytes or bytearray): The data to send. - - """ - descriptor = self.services.get_descriptor(handle) - if not descriptor: - raise BleakError("Descriptor with handle {0} was not found!".format(handle)) - - with BleakDataWriter(data) as writer: - write_result = await wrap_IAsyncOperation( - IAsyncOperation[GattWriteResult]( - descriptor.obj.WriteValueWithResultAsync(writer.detach_buffer()) - ), - return_type=GattWriteResult, - ) - - if write_result.Status == GattCommunicationStatus.Success: - logger.debug("Write Descriptor {0} : {1}".format(handle, data)) - else: - if write_result.Status == GattCommunicationStatus.ProtocolError: - raise BleakError( - "Could not write value {0} to characteristic {1}: {2} (Error: 0x{3:02X}: {4})".format( - data, - descriptor.uuid, - _communication_statues.get(write_result.Status, ""), - write_result.ProtocolError, - CONTROLLER_ERROR_CODES.get( - write_result.ProtocolError, "Unknown" - ), - ) - ) - else: - raise BleakError( - "Could not write value {0} to descriptor {1}: {2}".format( - data, - descriptor.uuid, - _communication_statues.get(write_result.Status, ""), - ) - ) - - async def start_notify( - self, - char_specifier: Union[BleakGATTCharacteristic, int, str, uuid.UUID], - callback: Callable[[int, bytearray], None], - **kwargs, - ) -> None: - """Activate notifications/indications on a characteristic. - - Callbacks must accept two inputs. The first will be a integer handle of the characteristic generating the - data and the second will be a ``bytearray`` containing the data sent from the connected server. - - .. code-block:: python - - def callback(sender: int, data: bytearray): - print(f"{sender}: {data}") - client.start_notify(char_uuid, callback) - - Args: - char_specifier (BleakGATTCharacteristic, int, str or UUID): The characteristic to activate - notifications/indications on a characteristic, specified by either integer handle, - UUID or directly by the BleakGATTCharacteristic object representing it. - callback (function): The function to be called on notification. - - """ - if inspect.iscoroutinefunction(callback): - - def bleak_callback(s, d): - asyncio.ensure_future(callback(s, d)) - - else: - bleak_callback = callback - - if not isinstance(char_specifier, BleakGATTCharacteristic): - characteristic = self.services.get_characteristic(char_specifier) - else: - characteristic = char_specifier - if not characteristic: - raise BleakError("Characteristic {0} not found!".format(char_specifier)) - - if characteristic.handle in self._notification_callbacks: - await self.stop_notify(characteristic) - - characteristic_obj = characteristic.obj - if ( - characteristic_obj.CharacteristicProperties - & GattCharacteristicProperties.Indicate - ): - cccd = GattClientCharacteristicConfigurationDescriptorValue.Indicate - elif ( - characteristic_obj.CharacteristicProperties - & GattCharacteristicProperties.Notify - ): - cccd = GattClientCharacteristicConfigurationDescriptorValue.Notify - else: - cccd = getattr(GattClientCharacteristicConfigurationDescriptorValue, "None") - - self._notification_callbacks[ - characteristic.handle - ] = characteristic_obj.add_ValueChanged( - TypedEventHandler[GattCharacteristic, GattValueChangedEventArgs]( - _notification_wrapper(bleak_callback, asyncio.get_event_loop()) - ) - ) - - status = await wrap_IAsyncOperation( - IAsyncOperation[GattCommunicationStatus]( - characteristic_obj.WriteClientCharacteristicConfigurationDescriptorAsync( - cccd - ) - ), - return_type=GattCommunicationStatus, - ) - - if status != GattCommunicationStatus.Success: - # This usually happens when a device reports that it support indicate, - # but it actually doesn't. - characteristic_obj.remove_ValueChanged( - self._notification_callbacks.pop(characteristic.handle) - ) - # TODO: Find out how to get the ProtocolError code that describes a potential GattCommunicationStatus.ProtocolError result. - raise BleakError( - "Could not start notify on {0}: {1}".format( - characteristic.uuid, _communication_statues.get(status, "") - ) - ) - - async def stop_notify( - self, char_specifier: Union[BleakGATTCharacteristic, int, str, uuid.UUID] - ) -> None: - """Deactivate notification/indication on a specified characteristic. - - Args: - char_specifier (BleakGATTCharacteristic, int, str or UUID): The characteristic to deactivate - notification/indication on, specified by either integer handle, UUID or - directly by the BleakGATTCharacteristic object representing it. - - """ - if not isinstance(char_specifier, BleakGATTCharacteristic): - characteristic = self.services.get_characteristic(char_specifier) - else: - characteristic = char_specifier - if not characteristic: - raise BleakError("Characteristic {} not found!".format(char_specifier)) - - status = await wrap_IAsyncOperation( - IAsyncOperation[GattCommunicationStatus]( - characteristic.obj.WriteClientCharacteristicConfigurationDescriptorAsync( - getattr( - GattClientCharacteristicConfigurationDescriptorValue, "None" - ) - ) - ), - return_type=GattCommunicationStatus, - ) - - if status != GattCommunicationStatus.Success: - raise BleakError( - "Could not stop notify on {0}: {1}".format( - characteristic.uuid, _communication_statues.get(status, "") - ) - ) - - characteristic.obj.remove_ValueChanged( - self._notification_callbacks.pop(characteristic.handle) - ) - - -def _notification_wrapper(func: Callable, loop: asyncio.AbstractEventLoop): - @wraps(func) - def dotnet_notification_parser(sender: Any, args: Any): - # Return only the UUID string representation as sender. - # Also do a conversion from System.Bytes[] to bytearray. - with BleakDataReader(args.CharacteristicValue) as reader: - output = reader.read() - - return loop.call_soon_threadsafe( - func, sender.AttributeHandle, bytearray(output) - ) - - return dotnet_notification_parser diff --git a/bleak/backends/dotnet/descriptor.py b/bleak/backends/dotnet/descriptor.py deleted file mode 100644 index 50955f03..00000000 --- a/bleak/backends/dotnet/descriptor.py +++ /dev/null @@ -1,39 +0,0 @@ -# -*- coding: utf-8 -*- -from bleak.backends.descriptor import BleakGATTDescriptor - -# Import of BleakBridge to enable loading of winrt bindings -from BleakBridge import Bridge # noqa: F401 - -from Windows.Devices.Bluetooth.GenericAttributeProfile import GattDescriptor - - -class BleakGATTDescriptorDotNet(BleakGATTDescriptor): - """GATT Descriptor implementation for .NET backend""" - - def __init__( - self, obj: GattDescriptor, characteristic_uuid: str, characteristic_handle: int - ): - super(BleakGATTDescriptorDotNet, self).__init__(obj) - self.obj = obj - self.__characteristic_uuid = characteristic_uuid - self.__characteristic_handle = characteristic_handle - - @property - def characteristic_handle(self) -> int: - """handle for the characteristic that this descriptor belongs to""" - return self.__characteristic_handle - - @property - def characteristic_uuid(self) -> str: - """UUID for the characteristic that this descriptor belongs to""" - return self.__characteristic_uuid - - @property - def uuid(self) -> str: - """UUID for this descriptor""" - return self.obj.Uuid.ToString() - - @property - def handle(self) -> int: - """Integer handle for this descriptor""" - return self.obj.AttributeHandle diff --git a/bleak/backends/dotnet/scanner.py b/bleak/backends/dotnet/scanner.py deleted file mode 100644 index 393d465d..00000000 --- a/bleak/backends/dotnet/scanner.py +++ /dev/null @@ -1,268 +0,0 @@ -import logging -import asyncio -import pathlib -from typing import List -from uuid import UUID - -from bleak.backends.device import BLEDevice -from bleak.backends.dotnet.utils import BleakDataReader -from bleak.backends.scanner import BaseBleakScanner, AdvertisementData - -# Import of BleakBridge to enable loading of winrt bindings -from BleakBridge import Bridge # noqa: F401 - -from Windows.Devices.Bluetooth.Advertisement import ( - BluetoothLEAdvertisementWatcher, - BluetoothLEScanningMode, - BluetoothLEAdvertisementType, - BluetoothLEAdvertisementReceivedEventArgs, - BluetoothLEAdvertisementWatcherStoppedEventArgs, -) -from Windows.Foundation import TypedEventHandler - -logger = logging.getLogger(__name__) -_here = pathlib.Path(__file__).parent - - -def _format_bdaddr(a): - return ":".join("{:02X}".format(x) for x in a.to_bytes(6, byteorder="big")) - - -def _format_event_args(e): - try: - return "{0}: {1}".format( - _format_bdaddr(e.BluetoothAddress), e.Advertisement.LocalName or "Unknown" - ) - except Exception: - return e.BluetoothAddress - - -class BleakScannerDotNet(BaseBleakScanner): - """The native Windows Bleak BLE Scanner. - - Implemented using `pythonnet `_, a package that provides an integration to - the .NET Common Language Runtime (CLR). Therefore, much of the code below has a distinct C# feel. - - Keyword Args: - - scanning mode (str): Set to ``Passive`` to avoid the ``Active`` scanning mode. - - SignalStrengthFilter (``Windows.Devices.Bluetooth.BluetoothSignalStrengthFilter``): A - BluetoothSignalStrengthFilter object used for configuration of Bluetooth LE advertisement - filtering that uses signal strength-based filtering. - - AdvertisementFilter (``Windows.Devices.Bluetooth.Advertisement.BluetoothLEAdvertisementFilter``): A - BluetoothLEAdvertisementFilter object used for configuration of Bluetooth LE advertisement - filtering that uses payload section-based filtering. - - """ - - def __init__(self, **kwargs): - super(BleakScannerDotNet, self).__init__(**kwargs) - - self.watcher = None - self._stopped_event = None - self._devices = {} - self._scan_responses = {} - - self._received_token = None - self._stopped_token = None - - if "scanning_mode" in kwargs and kwargs["scanning_mode"].lower() == "passive": - self._scanning_mode = BluetoothLEScanningMode.Passive - else: - self._scanning_mode = BluetoothLEScanningMode.Active - - self._signal_strength_filter = None - self._advertisement_filter = None - self.set_scanning_filter(**kwargs) - - def _received_handler( - self, - sender: BluetoothLEAdvertisementWatcher, - event_args: BluetoothLEAdvertisementReceivedEventArgs, - ): - if sender == self.watcher: - logger.debug("Received {0}.".format(_format_event_args(event_args))) - if ( - event_args.AdvertisementType - == BluetoothLEAdvertisementType.ScanResponse - ): - if event_args.BluetoothAddress not in self._scan_responses: - self._scan_responses[event_args.BluetoothAddress] = event_args - else: - if event_args.BluetoothAddress not in self._devices: - self._devices[event_args.BluetoothAddress] = event_args - - if self._callback is None: - return - - # Get a "BLEDevice" from parse_event args - device = self._parse_event_args(event_args) - - # Decode service data - service_data = {} - # 0x16 is service data with 16-bit UUID - for section in event_args.Advertisement.GetSectionsByType(0x16): - with BleakDataReader(section.Data) as reader: - data = reader.read() - service_data[ - f"0000{data[1]:02x}{data[0]:02x}-0000-1000-8000-00805f9b34fb" - ] = data[2:] - # 0x20 is service data with 32-bit UUID - for section in event_args.Advertisement.GetSectionsByType(0x20): - with BleakDataReader(section.Data) as reader: - data = reader.read() - service_data[ - f"{data[3]:02x}{data[2]:02x}{data[1]:02x}{data[0]:02x}-0000-1000-8000-00805f9b34fb" - ] = data[4:] - # 0x21 is service data with 128-bit UUID - for section in event_args.Advertisement.GetSectionsByType(0x21): - with BleakDataReader(section.Data) as reader: - data = reader.read() - service_data[str(UUID(bytes=data[15::-1]))] = data[16:] - - # Use the BLEDevice to populate all the fields for the advertisement data to return - advertisement_data = AdvertisementData( - local_name=event_args.Advertisement.LocalName, - manufacturer_data=device.metadata["manufacturer_data"], - service_data=service_data, - service_uuids=device.metadata["uuids"], - platform_data=(sender, event_args), - ) - - self._callback(device, advertisement_data) - - def _stopped_handler( - self, - sender: BluetoothLEAdvertisementWatcher, - e: BluetoothLEAdvertisementWatcherStoppedEventArgs, - ): - if sender == self.watcher: - logger.debug( - "{0} devices found. Watcher status: {1}.".format( - len(self._devices), self.watcher.Status - ) - ) - self._stopped_event.set() - - async def start(self): - self.watcher = BluetoothLEAdvertisementWatcher() - self.watcher.ScanningMode = self._scanning_mode - - event_loop = asyncio.get_event_loop() - self._stopped_event = asyncio.Event() - - self._received_token = self.watcher.add_Received( - TypedEventHandler[ - BluetoothLEAdvertisementWatcher, - BluetoothLEAdvertisementReceivedEventArgs, - ]( - lambda s, e: event_loop.call_soon_threadsafe( - self._received_handler, s, e - ) - ) - ) - self._stopped_token = self.watcher.add_Stopped( - TypedEventHandler[ - BluetoothLEAdvertisementWatcher, - BluetoothLEAdvertisementWatcherStoppedEventArgs, - ](lambda s, e: event_loop.call_soon_threadsafe(self._stopped_handler, s, e)) - ) - - if self._signal_strength_filter is not None: - self.watcher.SignalStrengthFilter = self._signal_strength_filter - if self._advertisement_filter is not None: - self.watcher.AdvertisementFilter = self._advertisement_filter - - self.watcher.Start() - - async def stop(self): - self.watcher.Stop() - await self._stopped_event.wait() - - if self._received_token: - self.watcher.remove_Received(self._received_token) - self._received_token = None - if self._stopped_token: - self.watcher.remove_Stopped(self._stopped_token) - self._stopped_token = None - - self.watcher = None - - def set_scanning_filter(self, **kwargs): - """Set a scanning filter for the BleakScanner. - - Keyword Args: - SignalStrengthFilter (``Windows.Devices.Bluetooth.BluetoothSignalStrengthFilter``): A - BluetoothSignalStrengthFilter object used for configuration of Bluetooth - LE advertisement filtering that uses signal strength-based filtering. - AdvertisementFilter (Windows.Devices.Bluetooth.Advertisement.BluetoothLEAdvertisementFilter): A - BluetoothLEAdvertisementFilter object used for configuration of Bluetooth LE - advertisement filtering that uses payload section-based filtering. - - """ - if "SignalStrengthFilter" in kwargs: - # TODO: Handle SignalStrengthFilter parameters - self._signal_strength_filter = kwargs["SignalStrengthFilter"] - if "AdvertisementFilter" in kwargs: - # TODO: Handle AdvertisementFilter parameters - self._advertisement_filter = kwargs["AdvertisementFilter"] - - @property - def discovered_devices(self) -> List[BLEDevice]: - found = [] - for event_args in list(self._devices.values()): - new_device = self._parse_event_args(event_args) - if ( - not new_device.name - and event_args.BluetoothAddress in self._scan_responses - ): - new_device.name = self._scan_responses[ - event_args.BluetoothAddress - ].Advertisement.LocalName - found.append(new_device) - - return found - - @staticmethod - def _parse_event_args(event_args): - bdaddr = _format_bdaddr(event_args.BluetoothAddress) - uuids = [] - for u in event_args.Advertisement.ServiceUuids: - uuids.append(u.ToString()) - data = {} - for m in event_args.Advertisement.ManufacturerData: - with BleakDataReader(m.Data) as reader: - data[m.CompanyId] = reader.read() - local_name = event_args.Advertisement.LocalName - rssi = event_args.RawSignalStrengthInDBm - return BLEDevice( - bdaddr, local_name, event_args, rssi, uuids=uuids, manufacturer_data=data - ) - - # Windows specific - - @property - def status(self) -> int: - """Get status of the Watcher. - - Returns: - - Aborted 4 - An error occurred during transition or scanning that stopped the watcher due to an error. - - Created 0 - The initial status of the watcher. - - Started 1 - The watcher is started. - - Stopped 3 - The watcher is stopped. - - Stopping 2 - The watcher stop command was issued. - - """ - return self.watcher.Status if self.watcher else None diff --git a/bleak/backends/dotnet/service.py b/bleak/backends/dotnet/service.py deleted file mode 100644 index 90a369c4..00000000 --- a/bleak/backends/dotnet/service.py +++ /dev/null @@ -1,39 +0,0 @@ -from typing import List - -from bleak.backends.service import BleakGATTService -from bleak.backends.dotnet.characteristic import BleakGATTCharacteristicDotNet - -# Import of BleakBridge to enable loading of winrt bindings -from BleakBridge import Bridge # noqa: F401 - -from Windows.Devices.Bluetooth.GenericAttributeProfile import GattDeviceService - - -class BleakGATTServiceDotNet(BleakGATTService): - """GATT Characteristic implementation for the .NET backend""" - - def __init__(self, obj: GattDeviceService): - super().__init__(obj) - self.__characteristics = [] - - @property - def handle(self) -> int: - """The handle of this service""" - return int(self.obj.AttributeHandle) - - @property - def uuid(self) -> str: - """UUID for this service.""" - return self.obj.Uuid.ToString() - - @property - def characteristics(self) -> List[BleakGATTCharacteristicDotNet]: - """List of characteristics for this service""" - return self.__characteristics - - def add_characteristic(self, characteristic: BleakGATTCharacteristicDotNet): - """Add a :py:class:`~BleakGATTCharacteristicDotNet` to the service. - - Should not be used by end user, but rather by `bleak` itself. - """ - self.__characteristics.append(characteristic) diff --git a/bleak/backends/dotnet/utils.py b/bleak/backends/dotnet/utils.py deleted file mode 100644 index 52aeb8ef..00000000 --- a/bleak/backends/dotnet/utils.py +++ /dev/null @@ -1,127 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Helper methods for awaiting on .NET Tasks. - -Created on 2017-12-05 by hbldh -""" - -import asyncio - -from bleak.exc import BleakDotNetTaskError - -# Import of BleakBridge to enable loading of winrt bindings -from BleakBridge import Bridge # noqa: F401 - -# Python for .NET CLR imports -from System import Action -from System.Threading.Tasks import Task - -from Windows.Foundation import ( - AsyncOperationCompletedHandler, - IAsyncOperation, - AsyncStatus, -) -from System import Array, Byte -from Windows.Storage.Streams import DataReader, DataWriter, IBuffer - - -async def wrap_Task(task): - """Enables await on .NET Task using asyncio.Event and a lambda callback. - - Args: - task (System.Threading.Tasks.Task): .NET async task object - to await upon. - - Returns: - The results of the the .NET Task. - - """ - loop = asyncio.get_event_loop() - done = asyncio.Event() - # Register Action callback that triggers the above asyncio.Event. - task.ContinueWith(Action[Task](lambda x: loop.call_soon_threadsafe(done.set))) - # Wait for callback. - await done.wait() - # TODO: Handle IsCancelled. - if task.IsFaulted: - # Exception occurred. Wrap it in BleakDotNetTaskError - # to make it catchable. - raise BleakDotNetTaskError(task.Exception.ToString()) - - return task.Result - - -async def wrap_IAsyncOperation(op: IAsyncOperation, return_type): - """Enables await on .NET Task using asyncio.Event and a lambda callback. - - Args: - op (Windows.Foundation.IAsyncOperation[TResult]): .NET async operation object to await. - result_type (TResult): The .NET type of the result of the async operation. - - Returns: - The results of the the .NET Task. - - """ - loop = asyncio.get_event_loop() - done = asyncio.Event() - # Register AsyncOperationCompletedHandler callback that triggers the above asyncio.Event. - op.Completed = AsyncOperationCompletedHandler[return_type]( - lambda x, y: loop.call_soon_threadsafe(done.set) - ) - # Wait for callback. - await done.wait() - - if op.Status == AsyncStatus.Completed: - return op.GetResults() - elif op.Status == AsyncStatus.Error: - # Exception occurred. Wrap it in BleakDotNetTaskError - # to make it catchable. - raise BleakDotNetTaskError(op.ErrorCode.ToString()) - else: - # TODO: Handle IsCancelled. - raise BleakDotNetTaskError("IAsyncOperation Status: {0}".format(op.Status)) - - -class BleakDataReader: - def __init__(self, buffer_com_object): - - self.reader = None - self.buffer = IBuffer(buffer_com_object) - - def __enter__(self): - self.reader = DataReader.FromBuffer(self.buffer) - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.reader.DetachBuffer() - self.reader.Dispose() - self.reader = None - self.buffer = None - - def read(self) -> bytes: - b = Array.CreateInstance(Byte, self.reader.UnconsumedBufferLength) - self.reader.ReadBytes(b) - py_b = bytes(b) - del b - return py_b - - -class BleakDataWriter: - def __init__(self, data): - self.data = data - - def __enter__(self): - self.writer = DataWriter() - self.writer.WriteBytes(Array[Byte](self.data)) - return self - - def detach_buffer(self): - return self.writer.DetachBuffer() - - def __exit__(self, exc_type, exc_val, exc_tb): - try: - self.writer.Dispose() - except Exception: - pass - del self.writer - self.writer = None diff --git a/bleak/backends/scanner.py b/bleak/backends/scanner.py index 870a5626..aa48f3ed 100644 --- a/bleak/backends/scanner.py +++ b/bleak/backends/scanner.py @@ -199,6 +199,8 @@ async def find_device_by_address( The ``BLEDevice`` sought or ``None`` if not detected. """ + if device_identifier is None: + return None device_identifier = device_identifier.lower() return await cls.find_device_by_filter( lambda d, ad: d.address.lower() == device_identifier, diff --git a/bleak/backends/winrt/client.py b/bleak/backends/winrt/client.py index 539776e5..6feeb87f 100644 --- a/bleak/backends/winrt/client.py +++ b/bleak/backends/winrt/client.py @@ -10,19 +10,21 @@ import asyncio import uuid from functools import wraps -from typing import Callable, Any, List, Union +from typing import Callable, Any, List, Union, Optional from winrt.windows.devices.enumeration import ( + DeviceInformationCustomPairing, DevicePairingKinds, DevicePairingResultStatus, DeviceUnpairingResultStatus, + DevicePairingRequestedEventArgs, ) from winrt.windows.security.cryptography import CryptographicBuffer from bleak.backends.device import BLEDevice from bleak.backends.winrt.scanner import BleakScannerWinRT from bleak.exc import BleakError, BleakDotNetTaskError, CONTROLLER_ERROR_CODES -from bleak.backends.client import BaseBleakClient +from bleak.backends.client import BaseBleakClient, PairingCallback from bleak.backends.characteristic import BleakGATTCharacteristic from bleak.backends.service import BleakGATTServiceCollection @@ -30,9 +32,6 @@ from bleak.backends.winrt.characteristic import BleakGATTCharacteristicWinRT from bleak.backends.winrt.descriptor import BleakGATTDescriptorWinRT - -# Import of RT components needed. - from winrt.windows.devices.bluetooth import ( BluetoothLEDevice, BluetoothConnectionStatus, @@ -47,7 +46,6 @@ GattSession, ) - logger = logging.getLogger(__name__) _communication_statues = { @@ -58,14 +56,12 @@ ) } - _pairing_statuses = { getattr(DevicePairingResultStatus, v): v for v in dir(DevicePairingResultStatus) if "_" not in v and isinstance(getattr(DevicePairingResultStatus, v), int) } - _unpairing_statuses = { getattr(DeviceUnpairingResultStatus, v): v for v in dir(DeviceUnpairingResultStatus) @@ -216,6 +212,7 @@ def _ConnectionStatusChanged_Handler(sender, args): # This keeps the device connected until we dispose the session or # until we set maintain_connection = False. self._session.maintain_connection = True + await asyncio.wait_for(event.wait(), timeout=timeout) except BaseException: handle_disconnect() @@ -285,7 +282,13 @@ def mtu_size(self) -> int: """Get ATT MTU size for active connection""" return self._session.max_pdu_size - async def pair(self, protection_level: int = None, **kwargs) -> bool: + async def pair( + self, + protection_level=None, + *args, + callback: Optional[PairingCallback] = None, + **kwargs, + ) -> bool: """Attempts to pair with the device. Keyword Args: @@ -295,6 +298,11 @@ async def pair(self, protection_level: int = None, **kwargs) -> bool: 2: Encryption - Pair the device using encryption. 3: EncryptionAndAuthentication - Pair the device using encryption and authentication. (This will not work in Bleak...) + callback (`PairingCallback`): callback to be called to provide or confirm pairing pin + or passkey. If not provided and Bleak is registered as a pairing agent/manager + instead of system pairing manager, then all display- and confirm-based pairing + requests will be accepted, and requests requiring pin or passkey input will be + canceled. Returns: Boolean regarding success of pairing. @@ -306,12 +314,40 @@ async def pair(self, protection_level: int = None, **kwargs) -> bool: and not self._requester.device_information.pairing.is_paired ): - # Currently only supporting Just Works solutions... - ceremony = DevicePairingKinds.CONFIRM_ONLY + if callback: + ceremony = ( + DevicePairingKinds.CONFIRM_ONLY + + DevicePairingKinds.CONFIRM_PIN_MATCH + + DevicePairingKinds.DISPLAY_PIN + + DevicePairingKinds.PROVIDE_PIN + ) + else: + ceremony = DevicePairingKinds.CONFIRM_ONLY custom_pairing = self._requester.device_information.pairing.custom - def handler(sender, args): - args.accept() + def handler( + sender: DeviceInformationCustomPairing, + args: DevicePairingRequestedEventArgs, + ): + deferral = args.get_deferral() + if callback: + if args.pairing_kind == DevicePairingKinds.CONFIRM_ONLY: + args.accept() + # TODO: Get device MAC for first argument, test conversion, flags can have multiple values set (mask) + elif ( + args.pairing_kind == DevicePairingKinds.CONFIRM_PIN_MATCH + or args.pairing_kind == DevicePairingKinds.DISPLAY_PIN + ): + if callback("", args.Pin, None) is True: + args.accept() + elif args.pairing_kind == DevicePairingKinds.PROVIDE_PIN: + pin = callback("", None, None) + if pin: + args.accept(pin) + else: + args.accept() + + deferral.complete() pairing_requested_token = custom_pairing.add_pairing_requested(handler) try: @@ -345,6 +381,9 @@ def handler(sender, args): ) return True else: + logger.debug( + f"Device ({self._requester.device_information}) is already paired." + ) return self._requester.device_information.pairing.is_paired async def unpair(self) -> bool: @@ -506,7 +545,7 @@ async def get_services(self, **kwargs) -> BleakGATTServiceCollection: async def read_gatt_char( self, char_specifier: Union[BleakGATTCharacteristic, int, str, uuid.UUID], - **kwargs + **kwargs, ) -> bytearray: """Perform read operation on the specified GATT characteristic. @@ -715,7 +754,7 @@ async def start_notify( self, char_specifier: Union[BleakGATTCharacteristic, int, str, uuid.UUID], callback: Callable[[int, bytearray], None], - **kwargs + **kwargs, ) -> None: """Activate notifications/indications on a characteristic. diff --git a/examples/passkey_pairing.py b/examples/passkey_pairing.py new file mode 100644 index 00000000..e52af779 --- /dev/null +++ b/examples/passkey_pairing.py @@ -0,0 +1,70 @@ +""" +Services +---------------- + +An example showing how to pair using Passkey Entry pairing method using pre-shared passkey + +Created on 2021-04-20 by Bojan Potočnik + +""" + +import asyncio +import os +import platform +from typing import Union + +# os.environ["BLEAK_LOGGING"] = "1" + +from bleak import BleakClient + + +def get_passkey( + device: str, pin: Union[None, str], passkey: Union[None, int] +) -> Union[bool, int, str, None]: + if pin: + print(f"Device {device} is displaying pin '{pin}'") + return True + + if passkey: + print(f"Device {device} is displaying passkey '{passkey:06d}'") + return True + + # Retrieve passkey using custom algorithm, web API or just ask the user like OS pairing + # wizard would do + psk = input( + f"Provide pin (1-16 characters) or passkey (0-999999) for {device}, or nothing to reject " + f"pairing: " + ) + + # Return None if psk is empty string (pincode 0 is valid pin, but "0" is True) + return psk or None + + +async def main(mac_addr: str): + # Remove this device if it is already paired (from previous runs) + if await BleakClient.remove_device(mac_addr): + print(f"Device {mac_addr} was unpaired") + + # Pairing agent shall be registered before initiating the connection + async with BleakClient(mac_addr, handle_pairing=True) as client: + print("Pairing...") + print(await client.pair(callback=get_passkey)) + print("Paired") + + services = await client.get_services() + print(services) + for service in services: + print(service) + for char in service.characteristics: + print("\t", char) + print("\t\tValue: ", await client.read_gatt_char(char)) + + +if platform.system() == "Darwin": + raise EnvironmentError( + "Pairing methods other than Just Works are currently implemented only on BlueZ, .NET, and WinRT backend." + ) + +loop = asyncio.get_event_loop() +loop.set_debug(True) +loop.run_until_complete(main("24:71:89:cc:09:05")) diff --git a/setup.py b/setup.py index 32612012..024ca54d 100644 --- a/setup.py +++ b/setup.py @@ -26,7 +26,6 @@ 'pyobjc-framework-CoreBluetooth;platform_system=="Darwin"', 'pyobjc-framework-libdispatch;platform_system=="Darwin"', # Windows reqs - 'pythonnet;platform_system=="Windows" and python_version < "3.9.0"', 'winrt>=1.0.21033.1;platform_system=="Windows" and python_version >= "3.9.0"', ] @@ -87,7 +86,6 @@ def run(self): author_email=EMAIL, url=URL, packages=find_packages(exclude=("tests", "examples", "docs", "BleakUWPBridge")), - package_data={"bleak.backends.dotnet": ["*.dll"]}, entry_points={"console_scripts": ["bleak-lescan=bleak:cli"]}, install_requires=REQUIRED, test_suite="tests", @@ -96,9 +94,6 @@ def run(self): "winrt": [ "winrt>=1.0.21033.1", ], - "pythonnet": [ - "pythonnet>=2.5.1", - ], }, include_package_data=True, license="MIT",