From 1247f5c18ed521eabdbd1722db98892ff4f5151d Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 5 Feb 2021 08:28:48 -1000 Subject: [PATCH] Add support for BPUP (#4) --- bond_api/__init__.py | 2 +- bond_api/bond.py | 117 +++++++++++++++++++++++++++++++++++++++---- bpup_test.py | 24 +++++++++ 3 files changed, 133 insertions(+), 10 deletions(-) create mode 100644 bpup_test.py diff --git a/bond_api/__init__.py b/bond_api/__init__.py index 14ca96d..db71282 100644 --- a/bond_api/__init__.py +++ b/bond_api/__init__.py @@ -1,5 +1,5 @@ """Asynchronous Python wrapper library over Bond Local API.""" -from .bond import Bond # noqa: F401 +from .bond import Bond, BPUPSubscriptions, start_bpup # noqa: F401 from .action import Action, Direction # noqa: F401 from .device_type import DeviceType # noqa: F401 diff --git a/bond_api/bond.py b/bond_api/bond.py index a5b1901..f85bef6 100644 --- a/bond_api/bond.py +++ b/bond_api/bond.py @@ -1,18 +1,31 @@ """Bond Local API wrapper.""" -from typing import List, Optional, Callable, Any +import asyncio +import json +import time +from asyncio import transports +from typing import Any, Callable, List, Optional from aiohttp import ClientSession, ClientTimeout from .action import Action +BPUP_INIT_PUSH_MESSAGE = b"\n" +BPUP_PORT = 30007 +BPUP_ALIVE_TIMEOUT = 70 + class Bond: """Bond API.""" - def __init__(self, host: str, token: str, *, - session: Optional[ClientSession] = None, - timeout: Optional[ClientTimeout] = None): + def __init__( + self, + host: str, + token: str, + *, + session: Optional[ClientSession] = None, + timeout: Optional[ClientTimeout] = None, + ): """Initialize Bond with provided host and token.""" self._host = host self._api_kwargs = {"headers": {"BOND-Token": token}} @@ -27,7 +40,7 @@ async def version(self) -> dict: async def devices(self) -> List[str]: """Return the list of available device IDs reported by API.""" json = await self.__get("/v2/devices") - return [key for key in json if key != '_'] + return [key for key in json if key != "_"] async def device(self, device_id: str) -> dict: """Return main device metadata reported by API.""" @@ -47,9 +60,7 @@ async def action(self, device_id: str, action: Action) -> None: async def put(session: ClientSession) -> None: async with session.put( - f"http://{self._host}{path}", - **self._api_kwargs, - json=action.argument + f"http://{self._host}{path}", **self._api_kwargs, json=action.argument ) as response: response.raise_for_status() @@ -57,7 +68,9 @@ async def put(session: ClientSession) -> None: async def __get(self, path) -> dict: async def get(session: ClientSession) -> dict: - async with session.get(f"http://{self._host}{path}", **self._api_kwargs) as response: + async with session.get( + f"http://{self._host}{path}", **self._api_kwargs + ) as response: response.raise_for_status() return await response.json() @@ -69,3 +82,89 @@ async def __call(self, handler: Callable[[ClientSession], Any]): return await handler(request_session) else: return await handler(self._session) + + +class BPUPSubscriptions: + """Store BPUP subscriptions.""" + + def __init__(self): + """Init and store callbacks.""" + self._callbacks = {} + self.last_message_time = 0 + + @property + def alive(self): + return (time.time() - self.last_message_time) < BPUP_ALIVE_TIMEOUT + + def subscribe(self, device_id, callback): + """Subscribe to BPUP updates.""" + self._callbacks.setdefault(device_id, []).append(callback) + + def unsubscribe(self, device_id, callback): + """Unsubscribe from BPUP updates.""" + self._callbacks[device_id].remove(callback) + + def notify(self, json_msg): + """Notify subscribers of an update.""" + self.last_message_time = time.time() + + if json_msg.get("s") != 200: + return + + topic = json_msg["t"].split("/") + device_id = topic[1] + + for callback in self._callbacks.get(device_id, []): + callback(json_msg["b"]) + + +class BPUProtocol: + """Implements BPU Protocol.""" + + def __init__(self, loop, bpup_subscriptions): + """Create BPU Protocol.""" + self.loop = loop + self.bpup_subscriptions = bpup_subscriptions + self.transport = None + self.keep_alive = None + + def connection_made(self, transport): + """Connect or reconnect to the device.""" + self.transport = transport + if self.keep_alive: + self.keep_alive.cancel() + self.keep_alive = None + self.send_keep_alive() + + def send_keep_alive(self): + """Send a keep alive every 60 seconds per the protocol.""" + self.transport.sendto(BPUP_INIT_PUSH_MESSAGE) + self.keep_alive = self.loop.call_later(60, self.send_keep_alive) + + def datagram_received(self, data, addr): + """Process incoming state changes.""" + self.bpup_subscriptions.notify(json.loads(data.decode()[:-1])) + + def error_received(self, exc): + """Ignore errors.""" + return + + def connection_lost(self, exc): + """Ignore connection lost.""" + return + + def stop(self): + """Stop the client.""" + if self.transport: + self.transport.close() + + +async def start_bpup(host_ip_addr, bpup_subscriptions): + """Create the socket and protocol.""" + loop = asyncio.get_event_loop() + + _, protocol = await loop.create_datagram_endpoint( + lambda: BPUProtocol(loop, bpup_subscriptions), + remote_addr=(host_ip_addr, BPUP_PORT), + ) + return protocol.stop diff --git a/bpup_test.py b/bpup_test.py new file mode 100644 index 0000000..dd54be2 --- /dev/null +++ b/bpup_test.py @@ -0,0 +1,24 @@ +import asyncio + + +from bond_api import BPUPSubscriptions, start_bpup + + +async def main(ip_address): + """Example of library usage.""" + + sub = BPUPSubscriptions() + stop_bpup = await start_bpup(ip_address, sub) + + for i in range(500): + print("BPUP is alive:", sub.alive) + await asyncio.sleep(1) + + stop_bpup() + + +if __name__ == "__main__": + print("Enter the device ip:") + ip_address = input().strip() + loop = asyncio.get_event_loop() + loop.run_until_complete(main(ip_address))