Skip to content

Commit

Permalink
rework to replace aiohttp with fastapi
Browse files Browse the repository at this point in the history
  • Loading branch information
steersbob committed Dec 6, 2023
1 parent 130a9bd commit a2eea47
Show file tree
Hide file tree
Showing 20 changed files with 1,110 additions and 1,172 deletions.
4 changes: 3 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ ENV VENV=/app/.venv
ENV PATH="$VENV/bin:$PATH"

COPY --from=base /wheeley /wheeley
COPY ./parse_appenv.py ./parse_appenv.py
COPY ./entrypoint.sh ./entrypoint.sh

RUN <<EOF
set -ex
Expand All @@ -40,4 +42,4 @@ RUN <<EOF
rm -rf /wheeley
EOF

ENTRYPOINT ["python3", "-m", "brewblox_tilt"]
ENTRYPOINT ["bash", "./entrypoint.sh"]
67 changes: 0 additions & 67 deletions brewblox_tilt/__main__.py

This file was deleted.

49 changes: 49 additions & 0 deletions brewblox_tilt/app_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import logging
from contextlib import AsyncExitStack, asynccontextmanager
from pprint import pformat

from fastapi import FastAPI

from . import broadcaster, mqtt, parser, scanner, utils

LOGGER = logging.getLogger(__name__)


def setup_logging(debug: bool):
level = logging.DEBUG if debug else logging.INFO
unimportant_level = logging.INFO if debug else logging.WARN
format = '%(asctime)s.%(msecs)03d [%(levelname).1s:%(name)s:%(lineno)d] %(message)s'
datefmt = '%Y/%m/%d %H:%M:%S'

logging.basicConfig(level=level, format=format, datefmt=datefmt)
logging.captureWarnings(True)

logging.getLogger('gmqtt').setLevel(unimportant_level)
logging.getLogger('httpx').setLevel(unimportant_level)
logging.getLogger('httpcore').setLevel(logging.WARN)
logging.getLogger('uvicorn.access').setLevel(unimportant_level)
logging.getLogger('uvicorn.error').disabled = True


@asynccontextmanager
async def lifespan(app: FastAPI):
LOGGER.info(utils.get_config())
LOGGER.debug('LOGGERS:\n' + pformat(logging.root.manager.loggerDict))

async with AsyncExitStack() as stack:
await stack.enter_async_context(mqtt.lifespan())
await stack.enter_async_context(broadcaster.lifespan())
yield


def create_app() -> FastAPI:
config = utils.get_config()
setup_logging(config.debug)

# Call setup functions for modules
mqtt.setup()
parser.setup()
scanner.setup()

app = FastAPI(lifespan=lifespan)
return app
201 changes: 68 additions & 133 deletions brewblox_tilt/broadcaster.py
Original file line number Diff line number Diff line change
@@ -1,98 +1,25 @@
import asyncio
import json
import time
from uuid import UUID
import logging
from contextlib import asynccontextmanager, suppress

from aiohttp import web
from bleak import BleakScanner
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from brewblox_service import brewblox_logger, features, mqtt, repeater
from construct import Array, Byte, Const, Int8sl, Int16ub, Struct
from construct.core import ConstError
from . import mqtt, scanner, utils

from brewblox_tilt import const, parser
from brewblox_tilt.models import ServiceConfig
LOGGER = logging.getLogger(__name__)

APPLE_VID = 0x004C
BEACON_STRUCT = Struct(
'type_length' / Const(b'\x02\x15'),
'uuid' / Array(16, Byte),
'major' / Int16ub,
'minor' / Int16ub,
'tx_power' / Int8sl,
)


LOGGER = brewblox_logger(__name__)


def time_ms():
return time.time_ns() // 1000000


class Broadcaster(repeater.RepeaterFeature):
def __init__(self, app: web.Application):
super().__init__(app)

config: ServiceConfig = app['config']
class Broadcaster():
def __init__(self):
config = utils.get_config()
self.name = config.name
self.scan_duration = max(config.scan_duration, 1)
self.inactive_scan_interval = max(config.inactive_scan_interval, 0)
self.active_scan_interval = max(config.active_scan_interval, 0)
self.state_topic = f'{config.state_topic}/{self.name}'
self.history_topic = f'{config.history_topic}/{self.name}'
self.names_topic = f'brewcast/tilt/{self.name}/names'

self.scanner = BleakScanner(self.device_callback)
self.parser = parser.EventDataParser(app)
self.scan_interval = 1
self.prev_num_messages = 0
self.events: dict[str, parser.TiltEvent] = {}

