From dbcd807ab5485231a30eab08a76286d80d9936c1 Mon Sep 17 00:00:00 2001 From: Tim Camise Date: Mon, 29 Jul 2024 13:58:57 -0700 Subject: [PATCH 1/2] Add in adv parsing structs and demo --- .../open_gopro/demos/adv_parsing.py | 285 ++++++++++++++++++ 1 file changed, 285 insertions(+) create mode 100644 demos/python/sdk_wireless_camera_control/open_gopro/demos/adv_parsing.py diff --git a/demos/python/sdk_wireless_camera_control/open_gopro/demos/adv_parsing.py b/demos/python/sdk_wireless_camera_control/open_gopro/demos/adv_parsing.py new file mode 100644 index 00000000..81d39045 --- /dev/null +++ b/demos/python/sdk_wireless_camera_control/open_gopro/demos/adv_parsing.py @@ -0,0 +1,285 @@ +from __future__ import annotations +import asyncio +import json +from typing import Any +from dataclasses import dataclass, field, asdict + +from construct import Int16ub +from bleak import BleakScanner +from bleak.backends.scanner import AdvertisementData + +from construct import ( + GreedyString, + Struct, + Byte, + Int16ub, + Bytes, + BitStruct, + Flag, + Padding, + Enum, + this, + Hex, + Int16ul, + Adapter, +) + + +class Hexlify(Adapter): + def _decode(self, obj: bytes, context, path): + return obj.hex(":") + + def _encode(self, obj: str, context, path): + return list(map(int, obj.split(":"))) + + +example_adv_data = bytes( + [ + 0x02, + 0x01, + 0x02, + 0x03, + 0x02, + 0xA6, + 0xFE, + 0x0F, + 0xFF, + 0xF2, + 0x02, + 0x02, + 0x01, + 0x38, + 0x33, + 0x00, + 0xB3, + 0xFE, + 0x2A, + 0x79, + 0xDC, + 0xEB, + 0x0F, + ] +) + +camera_status_struct = BitStruct( + "processor_state" / Flag, + "wifi_ap_state" / Flag, + "peripheral_pairing_state" / Flag, + "central_role_enabled" / Flag, + "is_new_media_available" / Flag, + "reserved" / Padding(3), +) + +camera_id_struct = Enum( + Byte, + Hero11Black=56, + Fraction=66, +) + +camera_capability_struct = BitStruct( + "cnc" / Flag, + "ble_metadata" / Flag, + "wideband_audio" / Flag, + "concurrent_master_slave" / Flag, + "onboarding" / Flag, + "new_media_available" / Flag, + "reserved" / Padding(10), +) + +media_offload_status_struct = BitStruct( + "available" / Flag, + "new_media_available" / Flag, + "battery_ok" / Flag, + "sd_card_ok" / Flag, + "busy" / Flag, + "paused" / Flag, + "reserved" / Padding(2), +) + +manuf_data_struct = Struct( + "schema_version" / Byte, + "camera_status" / camera_status_struct, + "camera_id" / camera_id_struct, + "camera_capabilities" / camera_capability_struct, + "id_hash" / Hexlify(Bytes(6)), + "media_offload_status" / media_offload_status_struct, +) + +adv_data_struct = Struct( + "flags_length" / Byte, + "flags" / Hex(Int16ub), + "uuids_length" / Byte, + "uuids_type" / Hex(Byte), + "uuids" / Hex(Int16ul), + "manuf_length" / Byte, + "manuf_type" / Hex(Byte), + "company_id" / Hex(Int16ub), + "manuf_data" / manuf_data_struct, +) + +example_scan_response_data = bytes( + [ + 0x0B, + 0x09, + 0x47, + 0x6F, + 0x50, + 0x72, + 0x6F, + 0x20, + 0x31, + 0x30, + 0x35, + 0x38, + 0x0B, + 0x16, + 0xA6, + 0xFE, + 0xF7, + 0xA9, + 0x76, + 0x88, + 0x31, + 0x30, + 0x35, + 0x38, + ] +) + +service_data_struct = Struct( + "ap_mac_address" / Hexlify(Bytes(4)), + "serial_number" / GreedyString("utf8"), +) + +scan_response_struct = Struct( + "name_length" / Byte, + "name_type" / Hex(Byte), + "name" / Bytes(this.name_length - 1), + "service_length" / Byte, + "service_type" / Hex(Byte), + "service_uuid" / Hex(Int16ul), + "service_data" / service_data_struct, +) + + +@dataclass +class Jsonable: + def __str__(self) -> str: + def default_decode(obj: Any) -> Any: + if isinstance(obj, (bytes, bytearray)): + return obj.hex(":") + return str(obj) + + return json.dumps(asdict(self), indent=4, default=default_decode) + + +@dataclass +class GoProAdvData(Jsonable): + name: str + schema_version: int + processor_state: bool + wifi_ap_state: bool + peripheral_pairing_state: bool + is_new_media_available: bool + camera_id: str + supports_cnc: bool + supports_ble_metadata: bool + supports_wideband_audio: bool + supports_concurrent_master_slave: bool + supports_onboarding: bool + supports_new_media_available: bool + id_hash: bytes + is_media_upload_new_media_available: bool + is_media_upload_available: bool + is_media_upload_battery_ok: bool + is_media_upload_sd_card_ok: bool + is_media_upload_busy: bool + is_media_upload_paused: bool + ap_mac_address: bytes + partial_serial_number: bytes + + @classmethod + def fromAdvData(cls, data: AdvData) -> GoProAdvData: + manuf_data = manuf_data_struct.parse(list(data.manufacturer_data.values())[0]) + service_data = service_data_struct.parse(list(data.service_data.values())[0]) + return GoProAdvData( + # Name from scan response data + name=data.local_name, + # Schema version from advertising data manufacturer data + schema_version=manuf_data.schema_version, + # Camera status from advertising data manufacturer data + processor_state=manuf_data.camera_status.processor_state, + wifi_ap_state=manuf_data.camera_status.wifi_ap_state, + peripheral_pairing_state=manuf_data.camera_status.peripheral_pairing_state, + is_new_media_available=manuf_data.camera_status.is_new_media_available, + # Camera ID from advertising data manufacturer data + camera_id=manuf_data.camera_id, + # Camera capabilities from advertising data manufacturer data + supports_ble_metadata=manuf_data.camera_capabilities.ble_metadata, + supports_cnc=manuf_data.camera_capabilities.cnc, + supports_onboarding=manuf_data.camera_capabilities.onboarding, + supports_wideband_audio=manuf_data.camera_capabilities.wideband_audio, + supports_concurrent_master_slave=manuf_data.camera_capabilities.concurrent_master_slave, + supports_new_media_available=manuf_data.camera_capabilities.new_media_available, + # ID Hash from advertising data manufacturer's data + id_hash=manuf_data.id_hash, + # Media offload status status from advertising data manufacturer's data + is_media_upload_new_media_available=manuf_data.media_offload_status.new_media_available, + is_media_upload_available=manuf_data.media_offload_status.available, + is_media_upload_battery_ok=manuf_data.media_offload_status.battery_ok, + is_media_upload_sd_card_ok=manuf_data.media_offload_status.sd_card_ok, + is_media_upload_busy=manuf_data.media_offload_status.busy, + is_media_upload_paused=manuf_data.media_offload_status.paused, + # Mac address from scan response data service data + ap_mac_address=service_data.ap_mac_address, + # Partial serial number from scan response data service data + partial_serial_number=service_data.serial_number, + ) + + +@dataclass +class AdvData(Jsonable): + local_name: str = "" + manufacturer_data: dict[str, Any] = field(default_factory=dict) + service_uuids: list[str] = field(default_factory=list) + service_data: dict = field(default_factory=dict) + + @staticmethod + def deeply_update_dict(d: dict, u: dict) -> dict: + for k, v in u.items(): + if isinstance(v, dict): + d[k] = AdvData.deeply_update_dict(d.get(k, {}), v) + else: + d[k] = v + return d + + def update(self, data: AdvertisementData) -> None: + self_dict = asdict(self) + for k, v in data._asdict().items(): + if not v: + continue + if isinstance(v, dict): + self.__setattr__(k, self.deeply_update_dict(self_dict[k], v)) + elif isinstance(v, list): + self.__setattr__(k, [*self_dict[k], v]) + else: + self.__setattr__(k, v) + + +async def main(): + # print(adv_data_struct.parse(example_adv_data)) + # print(scan_response_struct.parse(example_scan_response_data)) + + adv_data = AdvData() + + async with BleakScanner(service_uuids=["0000fea6-0000-1000-8000-00805f9b34fb"]) as scanner: + async for _, data in scanner.advertisement_data(): + adv_data.update(data) + if adv_data.local_name: # Once we've received the scan response... + break + + print(f"GoPro Data: {GoProAdvData.fromAdvData(adv_data)}") + + +if __name__ == "__main__": + asyncio.run(main()) From bf2786d39c72d015dc804fd4494d5f1292da0a23 Mon Sep 17 00:00:00 2001 From: Tim Camise Date: Fri, 8 Nov 2024 10:28:56 -0800 Subject: [PATCH 2/2] Refactor and add unit testing --- .../open_gopro/demos/adv_parsing.py | 270 +----------------- .../open_gopro/models/ble_advertisement.py | 227 +++++++++++++++ .../open_gopro/util.py | 18 ++ .../tests/unit/test_parsers.py | 114 +++++++- 4 files changed, 362 insertions(+), 267 deletions(-) create mode 100644 demos/python/sdk_wireless_camera_control/open_gopro/models/ble_advertisement.py diff --git a/demos/python/sdk_wireless_camera_control/open_gopro/demos/adv_parsing.py b/demos/python/sdk_wireless_camera_control/open_gopro/demos/adv_parsing.py index 81d39045..9ae9a14f 100644 --- a/demos/python/sdk_wireless_camera_control/open_gopro/demos/adv_parsing.py +++ b/demos/python/sdk_wireless_camera_control/open_gopro/demos/adv_parsing.py @@ -1,275 +1,13 @@ -from __future__ import annotations +"""Demo to retrieve and parse bleak-level advertisements""" + import asyncio -import json -from typing import Any -from dataclasses import dataclass, field, asdict -from construct import Int16ub from bleak import BleakScanner -from bleak.backends.scanner import AdvertisementData - -from construct import ( - GreedyString, - Struct, - Byte, - Int16ub, - Bytes, - BitStruct, - Flag, - Padding, - Enum, - this, - Hex, - Int16ul, - Adapter, -) - - -class Hexlify(Adapter): - def _decode(self, obj: bytes, context, path): - return obj.hex(":") - - def _encode(self, obj: str, context, path): - return list(map(int, obj.split(":"))) - - -example_adv_data = bytes( - [ - 0x02, - 0x01, - 0x02, - 0x03, - 0x02, - 0xA6, - 0xFE, - 0x0F, - 0xFF, - 0xF2, - 0x02, - 0x02, - 0x01, - 0x38, - 0x33, - 0x00, - 0xB3, - 0xFE, - 0x2A, - 0x79, - 0xDC, - 0xEB, - 0x0F, - ] -) - -camera_status_struct = BitStruct( - "processor_state" / Flag, - "wifi_ap_state" / Flag, - "peripheral_pairing_state" / Flag, - "central_role_enabled" / Flag, - "is_new_media_available" / Flag, - "reserved" / Padding(3), -) - -camera_id_struct = Enum( - Byte, - Hero11Black=56, - Fraction=66, -) - -camera_capability_struct = BitStruct( - "cnc" / Flag, - "ble_metadata" / Flag, - "wideband_audio" / Flag, - "concurrent_master_slave" / Flag, - "onboarding" / Flag, - "new_media_available" / Flag, - "reserved" / Padding(10), -) - -media_offload_status_struct = BitStruct( - "available" / Flag, - "new_media_available" / Flag, - "battery_ok" / Flag, - "sd_card_ok" / Flag, - "busy" / Flag, - "paused" / Flag, - "reserved" / Padding(2), -) - -manuf_data_struct = Struct( - "schema_version" / Byte, - "camera_status" / camera_status_struct, - "camera_id" / camera_id_struct, - "camera_capabilities" / camera_capability_struct, - "id_hash" / Hexlify(Bytes(6)), - "media_offload_status" / media_offload_status_struct, -) - -adv_data_struct = Struct( - "flags_length" / Byte, - "flags" / Hex(Int16ub), - "uuids_length" / Byte, - "uuids_type" / Hex(Byte), - "uuids" / Hex(Int16ul), - "manuf_length" / Byte, - "manuf_type" / Hex(Byte), - "company_id" / Hex(Int16ub), - "manuf_data" / manuf_data_struct, -) - -example_scan_response_data = bytes( - [ - 0x0B, - 0x09, - 0x47, - 0x6F, - 0x50, - 0x72, - 0x6F, - 0x20, - 0x31, - 0x30, - 0x35, - 0x38, - 0x0B, - 0x16, - 0xA6, - 0xFE, - 0xF7, - 0xA9, - 0x76, - 0x88, - 0x31, - 0x30, - 0x35, - 0x38, - ] -) - -service_data_struct = Struct( - "ap_mac_address" / Hexlify(Bytes(4)), - "serial_number" / GreedyString("utf8"), -) - -scan_response_struct = Struct( - "name_length" / Byte, - "name_type" / Hex(Byte), - "name" / Bytes(this.name_length - 1), - "service_length" / Byte, - "service_type" / Hex(Byte), - "service_uuid" / Hex(Int16ul), - "service_data" / service_data_struct, -) - - -@dataclass -class Jsonable: - def __str__(self) -> str: - def default_decode(obj: Any) -> Any: - if isinstance(obj, (bytes, bytearray)): - return obj.hex(":") - return str(obj) - - return json.dumps(asdict(self), indent=4, default=default_decode) - - -@dataclass -class GoProAdvData(Jsonable): - name: str - schema_version: int - processor_state: bool - wifi_ap_state: bool - peripheral_pairing_state: bool - is_new_media_available: bool - camera_id: str - supports_cnc: bool - supports_ble_metadata: bool - supports_wideband_audio: bool - supports_concurrent_master_slave: bool - supports_onboarding: bool - supports_new_media_available: bool - id_hash: bytes - is_media_upload_new_media_available: bool - is_media_upload_available: bool - is_media_upload_battery_ok: bool - is_media_upload_sd_card_ok: bool - is_media_upload_busy: bool - is_media_upload_paused: bool - ap_mac_address: bytes - partial_serial_number: bytes - - @classmethod - def fromAdvData(cls, data: AdvData) -> GoProAdvData: - manuf_data = manuf_data_struct.parse(list(data.manufacturer_data.values())[0]) - service_data = service_data_struct.parse(list(data.service_data.values())[0]) - return GoProAdvData( - # Name from scan response data - name=data.local_name, - # Schema version from advertising data manufacturer data - schema_version=manuf_data.schema_version, - # Camera status from advertising data manufacturer data - processor_state=manuf_data.camera_status.processor_state, - wifi_ap_state=manuf_data.camera_status.wifi_ap_state, - peripheral_pairing_state=manuf_data.camera_status.peripheral_pairing_state, - is_new_media_available=manuf_data.camera_status.is_new_media_available, - # Camera ID from advertising data manufacturer data - camera_id=manuf_data.camera_id, - # Camera capabilities from advertising data manufacturer data - supports_ble_metadata=manuf_data.camera_capabilities.ble_metadata, - supports_cnc=manuf_data.camera_capabilities.cnc, - supports_onboarding=manuf_data.camera_capabilities.onboarding, - supports_wideband_audio=manuf_data.camera_capabilities.wideband_audio, - supports_concurrent_master_slave=manuf_data.camera_capabilities.concurrent_master_slave, - supports_new_media_available=manuf_data.camera_capabilities.new_media_available, - # ID Hash from advertising data manufacturer's data - id_hash=manuf_data.id_hash, - # Media offload status status from advertising data manufacturer's data - is_media_upload_new_media_available=manuf_data.media_offload_status.new_media_available, - is_media_upload_available=manuf_data.media_offload_status.available, - is_media_upload_battery_ok=manuf_data.media_offload_status.battery_ok, - is_media_upload_sd_card_ok=manuf_data.media_offload_status.sd_card_ok, - is_media_upload_busy=manuf_data.media_offload_status.busy, - is_media_upload_paused=manuf_data.media_offload_status.paused, - # Mac address from scan response data service data - ap_mac_address=service_data.ap_mac_address, - # Partial serial number from scan response data service data - partial_serial_number=service_data.serial_number, - ) - - -@dataclass -class AdvData(Jsonable): - local_name: str = "" - manufacturer_data: dict[str, Any] = field(default_factory=dict) - service_uuids: list[str] = field(default_factory=list) - service_data: dict = field(default_factory=dict) - - @staticmethod - def deeply_update_dict(d: dict, u: dict) -> dict: - for k, v in u.items(): - if isinstance(v, dict): - d[k] = AdvData.deeply_update_dict(d.get(k, {}), v) - else: - d[k] = v - return d - - def update(self, data: AdvertisementData) -> None: - self_dict = asdict(self) - for k, v in data._asdict().items(): - if not v: - continue - if isinstance(v, dict): - self.__setattr__(k, self.deeply_update_dict(self_dict[k], v)) - elif isinstance(v, list): - self.__setattr__(k, [*self_dict[k], v]) - else: - self.__setattr__(k, v) +from open_gopro.models.ble_advertisement import AdvData, GoProAdvData -async def main(): - # print(adv_data_struct.parse(example_adv_data)) - # print(scan_response_struct.parse(example_scan_response_data)) +async def main() -> None: adv_data = AdvData() async with BleakScanner(service_uuids=["0000fea6-0000-1000-8000-00805f9b34fb"]) as scanner: diff --git a/demos/python/sdk_wireless_camera_control/open_gopro/models/ble_advertisement.py b/demos/python/sdk_wireless_camera_control/open_gopro/models/ble_advertisement.py new file mode 100644 index 00000000..90578c7f --- /dev/null +++ b/demos/python/sdk_wireless_camera_control/open_gopro/models/ble_advertisement.py @@ -0,0 +1,227 @@ +"""GoPro specific advertisement entities and parsing structures""" + +from __future__ import annotations + +import json +from dataclasses import asdict, dataclass, field +from typing import Any + +from bleak.backends.scanner import AdvertisementData +from construct import ( + Adapter, + BitStruct, + Byte, + Bytes, + Enum, + Flag, + GreedyString, + Hex, + Int16ub, + Int16ul, + PaddedString, + Padding, + Struct, + this, +) + +from open_gopro.util import deeply_update_dict + + +class Hexlify(Adapter): + """Construct adapter for pretty hex representation""" + + def _decode(self, obj: bytes, context: Any, path: Any) -> str: + return obj.hex(":") + + def _encode(self, obj: str, context: Any, path: Any) -> list[int]: + return list(map(int, obj.split(":"))) + + +camera_status_struct = BitStruct( + "processor_state" / Flag, + "wifi_ap_state" / Flag, + "peripheral_pairing_state" / Flag, + "central_role_enabled" / Flag, + "is_new_media_available" / Flag, + "reserved" / Padding(3), +) + +camera_id_struct = Enum( + Byte, + Hero11Black=56, + Fraction=66, +) + + +camera_capability_struct = BitStruct( + "cnc" / Flag, + "ble_metadata" / Flag, + "wideband_audio" / Flag, + "concurrent_master_slave" / Flag, + "onboarding" / Flag, + "new_media_available" / Flag, + "reserved" / Padding(10), +) + +media_offload_status_struct = BitStruct( + "available" / Flag, + "new_media_available" / Flag, + "battery_ok" / Flag, + "sd_card_ok" / Flag, + "busy" / Flag, + "paused" / Flag, + "reserved" / Padding(2), +) + +manuf_data_struct = Struct( + "schema_version" / Byte, + "camera_status" / camera_status_struct, + "camera_id" / camera_id_struct, + "camera_capabilities" / camera_capability_struct, + "id_hash" / Hexlify(Bytes(6)), + "media_offload_status" / media_offload_status_struct, +) + +adv_data_struct = Struct( + "flags_length" / Byte, + "flags" / Hex(Int16ub), + "uuids_length" / Byte, + "uuids_type" / Hex(Byte), + "uuids" / Hex(Int16ul), + "manuf_length" / Byte, + "manuf_type" / Hex(Byte), + "company_id" / Hex(Int16ub), + "manuf_data" / manuf_data_struct, +) + +service_data_struct = Struct( + "ap_mac_address" / Hexlify(Bytes(4)), + "serial_number" / GreedyString("utf8"), +) + +scan_response_struct = Struct( + "name_length" / Byte, + "name_type" / Hex(Byte), + "name" / PaddedString(this.name_length - 1, encoding="utf8"), + "service_length" / Byte, + "service_type" / Hex(Byte), + "service_uuid" / Hex(Int16ul), + "service_data" / service_data_struct, +) + + +@dataclass +class Jsonable: + """Mixin to use pretty hex presentation for JSON decoding""" + + def __str__(self) -> str: + def default_decode(obj: Any) -> Any: + if isinstance(obj, (bytes, bytearray)): + return obj.hex(":") + return str(obj) + + return json.dumps(asdict(self), indent=4, default=default_decode) + + +@dataclass +class GoProAdvData(Jsonable): + """GoPro-specific advertising data""" + + name: str + schema_version: int + processor_state: bool + wifi_ap_state: bool + peripheral_pairing_state: bool + is_new_media_available: bool + camera_id: str + supports_cnc: bool + supports_ble_metadata: bool + supports_wideband_audio: bool + supports_concurrent_master_slave: bool + supports_onboarding: bool + supports_new_media_available: bool + id_hash: bytes + is_media_upload_new_media_available: bool + is_media_upload_available: bool + is_media_upload_battery_ok: bool + is_media_upload_sd_card_ok: bool + is_media_upload_busy: bool + is_media_upload_paused: bool + ap_mac_address: bytes + partial_serial_number: bytes + + @classmethod + def fromAdvData(cls, data: AdvData) -> GoProAdvData: + """Build GoPro specific advertisement from standard BLE advertisement data + + Args: + data (AdvData): standard BLE advertisement data + + Returns: + GoProAdvData: parsed GoPro specific advertising data + """ + manuf_data = manuf_data_struct.parse(list(data.manufacturer_data.values())[0]) + service_data = service_data_struct.parse(list(data.service_data.values())[0]) + return GoProAdvData( + # Name from scan response data + name=data.local_name, + # Schema version from advertising data manufacturer data + schema_version=manuf_data.schema_version, + # Camera status from advertising data manufacturer data + processor_state=manuf_data.camera_status.processor_state, + wifi_ap_state=manuf_data.camera_status.wifi_ap_state, + peripheral_pairing_state=manuf_data.camera_status.peripheral_pairing_state, + is_new_media_available=manuf_data.camera_status.is_new_media_available, + # Camera ID from advertising data manufacturer data + camera_id=manuf_data.camera_id, + # Camera capabilities from advertising data manufacturer data + supports_ble_metadata=manuf_data.camera_capabilities.ble_metadata, + supports_cnc=manuf_data.camera_capabilities.cnc, + supports_onboarding=manuf_data.camera_capabilities.onboarding, + supports_wideband_audio=manuf_data.camera_capabilities.wideband_audio, + supports_concurrent_master_slave=manuf_data.camera_capabilities.concurrent_master_slave, + supports_new_media_available=manuf_data.camera_capabilities.new_media_available, + # ID Hash from advertising data manufacturer's data + id_hash=manuf_data.id_hash, + # Media offload status status from advertising data manufacturer's data + is_media_upload_new_media_available=manuf_data.media_offload_status.new_media_available, + is_media_upload_available=manuf_data.media_offload_status.available, + is_media_upload_battery_ok=manuf_data.media_offload_status.battery_ok, + is_media_upload_sd_card_ok=manuf_data.media_offload_status.sd_card_ok, + is_media_upload_busy=manuf_data.media_offload_status.busy, + is_media_upload_paused=manuf_data.media_offload_status.paused, + # Mac address from scan response data service data + ap_mac_address=service_data.ap_mac_address, + # Partial serial number from scan response data service data + partial_serial_number=service_data.serial_number, + ) + + +@dataclass +class AdvData(Jsonable): + """Standard BLE advertising data + + Only contains fields that are currently used by GoPro + """ + + local_name: str = "" + manufacturer_data: dict[str, Any] = field(default_factory=dict) + service_uuids: list[str] = field(default_factory=list) + service_data: dict = field(default_factory=dict) + + def update(self, data: AdvertisementData) -> None: + """Update with a (potentially incomplete) advertisement + + Args: + data (AdvertisementData): advertisement to use for updating + """ + self_dict = asdict(self) + for k, v in data._asdict().items(): + if not v: + continue + if isinstance(v, dict): + setattr(self, k, deeply_update_dict(self_dict[k], v)) + elif isinstance(v, list): + setattr(self, k, [*self_dict[k], v]) + else: + setattr(self, k, v) diff --git a/demos/python/sdk_wireless_camera_control/open_gopro/util.py b/demos/python/sdk_wireless_camera_control/open_gopro/util.py index ecee2cb0..68ac01e0 100644 --- a/demos/python/sdk_wireless_camera_control/open_gopro/util.py +++ b/demos/python/sdk_wireless_camera_control/open_gopro/util.py @@ -330,3 +330,21 @@ def get_current_dst_aware_time() -> tuple[datetime, int, bool]: if is_dst: offset += 60 return (now, int(offset), is_dst) + + +def deeply_update_dict(d: dict, u: dict) -> dict: + """Recursively update a dict + + Args: + d (dict): original dict + u (dict): dict to apply updates from + + Returns: + dict: updated original dict + """ + for k, v in u.items(): + if isinstance(v, dict): + d[k] = deeply_update_dict(d.get(k, {}), v) + else: + d[k] = v + return d diff --git a/demos/python/sdk_wireless_camera_control/tests/unit/test_parsers.py b/demos/python/sdk_wireless_camera_control/tests/unit/test_parsers.py index 9d144e59..5e9a54fd 100644 --- a/demos/python/sdk_wireless_camera_control/tests/unit/test_parsers.py +++ b/demos/python/sdk_wireless_camera_control/tests/unit/test_parsers.py @@ -5,7 +5,8 @@ from open_gopro.api.parsers import ByteParserBuilders from open_gopro.communicator_interface import GoProBle from open_gopro.constants import CmdId -from open_gopro.models.response import BleRespBuilder, GlobalParsers +from open_gopro.models.ble_advertisement import adv_data_struct, scan_response_struct +from open_gopro.models.response import GlobalParsers from open_gopro.parser_interface import Parser from open_gopro.proto import EnumResultGeneric, ResponseGetApEntries @@ -32,3 +33,114 @@ def test_recursive_protobuf_proxying(): assert len(parsed.entries) == 2 assert parsed.entries[0].ssid == "one" assert parsed.entries[1].ssid == "two" + + +def test_ble_advertisement_parsing(): + # GIVEN + adv_data = bytes( + [ + 0x02, + 0x01, + 0x02, + 0x03, + 0x02, + 0xA6, + 0xFE, + 0x0F, + 0xFF, + 0xF2, + 0x02, + 0x02, + 0x01, + 0x38, + 0x33, + 0x00, + 0xB3, + 0xFE, + 0x2A, + 0x79, + 0xDC, + 0xEB, + 0x0F, + ] + ) + + # WHEN + adv = adv_data_struct.parse(adv_data) + manuf_data = adv.manuf_data + camera_status = manuf_data.camera_status + camera_capabilities = manuf_data.camera_capabilities + media_offload_status = manuf_data.media_offload_status + + # THEN + assert adv.flags == 0x0102 + assert adv.uuids == 0xFEA6 + assert adv.manuf_type == 0xFF + assert adv.company_id == 0xF202 + + assert manuf_data.schema_version == 2 + assert str(manuf_data.camera_id) == "Hero11Black" + assert manuf_data.id_hash == "b3:fe:2a:79:dc:eb" + + assert camera_status.processor_state == False + assert camera_status.wifi_ap_state == False + assert camera_status.peripheral_pairing_state == False + assert camera_status.central_role_enabled == False + assert camera_status.is_new_media_available == False + + assert camera_capabilities.cnc == False + assert camera_capabilities.ble_metadata == False + assert camera_capabilities.wideband_audio == True + assert camera_capabilities.concurrent_master_slave == True + assert camera_capabilities.onboarding == False + assert camera_capabilities.new_media_available == False + + assert media_offload_status.available == False + assert media_offload_status.new_media_available == False + assert media_offload_status.battery_ok == False + assert media_offload_status.sd_card_ok == False + assert media_offload_status.busy == True + assert media_offload_status.paused == True + + +def test_ble_scan_response_parsing(): + # GIVEN + scan_response_data = bytes( + [ + 0x0B, + 0x09, + 0x47, + 0x6F, + 0x50, + 0x72, + 0x6F, + 0x20, + 0x31, + 0x30, + 0x35, + 0x38, + 0x0B, + 0x16, + 0xA6, + 0xFE, + 0xF7, + 0xA9, + 0x76, + 0x88, + 0x31, + 0x30, + 0x35, + 0x38, + ] + ) + + # WHEN + scan_response = scan_response_struct.parse(scan_response_data) + print(scan_response) + + # THEN + assert scan_response.name == "GoPro 1058" + assert scan_response.service_type == 0x16 + assert scan_response.service_uuid == 0xFEA6 + assert scan_response.service_data.ap_mac_address == "f7:a9:76:88" + assert scan_response.service_data.serial_number == "1058"