Skip to content

Commit

Permalink
Add websocket server for updating camera channels
Browse files Browse the repository at this point in the history
  • Loading branch information
ugyballoons committed Jul 31, 2023
1 parent 61098ba commit 546da22
Show file tree
Hide file tree
Showing 16 changed files with 266 additions and 131 deletions.
12 changes: 6 additions & 6 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ beautifulsoup4==4.12.2 \
--hash=sha256:492bbc69dca35d12daac71c4db1bfff0c876c00ef4a2ffacce226d4638eb72da \
--hash=sha256:bd2520ca0d9d7d12694a53d44ac482d181b4ec1888909b035a3dbf40d0f57d4a
# via -r requirements/dev.in
boto3==1.28.9 \
--hash=sha256:01f078047eb4d238c6b9c6cc623f2af33b4ae67980c5326691e35cb5493ff6c7 \
--hash=sha256:4cc0c6005be910e52077227e670930ab55a41ba86cdb6d1c052571d08cd4d32c
boto3==1.28.10 \
--hash=sha256:67001b3f512cbe2e00e352c65fb443b504e5e388fee39d73bcc42da1ae87d9e3 \
--hash=sha256:cb8af03f553f1c7db7137bc897785baeeaa97b8fde483eb1cdb1f1ef3cec9cb7
# via
# -c requirements/main.txt
# moto
botocore==1.31.9 \
--hash=sha256:bd849d3ac95f1781385ed831d753a04a3ec870a59d6598175aaedd71dc2baf5f \
--hash=sha256:e56ccd3536a90094ea5b176b5dd33bfe4f049efdf71af468ea1661bd424c787d
botocore==1.31.10 \
--hash=sha256:736a9412f405d6985570c4a87b533c2396dd8d4042d8c7a0ca14e73d4f1bcf9d \
--hash=sha256:a3bfd3627a490faedf37d79373d6957936d7720888ca85466e0471cb921e4557
# via
# -c requirements/main.txt
# boto3
Expand Down
1 change: 1 addition & 0 deletions requirements/main.in
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ pydantic_settings
safir>=3.4.0
boto3
jinja2
websockets
16 changes: 9 additions & 7 deletions requirements/main.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ anyio==3.7.1 \
# httpcore
# starlette
# watchfiles
boto3==1.28.9 \
--hash=sha256:01f078047eb4d238c6b9c6cc623f2af33b4ae67980c5326691e35cb5493ff6c7 \
--hash=sha256:4cc0c6005be910e52077227e670930ab55a41ba86cdb6d1c052571d08cd4d32c
boto3==1.28.10 \
--hash=sha256:67001b3f512cbe2e00e352c65fb443b504e5e388fee39d73bcc42da1ae87d9e3 \
--hash=sha256:cb8af03f553f1c7db7137bc897785baeeaa97b8fde483eb1cdb1f1ef3cec9cb7
# via -r requirements/main.in
botocore==1.31.9 \
--hash=sha256:bd849d3ac95f1781385ed831d753a04a3ec870a59d6598175aaedd71dc2baf5f \
--hash=sha256:e56ccd3536a90094ea5b176b5dd33bfe4f049efdf71af468ea1661bd424c787d
botocore==1.31.10 \
--hash=sha256:736a9412f405d6985570c4a87b533c2396dd8d4042d8c7a0ca14e73d4f1bcf9d \
--hash=sha256:a3bfd3627a490faedf37d79373d6957936d7720888ca85466e0471cb921e4557
# via
# boto3
# s3transfer
Expand Down Expand Up @@ -607,4 +607,6 @@ websockets==11.0.3 \
--hash=sha256:f61bdb1df43dc9c131791fbc2355535f9024b9a04398d3bd0684fc16ab07df74 \
--hash=sha256:fb06eea71a00a7af0ae6aefbb932fb8a7df3cb390cc217d51a9ad7343de1b8d0 \
--hash=sha256:ffd7dcaf744f25f82190856bc26ed81721508fc5cbf2a330751e135ff1283564
# via uvicorn
# via
# -r requirements/main.in
# uvicorn
14 changes: 8 additions & 6 deletions src/rubintv/background/bucketpoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import boto3

from rubintv.handlers.websocket import notify_camera_clients
from rubintv.models.models import Event, Location, get_current_day_obs


Expand All @@ -27,19 +28,18 @@ async def poll_buckets_for_todays_data(self) -> None:
prefix = f"{camera.name}/{current_day_obs}"
objects = self.list_objects(loc.bucket_name, prefix)
if objects != self._current_lists[loc.name][camera.name]:
self._current_lists[loc.name][camera.name] = objects
msg = {
f"{loc.name}/{camera.name}": objects_to_events(
objects
)
}
print(msg)
# await notify_current_camera_table_clients(msg)
self._current_lists[loc.name][camera.name] = objects
await asyncio.sleep(0.5)
await notify_camera_clients(msg)
await asyncio.sleep(1)