def device_callback(self, device: BLEDevice, advertisement_data: AdvertisementData):
try:
mac = device.address
apple_data = advertisement_data.manufacturer_data[APPLE_VID]
packet = BEACON_STRUCT.parse(apple_data)
uuid = str(UUID(bytes=bytes(packet.uuid)))

if uuid not in const.TILT_UUID_COLORS.keys():
return

LOGGER.debug(f'Recv {mac=} {uuid=}, {packet.major=}, {packet.minor=}')
self.events[mac] = parser.TiltEvent(mac=mac,
uuid=uuid,
major=packet.major,
minor=packet.minor,
txpower=packet.tx_power,
rssi=advertisement_data.rssi)

except KeyError:
pass # Apple vendor ID not found
except ConstError:
pass # Not an iBeacon

async def on_names_change(self, topic: str, payload: str):
self.parser.apply_custom_names(json.loads(payload))

async def prepare(self):
await mqtt.listen(self.app, self.names_topic, self.on_names_change)
await mqtt.subscribe(self.app, self.names_topic)

async def shutdown(self, app: web.Application):
await mqtt.unsubscribe(app, self.names_topic)
await mqtt.unlisten(app, self.names_topic, self.on_names_change)

async def run(self):
await asyncio.sleep(self.scan_interval)

async with self.scanner:
await asyncio.sleep(self.scan_duration)

messages = self.parser.parse(list(self.events.values()))
self.events.clear()
self.state_topic = f'brewcast/state/{self.name}'
self.history_topic = f'brewcast/history/{self.name}'

async def _run(self):
mqtt_client = mqtt.CV.get()
messages = await scanner.CV.get().scan(self.scan_duration)
curr_num_messages = len(messages)
prev_num_messages = self.prev_num_messages
self.prev_num_messages = curr_num_messages
Expand All @@ -105,15 +32,13 @@ async def run(self):

# Always broadcast a presence message
# This will make the service show up in the UI even without active Tilts
await mqtt.publish(self.app,
self.state_topic,
json.dumps({
'key': self.name,
'type': 'Tilt.state.service',
'timestamp': time_ms(),
}),
err=False,
retain=True)
mqtt_client.publish(self.state_topic,
{
'key': self.name,
'type': 'Tilt.state.service',
'timestamp': utils.time_ms(),
},
retain=True)

if not messages:
return
Expand All @@ -122,50 +47,60 @@ async def run(self):

# Publish history
# Devices can share an event
await mqtt.publish(self.app,
self.history_topic,
json.dumps({
'key': self.name,
'data': {
msg.name: msg.data
for msg in messages
},
}),
err=False)
mqtt_client.publish(self.history_topic,
{
'key': self.name,
'data': {
msg.name: msg.data
for msg in messages
},
})

# Publish state
# Publish individual devices separately
# This lets us retain last published value if a device stops publishing
timestamp = time_ms()
timestamp = utils.time_ms()
for msg in messages:
await mqtt.publish(self.app,
f'{self.state_topic}/{msg.color}/{msg.mac}',
json.dumps({
'key': self.name,
'type': 'Tilt.state',
'timestamp': timestamp,
'color': msg.color,
'mac': msg.mac,
'name': msg.name,
'data': msg.data,
}),
err=False,
retain=True)
mqtt_client.publish(f'{self.state_topic}/{msg.color}/{msg.mac}',
{
'key': self.name,
'type': 'Tilt.state',
'timestamp': timestamp,
'color': msg.color,
'mac': msg.mac,
'name': msg.name,
'data': msg.data,
},
retain=True)

for sync in msg.sync:
if sync.type == 'TempSensorExternal':
await mqtt.publish(self.app,
'brewcast/spark/blocks/patch',
json.dumps({
'id': sync.block,
'serviceId': sync.service,
'type': 'TempSensorExternal',
'data': {
'setting[degC]': msg.data['temperature[degC]'],
},
}),
err=False)


def setup(app):
features.add(app, Broadcaster(app))
mqtt_client.publish('brewcast/spark/blocks/patch',
{
'id': sync.block,
'serviceId': sync.service,
'type': 'TempSensorExternal',
'data': {
'setting[degC]': msg.data['temperature[degC]'],
},
})

async def repeat(self):
config = utils.get_config()
while True:
try:
await asyncio.sleep(self.scan_interval)
await self._run()
except Exception as ex:
LOGGER.error(ex, exc_info=config.debug)
await asyncio.sleep(5)


@asynccontextmanager
async def lifespan():
bc = Broadcaster()
task = asyncio.create_task(bc.repeat())
yield
task.cancel()
with suppress(asyncio.CancelledError):
await task
Loading

0 comments on commit a2eea47

Please sign in to comment.