diff --git a/demos/python/sdk_wireless_camera_control/open_gopro/demos/adv_parsing.py b/demos/python/sdk_wireless_camera_control/open_gopro/demos/adv_parsing.py new file mode 100644 index 00000000..9ae9a14f --- /dev/null +++ b/demos/python/sdk_wireless_camera_control/open_gopro/demos/adv_parsing.py @@ -0,0 +1,23 @@ +"""Demo to retrieve and parse bleak-level advertisements""" + +import asyncio + +from bleak import BleakScanner + +from open_gopro.models.ble_advertisement import AdvData, GoProAdvData + + +async def main() -> None: + adv_data = AdvData() + + async with BleakScanner(service_uuids=["0000fea6-0000-1000-8000-00805f9b34fb"]) as scanner: + async for _, data in scanner.advertisement_data(): + adv_data.update(data) + if adv_data.local_name: # Once we've received the scan response... + break + + print(f"GoPro Data: {GoProAdvData.fromAdvData(adv_data)}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/demos/python/sdk_wireless_camera_control/open_gopro/models/ble_advertisement.py b/demos/python/sdk_wireless_camera_control/open_gopro/models/ble_advertisement.py new file mode 100644 index 00000000..90578c7f --- /dev/null +++ b/demos/python/sdk_wireless_camera_control/open_gopro/models/ble_advertisement.py @@ -0,0 +1,227 @@ +"""GoPro specific advertisement entities and parsing structures""" + +from __future__ import annotations + +import json +from dataclasses import asdict, dataclass, field +from typing import Any + +from bleak.backends.scanner import AdvertisementData +from construct import ( + Adapter, + BitStruct, + Byte, + Bytes, + Enum, + Flag, + GreedyString, + Hex, + Int16ub, + Int16ul, + PaddedString, + Padding, + Struct, + this, +) + +from open_gopro.util import deeply_update_dict + + +class Hexlify(Adapter): + """Construct adapter for pretty hex representation""" + + def _decode(self, obj: bytes, context: Any, path: Any) -> str: + return obj.hex(":") + + def _encode(self, obj: str, context: Any, path: Any) -> list[int]: + return list(map(int, obj.split(":"))) + + +camera_status_struct = BitStruct( + "processor_state" / Flag, + "wifi_ap_state" / Flag, + "peripheral_pairing_state" / Flag, + "central_role_enabled" / Flag, + "is_new_media_available" / Flag, + "reserved" / Padding(3), +) + +camera_id_struct = Enum( + Byte, + Hero11Black=56, + Fraction=66, +) + + +camera_capability_struct = BitStruct( + "cnc" / Flag, + "ble_metadata" / Flag, + "wideband_audio" / Flag, + "concurrent_master_slave" / Flag, + "onboarding" / Flag, + "new_media_available" / Flag, + "reserved" / Padding(10), +) + +media_offload_status_struct = BitStruct( + "available" / Flag, + "new_media_available" / Flag, + "battery_ok" / Flag, + "sd_card_ok" / Flag, + "busy" / Flag, + "paused" / Flag, + "reserved" / Padding(2), +) + +manuf_data_struct = Struct( + "schema_version" / Byte, + "camera_status" / camera_status_struct, + "camera_id" / camera_id_struct, + "camera_capabilities" / camera_capability_struct, + "id_hash" / Hexlify(Bytes(6)), + "media_offload_status" / media_offload_status_struct, +) + +adv_data_struct = Struct( + "flags_length" / Byte, + "flags" / Hex(Int16ub), + "uuids_length" / Byte, + "uuids_type" / Hex(Byte), + "uuids" / Hex(Int16ul), + "manuf_length" / Byte, + "manuf_type" / Hex(Byte), + "company_id" / Hex(Int16ub), + "manuf_data" / manuf_data_struct, +) + +service_data_struct = Struct( + "ap_mac_address" / Hexlify(Bytes(4)), + "serial_number" / GreedyString("utf8"), +) + +scan_response_struct = Struct( + "name_length" / Byte, + "name_type" / Hex(Byte), + "name" / PaddedString(this.name_length - 1, encoding="utf8"), + "service_length" / Byte, + "service_type" / Hex(Byte), + "service_uuid" / Hex(Int16ul), + "service_data" / service_data_struct, +) + + +@dataclass +class Jsonable: + """Mixin to use pretty hex presentation for JSON decoding""" + + def __str__(self) -> str: + def default_decode(obj: Any) -> Any: + if isinstance(obj, (bytes, bytearray)): + return obj.hex(":") + return str(obj) + + return json.dumps(asdict(self), indent=4, default=default_decode) + + +@dataclass +class GoProAdvData(Jsonable): + """GoPro-specific advertising data""" + + name: str + schema_version: int + processor_state: bool + wifi_ap_state: bool + peripheral_pairing_state: bool + is_new_media_available: bool + camera_id: str + supports_cnc: bool + supports_ble_metadata: bool + supports_wideband_audio: bool + supports_concurrent_master_slave: bool + supports_onboarding: bool + supports_new_media_available: bool + id_hash: bytes + is_media_upload_new_media_available: bool + is_media_upload_available: bool + is_media_upload_battery_ok: bool + is_media_upload_sd_card_ok: bool + is_media_upload_busy: bool + is_media_upload_paused: bool + ap_mac_address: bytes + partial_serial_number: bytes + + @classmethod + def fromAdvData(cls, data: AdvData) -> GoProAdvData: + """Build GoPro specific advertisement from standard BLE advertisement data + + Args: + data (AdvData): standard BLE advertisement data + + Returns: + GoProAdvData: parsed GoPro specific advertising data + """ + manuf_data = manuf_data_struct.parse(list(data.manufacturer_data.values())[0]) + service_data = service_data_struct.parse(list(data.service_data.values())[0]) + return GoProAdvData( + # Name from scan response data + name=data.local_name, + # Schema version from advertising data manufacturer data + schema_version=manuf_data.schema_version, + # Camera status from advertising data manufacturer data + processor_state=manuf_data.camera_status.processor_state, + wifi_ap_state=manuf_data.camera_status.wifi_ap_state, + peripheral_pairing_state=manuf_data.camera_status.peripheral_pairing_state, + is_new_media_available=manuf_data.camera_status.is_new_media_available, + # Camera ID from advertising data manufacturer data + camera_id=manuf_data.camera_id, + # Camera capabilities from advertising data manufacturer data + supports_ble_metadata=manuf_data.camera_capabilities.ble_metadata, + supports_cnc=manuf_data.camera_capabilities.cnc, + supports_onboarding=manuf_data.camera_capabilities.onboarding, + supports_wideband_audio=manuf_data.camera_capabilities.wideband_audio, + supports_concurrent_master_slave=manuf_data.camera_capabilities.concurrent_master_slave, + supports_new_media_available=manuf_data.camera_capabilities.new_media_available, + # ID Hash from advertising data manufacturer's data + id_hash=manuf_data.id_hash, + # Media offload status status from advertising data manufacturer's data + is_media_upload_new_media_available=manuf_data.media_offload_status.new_media_available, + is_media_upload_available=manuf_data.media_offload_status.available, + is_media_upload_battery_ok=manuf_data.media_offload_status.battery_ok, + is_media_upload_sd_card_ok=manuf_data.media_offload_status.sd_card_ok, + is_media_upload_busy=manuf_data.media_offload_status.busy, + is_media_upload_paused=manuf_data.media_offload_status.paused, + # Mac address from scan response data service data + ap_mac_address=service_data.ap_mac_address, + # Partial serial number from scan response data service data + partial_serial_number=service_data.serial_number, + ) + + +@dataclass +class AdvData(Jsonable): + """Standard BLE advertising data + + Only contains fields that are currently used by GoPro + """ + + local_name: str = "" + manufacturer_data: dict[str, Any] = field(default_factory=dict) + service_uuids: list[str] = field(default_factory=list) + service_data: dict = field(default_factory=dict) + + def update(self, data: AdvertisementData) -> None: + """Update with a (potentially incomplete) advertisement + + Args: + data (AdvertisementData): advertisement to use for updating + """ + self_dict = asdict(self) + for k, v in data._asdict().items(): + if not v: + continue + if isinstance(v, dict): + setattr(self, k, deeply_update_dict(self_dict[k], v)) + elif isinstance(v, list): + setattr(self, k, [*self_dict[k], v]) + else: + setattr(self, k, v) diff --git a/demos/python/sdk_wireless_camera_control/open_gopro/util.py b/demos/python/sdk_wireless_camera_control/open_gopro/util.py index ecee2cb0..68ac01e0 100644 --- a/demos/python/sdk_wireless_camera_control/open_gopro/util.py +++ b/demos/python/sdk_wireless_camera_control/open_gopro/util.py @@ -330,3 +330,21 @@ def get_current_dst_aware_time() -> tuple[datetime, int, bool]: if is_dst: offset += 60 return (now, int(offset), is_dst) + + +def deeply_update_dict(d: dict, u: dict) -> dict: + """Recursively update a dict + + Args: + d (dict): original dict + u (dict): dict to apply updates from + + Returns: + dict: updated original dict + """ + for k, v in u.items(): + if isinstance(v, dict): + d[k] = deeply_update_dict(d.get(k, {}), v) + else: + d[k] = v + return d diff --git a/demos/python/sdk_wireless_camera_control/tests/unit/test_parsers.py b/demos/python/sdk_wireless_camera_control/tests/unit/test_parsers.py index 9d144e59..5e9a54fd 100644 --- a/demos/python/sdk_wireless_camera_control/tests/unit/test_parsers.py +++ b/demos/python/sdk_wireless_camera_control/tests/unit/test_parsers.py @@ -5,7 +5,8 @@ from open_gopro.api.parsers import ByteParserBuilders from open_gopro.communicator_interface import GoProBle from open_gopro.constants import CmdId -from open_gopro.models.response import BleRespBuilder, GlobalParsers +from open_gopro.models.ble_advertisement import adv_data_struct, scan_response_struct +from open_gopro.models.response import GlobalParsers from open_gopro.parser_interface import Parser from open_gopro.proto import EnumResultGeneric, ResponseGetApEntries @@ -32,3 +33,114 @@ def test_recursive_protobuf_proxying(): assert len(parsed.entries) == 2 assert parsed.entries[0].ssid == "one" assert parsed.entries[1].ssid == "two" + + +def test_ble_advertisement_parsing(): + # GIVEN + adv_data = bytes( + [ + 0x02, + 0x01, + 0x02, + 0x03, + 0x02, + 0xA6, + 0xFE, + 0x0F, + 0xFF, + 0xF2, + 0x02, + 0x02, + 0x01, + 0x38, + 0x33, + 0x00, + 0xB3, + 0xFE, + 0x2A, + 0x79, + 0xDC, + 0xEB, + 0x0F, + ] + ) + + # WHEN + adv = adv_data_struct.parse(adv_data) + manuf_data = adv.manuf_data + camera_status = manuf_data.camera_status + camera_capabilities = manuf_data.camera_capabilities + media_offload_status = manuf_data.media_offload_status + + # THEN + assert adv.flags == 0x0102 + assert adv.uuids == 0xFEA6 + assert adv.manuf_type == 0xFF + assert adv.company_id == 0xF202 + + assert manuf_data.schema_version == 2 + assert str(manuf_data.camera_id) == "Hero11Black" + assert manuf_data.id_hash == "b3:fe:2a:79:dc:eb" + + assert camera_status.processor_state == False + assert camera_status.wifi_ap_state == False + assert camera_status.peripheral_pairing_state == False + assert camera_status.central_role_enabled == False + assert camera_status.is_new_media_available == False + + assert camera_capabilities.cnc == False + assert camera_capabilities.ble_metadata == False + assert camera_capabilities.wideband_audio == True + assert camera_capabilities.concurrent_master_slave == True + assert camera_capabilities.onboarding == False + assert camera_capabilities.new_media_available == False + + assert media_offload_status.available == False + assert media_offload_status.new_media_available == False + assert media_offload_status.battery_ok == False + assert media_offload_status.sd_card_ok == False + assert media_offload_status.busy == True + assert media_offload_status.paused == True + + +def test_ble_scan_response_parsing(): + # GIVEN + scan_response_data = bytes( + [ + 0x0B, + 0x09, + 0x47, + 0x6F, + 0x50, + 0x72, + 0x6F, + 0x20, + 0x31, + 0x30, + 0x35, + 0x38, + 0x0B, + 0x16, + 0xA6, + 0xFE, + 0xF7, + 0xA9, + 0x76, + 0x88, + 0x31, + 0x30, + 0x35, + 0x38, + ] + ) + + # WHEN + scan_response = scan_response_struct.parse(scan_response_data) + print(scan_response) + + # THEN + assert scan_response.name == "GoPro 1058" + assert scan_response.service_type == 0x16 + assert scan_response.service_uuid == 0xFEA6 + assert scan_response.service_data.ap_mac_address == "f7:a9:76:88" + assert scan_response.service_data.serial_number == "1058"