async def get_current_state(
self, location_name: str, camera_name: str
) -> list[Event] | None:
) -> list[dict[str, str]] | None:
return self._current_lists[location_name][camera_name]

def list_objects(self, bucket_name: str, prefix: str) -> list[dict]:
Expand All @@ -66,6 +66,8 @@ def get_object(self, bucket_name: str, object_id: str) -> dict:
return self._client.get_object(Bucket=bucket_name, Key=object_id)


def objects_to_events(objects: list[dict]) -> list[Event]:
def objects_to_events(objects: list[dict] | None) -> list[Event] | None:
if objects is None:
return None
events = [Event(**object) for object in objects]
return events
55 changes: 30 additions & 25 deletions src/rubintv/handlers/api.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
"""Handlers for the app's external root, ``/rubintv/``."""
from datetime import date
from itertools import chain
from typing import Tuple

from fastapi import APIRouter, HTTPException, Request

from rubintv.background.bucketpoller import BucketPoller, objects_to_events
from rubintv.models.helpers import find_first
from rubintv.models.models import Camera, Event, Location, get_current_day_obs

Expand All @@ -14,48 +14,53 @@
"""FastAPI router for all external handlers."""


@api_router.get("/location/{location_name}", response_model=Location)
async def get_location(
location_name: str,
request: Request,
) -> Location:
locations = request.app.state.fixtures.locations
@api_router.get("/", response_model=list[Location])
async def get_api_root(request: Request) -> list[Location]:
locations = request.app.state.models.locations
return locations


@api_router.get("/{location_name}", response_model=Location)
async def get_location(location_name: str, request: Request) -> Location:
locations = request.app.state.models.locations
if not (location := find_first(locations, "name", location_name)):
raise HTTPException(status_code=404, detail="Location not found.")
return location


@api_router.get(
"/location/{location_name}/camera/{camera_name}",
response_model=Tuple[Location, Camera],
"/{location_name}/{camera_name}",
response_model=Camera,
)
async def get_location_camera(
location_name: str,
camera_name: str,
request: Request,
) -> Tuple[Location, Camera]:
location_name: str, camera_name: str, request: Request
) -> Camera:
location = await get_location(location_name, request)
cameras = request.app.state.fixtures.cameras
cameras = request.app.state.models.cameras
camera_groups = location.camera_groups.values()
location_cams = chain(*camera_groups)
if camera_name not in location_cams or not (
camera := find_first(cameras, "name", camera_name)
):
raise HTTPException(status_code=404, detail="Camera not found.")
return (location, camera)
return camera


@api_router.get(
"/location/{location_name}/camera/{camera_name}/current",
response_model=Tuple[date, list[Event] | None],
"/{location_name}/{camera_name}/current",
response_model=dict[str, date | list[Event] | None],
)
async def get_camera_current_events(
location_name: str,
camera_name: str,
request: Request,
) -> Tuple[date, list[Event] | None]:
await get_location_camera(location_name, camera_name, request)
location_name: str, camera_name: str, request: Request
) -> dict[str, date | list[Event] | None]:
await get_location(location_name, request)
camera = await get_location_camera(location_name, camera_name, request)
current_day_obs = get_current_day_obs()
bucket_poller = request.app.state.bucket_poller
events = await bucket_poller.get_current_state(location_name, camera_name)
return (current_day_obs, events)
events = None
if camera.online:
bucket_poller: BucketPoller = request.app.state.bucket_poller
objects = await bucket_poller.get_current_state(
location_name, camera_name
)
events = objects_to_events(objects)
return {"date": current_day_obs, "events": events}
7 changes: 3 additions & 4 deletions src/rubintv/handlers/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async def get_home(
) -> Response:
"""GET ``/rubintv/`` (the app's external root)."""
logger.info("Request for the app home page")
locations = request.app.state.fixtures.locations
locations = request.app.state.models.locations
return templates.TemplateResponse(
"home.jinja", {"request": request, "locations": locations}
)
Expand Down Expand Up @@ -53,9 +53,8 @@ async def get_camera_page(
camera_name: str,
request: Request,
) -> Response:
location, camera = await get_location_camera(
location_name, camera_name, request
)
location = await get_location(location_name, request)
camera = await get_location_camera(location_name, camera_name, request)
template = "camera"
if not camera.online:
template = "not_online"
Expand Down
85 changes: 85 additions & 0 deletions src/rubintv/handlers/websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import re
from typing import Tuple

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

