Skip to content

Commit

Permalink
Add support for BPUP (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Feb 5, 2021
1 parent 1e5b6f9 commit 1247f5c
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 10 deletions.
2 changes: 1 addition & 1 deletion bond_api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Asynchronous Python wrapper library over Bond Local API."""

from .bond import Bond # noqa: F401
from .bond import Bond, BPUPSubscriptions, start_bpup # noqa: F401
from .action import Action, Direction # noqa: F401
from .device_type import DeviceType # noqa: F401
117 changes: 108 additions & 9 deletions bond_api/bond.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,31 @@
"""Bond Local API wrapper."""

from typing import List, Optional, Callable, Any
import asyncio
import json
import time
from asyncio import transports
from typing import Any, Callable, List, Optional

from aiohttp import ClientSession, ClientTimeout

from .action import Action

BPUP_INIT_PUSH_MESSAGE = b"\n"
BPUP_PORT = 30007
BPUP_ALIVE_TIMEOUT = 70


class Bond:
"""Bond API."""

def __init__(self, host: str, token: str, *,
session: Optional[ClientSession] = None,
timeout: Optional[ClientTimeout] = None):
def __init__(
self,
host: str,
token: str,
*,
session: Optional[ClientSession] = None,
timeout: Optional[ClientTimeout] = None,
):
"""Initialize Bond with provided host and token."""
self._host = host
self._api_kwargs = {"headers": {"BOND-Token": token}}
Expand All @@ -27,7 +40,7 @@ async def version(self) -> dict:
async def devices(self) -> List[str]:
"""Return the list of available device IDs reported by API."""
json = await self.__get("/v2/devices")
return [key for key in json if key != '_']
return [key for key in json if key != "_"]

async def device(self, device_id: str) -> dict:
"""Return main device metadata reported by API."""
Expand All @@ -47,17 +60,17 @@ async def action(self, device_id: str, action: Action) -> None:

async def put(session: ClientSession) -> None:
async with session.put(
f"http://{self._host}{path}",
**self._api_kwargs,
json=action.argument
f"http://{self._host}{path}", **self._api_kwargs, json=action.argument
) as response:
response.raise_for_status()

await self.__call(put)

async def __get(self, path) -> dict:
async def get(session: ClientSession) -> dict:
async with session.get(f"http://{self._host}{path}", **self._api_kwargs) as response:
async with session.get(
f"http://{self._host}{path}", **self._api_kwargs
) as response:
response.raise_for_status()
return await response.json()

Expand All @@ -69,3 +82,89 @@ async def __call(self, handler: Callable[[ClientSession], Any]):
return await handler(request_session)
else:
return await handler(self._session)


class BPUPSubscriptions:
"""Store BPUP subscriptions."""

def __init__(self):
"""Init and store callbacks."""
self._callbacks = {}
self.last_message_time = 0

@property
def alive(self):
return (time.time() - self.last_message_time) < BPUP_ALIVE_TIMEOUT

def subscribe(self, device_id, callback):
"""Subscribe to BPUP updates."""
self._callbacks.setdefault(device_id, []).append(callback)

def unsubscribe(self, device_id, callback):
"""Unsubscribe from BPUP updates."""
self._callbacks[device_id].remove(callback)

def notify(self, json_msg):
"""Notify subscribers of an update."""
self.last_message_time = time.time()

if json_msg.get("s") != 200:
return

topic = json_msg["t"].split("/")
device_id = topic[1]

for callback in self._callbacks.get(device_id, []):
callback(json_msg["b"])


class BPUProtocol:
"""Implements BPU Protocol."""

def __init__(self, loop, bpup_subscriptions):
"""Create BPU Protocol."""
self.loop = loop
self.bpup_subscriptions = bpup_subscriptions
self.transport = None
self.keep_alive = None

def connection_made(self, transport):
"""Connect or reconnect to the device."""
self.transport = transport
if self.keep_alive:
self.keep_alive.cancel()
self.keep_alive = None
self.send_keep_alive()

def send_keep_alive(self):
"""Send a keep alive every 60 seconds per the protocol."""
self.transport.sendto(BPUP_INIT_PUSH_MESSAGE)
self.keep_alive = self.loop.call_later(60, self.send_keep_alive)

def datagram_received(self, data, addr):
"""Process incoming state changes."""
self.bpup_subscriptions.notify(json.loads(data.decode()[:-1]))

def error_received(self, exc):
"""Ignore errors."""
return

def connection_lost(self, exc):
"""Ignore connection lost."""
return

def stop(self):
"""Stop the client."""
if self.transport:
self.transport.close()


async def start_bpup(host_ip_addr, bpup_subscriptions):
"""Create the socket and protocol."""
loop = asyncio.get_event_loop()

_, protocol = await loop.create_datagram_endpoint(
lambda: BPUProtocol(loop, bpup_subscriptions),
remote_addr=(host_ip_addr, BPUP_PORT),
)
return protocol.stop
24 changes: 24 additions & 0 deletions bpup_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import asyncio


from bond_api import BPUPSubscriptions, start_bpup


async def main(ip_address):
"""Example of library usage."""

sub = BPUPSubscriptions()
stop_bpup = await start_bpup(ip_address, sub)

for i in range(500):
print("BPUP is alive:", sub.alive)
await asyncio.sleep(1)

stop_bpup()


if __name__ == "__main__":
print("Enter the device ip:")
ip_address = input().strip()
loop = asyncio.get_event_loop()
loop.run_until_complete(main(ip_address))

0 comments on commit 1247f5c

Please sign in to comment.