from rubintv.models.helpers import find_first
from rubintv.models.models import Camera, Event, Location

ws_router = APIRouter()
connected_clients: dict[WebSocket, Tuple[str, str]] = {}


@ws_router.websocket("/")
async def websocket_endpoint(websocket: WebSocket) -> None:
"""Websocket endpoint for proving updates for camera data, channels or
night reports.
Clients wanting to connect to an updater should send either:
`"camera <location_name>/<camera_name>"`
`"nightreport <location_name>/<camera_name>"`
`"channel <location_name>/<camera_name>/<channel_name>"`
A confirmation `"OK <location_name>/<camera_name>[/<channel_name>]"`
will be returned, followed by updates for the chosen service.
Parameters
----------
websocket : WebSocket
The websocket object representing the server/client connection.
"""
await websocket.accept()
try:
while True:
data: str = await websocket.receive_text()
text = data.strip()
if not is_valid_client_request(text):
continue
updater, loc_cam_id = text.split(" ")
location, camera, *channel = loc_cam_id.split("/")
locations = websocket.app.state.models.locations
if not is_valid_location_camera(location, camera, locations):
continue
match updater:
case "camera":
await websocket.send_text(f"OK {loc_cam_id}")
connected_clients[websocket] = (updater, loc_cam_id)
except WebSocketDisconnect:
if websocket in connected_clients:
del connected_clients[websocket]


async def notify_camera_clients(
message_for_cam: dict[str, list[Event] | None]
) -> None:
((msg_cam_loc, events),) = message_for_cam.items()
for websocket, (updater, loc_cam_id) in connected_clients.items():
if updater == "camera" and loc_cam_id == msg_cam_loc:
if events:
serialised = [ev.__dict__ for ev in events]
else:
serialised = None
await websocket.send_json(serialised)
return


def is_valid_client_request(client_text: str) -> bool:
valid_req = re.compile(r"^(camera|channel|nightreport)\s+[\w\/]+\w+$")
if valid_req.fullmatch(client_text):
return True
return False


def is_valid_location_camera(
location_name: str, camera_name: str, locations: list[Location]
) -> bool:
location: Location | None
if not (location := find_first(locations, "name", location_name)):
return False
camera: Camera | None
if not (camera := find_first(location.cameras, "name", camera_name)):
return False
if not camera.online:
return False
return True
9 changes: 4 additions & 5 deletions src/rubintv/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from importlib.metadata import metadata, version
from pathlib import Path
from typing import AsyncGenerator
from weakref import WeakSet

from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
Expand All @@ -25,6 +24,7 @@
from .handlers.api import api_router
from .handlers.external import external_router
from .handlers.internal import internal_router
from .handlers.websocket import ws_router
from .mockdata import mock_up_data
from .models.models_init import ModelsInitiator

Expand Down Expand Up @@ -59,11 +59,8 @@ async def lifespan(app: FastAPI) -> AsyncGenerator:
# start polling buckets for data
polling = asyncio.create_task(bp.poll_buckets_for_todays_data())

connected_clients: WeakSet = WeakSet()

yield

del connected_clients
polling.cancel()
# Remove mocking when actual s3 is populated.
mock.stop()
Expand All @@ -82,8 +79,9 @@ async def lifespan(app: FastAPI) -> AsyncGenerator:
lifespan=lifespan,
)

app.state.fixtures = models
app.state.models = models
app.state.bucket_poller = bp
# app.state.connected_clients = connected_clients

# Intwine jinja2 templating
app.mount(
Expand All @@ -95,6 +93,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator:
# Attach the routers.
app.include_router(internal_router)
app.include_router(api_router, prefix=f"{config.path_prefix}/api")
app.include_router(ws_router, prefix=f"{config.path_prefix}/ws")
app.include_router(external_router, prefix=f"{config.path_prefix}")

# Add middleware.
Expand Down
5 changes: 0 additions & 5 deletions src/rubintv/mockdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ def mock_up_data(locations: list[Location], cameras: list[Camera]) -> None:
if camera := find_first(cameras, "name", camera_name):
if camera.channels:
for index, channel in enumerate(camera.channels):
print(
f"Uploading testcard to {location.name} for"
f"{camera_name} in {channel.name}"
)
# upload a file for today
upload_file(
Path(__file__).parent
Expand All @@ -68,7 +64,6 @@ def mock_up_data(locations: list[Location], cameras: list[Camera]) -> None:
f"{index:06}.jpg"
),
)
print()


def upload_file(
Expand Down
Loading

0 comments on commit 546da22

Please sign in to comment.