From 1aef3bdbdcb6a4570379c2ad10bbd0e7c24cc542 Mon Sep 17 00:00:00 2001 From: Josh Smith Date: Sat, 22 Jun 2024 19:50:28 -0400 Subject: [PATCH] Dynamic mirror round robin auto-weighting for downloads (#3) * Mirror prioritization via request tracking in DB * better tracking * Cache mirror score in app mem for 5mins * comment todo * Rewrite prioritization * Remove memes * Better handling of 404 case * rm print * Cleanup osz2 served log * Log mirror weight * bit better log * Add TODO for mirror_cache_age * Remove unused TimedOut * remove unnecessary pattern * Reduce unncessary code a bit * Deploy to prod * Update healthcheck settings to boot up faster * Debug logs * Remove 408 case * Stop takeover for merge --- .env.example | 6 + app/adapters/beatmap_mirrors/__init__.py | 1 + app/adapters/beatmap_mirrors/mino.py | 3 + .../beatmap_mirrors/mirror_aggregate.py | 163 +++++++++--------- app/adapters/beatmap_mirrors/osu_direct.py | 3 + app/adapters/mysql.py | 14 ++ app/api/v1/osz2_files.py | 2 - app/init_api.py | 28 +++ app/repositories/__init__.py | 0 app/repositories/beatmap_mirror_requests.py | 88 ++++++++++ app/scheduling.py | 46 +++++ app/settings.py | 6 + app/state.py | 3 + chart/values.yaml | 6 +- requirements.txt | 2 + 15 files changed, 284 insertions(+), 87 deletions(-) create mode 100644 app/adapters/mysql.py create mode 100644 app/repositories/__init__.py create mode 100644 app/repositories/beatmap_mirror_requests.py create mode 100644 app/scheduling.py create mode 100644 app/state.py diff --git a/.env.example b/.env.example index fd0215a..dc0f053 100644 --- a/.env.example +++ b/.env.example @@ -8,3 +8,9 @@ CODE_HOTRELOAD= OSU_API_V2_CLIENT_ID= OSU_API_V2_CLIENT_SECRET= + +DB_USER=cmyui +DB_PASS=lol123 +DB_HOST=localhost +DB_PORT=3306 +DB_NAME=akatsuki diff --git a/app/adapters/beatmap_mirrors/__init__.py b/app/adapters/beatmap_mirrors/__init__.py index 9f7014e..0de410f 100644 --- a/app/adapters/beatmap_mirrors/__init__.py +++ b/app/adapters/beatmap_mirrors/__init__.py @@ -12,6 +12,7 @@ class BeatmapMirror(ABC): def __init__(self, *args: Any, **kwargs: Any) -> None: self.http_client = httpx.AsyncClient() + self.weight = 0 super().__init__(*args, **kwargs) @abstractmethod diff --git a/app/adapters/beatmap_mirrors/mino.py b/app/adapters/beatmap_mirrors/mino.py index 27c6dbc..1a9da35 100644 --- a/app/adapters/beatmap_mirrors/mino.py +++ b/app/adapters/beatmap_mirrors/mino.py @@ -1,5 +1,7 @@ import logging +import httpx + from app.adapters.beatmap_mirrors import BeatmapMirror @@ -12,6 +14,7 @@ async def fetch_beatmap_zip_data(self, beatmapset_id: int) -> bytes | None: logging.info(f"Fetching beatmapset osz2 from mino: {beatmapset_id}") response = await self.http_client.get( f"{self.base_url}/d/{beatmapset_id}", + timeout=httpx.Timeout(None, connect=2), ) response.raise_for_status() return response.read() diff --git a/app/adapters/beatmap_mirrors/mirror_aggregate.py b/app/adapters/beatmap_mirrors/mirror_aggregate.py index 440662a..2e1e604 100644 --- a/app/adapters/beatmap_mirrors/mirror_aggregate.py +++ b/app/adapters/beatmap_mirrors/mirror_aggregate.py @@ -1,104 +1,103 @@ -import asyncio import logging import random -import time +from datetime import datetime -from app.adapters.beatmap_mirrors import BeatmapMirror -from app.adapters.beatmap_mirrors.gatari import GatariMirror -from app.adapters.beatmap_mirrors.mino import MinoMirror from app.adapters.beatmap_mirrors.nerinyan import NerinyanMirror from app.adapters.beatmap_mirrors.osu_direct import OsuDirectMirror -from app.adapters.beatmap_mirrors.ripple import RippleMirror +from app.repositories import beatmap_mirror_requests +from app.scheduling import DynamicWeightedRoundRobin -BEATMAP_MIRRORS: list[BeatmapMirror] = [ - # GatariMirror(), - # MinoMirror(), - NerinyanMirror(), - OsuDirectMirror(), - # Disabled as ripple only supports ranked maps - # RippleMirror(), -] +# from app.adapters.beatmap_mirrors.gatari import GatariMirror +# from app.adapters.beatmap_mirrors.mino import MinoMirror +# from app.adapters.beatmap_mirrors.ripple import RippleMirror +ZIP_FILE_HEADER = b"PK\x03\x04" -async def run_with_semaphore( - semaphore: asyncio.Semaphore, - mirror: BeatmapMirror, - beatmapset_id: int, -) -> tuple[BeatmapMirror, bytes | None]: - async with semaphore: - return (mirror, await mirror.fetch_beatmap_zip_data(beatmapset_id)) +BEATMAP_SELECTOR = DynamicWeightedRoundRobin( + mirrors=[ + # GatariMirror(), + # MinoMirror(), + NerinyanMirror(), + OsuDirectMirror(), + # Disabled as ripple only supports ranked maps + # RippleMirror(), + ], +) -class TimedOut: ... - - -TIMED_OUT = TimedOut() - - -async def fetch_beatmap_zip_data(beatmapset_id: int) -> bytes | TimedOut | None: +async def fetch_beatmap_zip_data(beatmapset_id: int) -> bytes | None: """\ - Parallelize calls with a timeout across up to 5 mirrors at time, - to ensure our clients get a response in a reasonable time. + Fetch a beatmapset .osz2 file by any means necessary, balancing upon + multiple underlying beatmap mirrors to ensure the best possible + availability and performance. """ + started_at = datetime.now() + + await BEATMAP_SELECTOR.update_all_mirror_and_selector_weights() + + while True: + mirror = BEATMAP_SELECTOR.select_mirror() + beatmap_zip_data: bytes | None = None + try: + beatmap_zip_data = await mirror.fetch_beatmap_zip_data(beatmapset_id) + + if beatmap_zip_data is not None and ( + not beatmap_zip_data.startswith(ZIP_FILE_HEADER) + or len(beatmap_zip_data) < 20_000 + ): + raise ValueError("Received bad osz2 data from mirror") + except Exception as exc: + ended_at = datetime.now() + await beatmap_mirror_requests.create( + request_url=f"{mirror.base_url}/d/{beatmapset_id}", + api_key_id=None, + mirror_name=mirror.name, + success=False, + started_at=started_at, + ended_at=ended_at, + response_size=len(beatmap_zip_data) if beatmap_zip_data else 0, + response_error=str(exc), + ) + await BEATMAP_SELECTOR.update_all_mirror_and_selector_weights() + logging.warning( + "Failed to fetch beatmapset osz2 from mirror", + exc_info=True, + extra={ + "mirror_name": mirror.name, + "mirror_weight": mirror.weight, + "beatmapset_id": beatmapset_id, + }, + ) + continue + else: + break + + ended_at = datetime.now() + + await beatmap_mirror_requests.create( + request_url=f"{mirror.base_url}/d/{beatmapset_id}", + api_key_id=None, + mirror_name=mirror.name, + success=True, + started_at=started_at, + ended_at=ended_at, + response_size=len(beatmap_zip_data) if beatmap_zip_data else 0, + response_error=None, + ) + await BEATMAP_SELECTOR.update_all_mirror_and_selector_weights() - # TODO: it would be nice to be able to stream the responses, - # but that would require a different approach where the - # discovery process would be complete once the mirror has - # started streaming, rather than after the response has - # been read in full. - - concurrency_limit = 5 - global_timeout = 15 - semaphore = asyncio.Semaphore(concurrency_limit) - - start_time = time.time() - - # TODO: prioritization based on reliability, speed, etc. - random.shuffle(BEATMAP_MIRRORS) - - coroutines = [ - asyncio.create_task( - run_with_semaphore( - semaphore, - mirror, - beatmapset_id, - ), - ) - for mirror in BEATMAP_MIRRORS - ] - try: - done, pending = await asyncio.wait( - coroutines, - return_when=asyncio.FIRST_COMPLETED, - timeout=global_timeout, - ) - for task in pending: - task.cancel() - first_result = await list(done)[0] - except TimeoutError: - return None - - # TODO: log which mirrors finished, and which timed out - - mirror, result = first_result - if result is None: - return None - - end_time = time.time() - ms_elapsed = (end_time - start_time) * 1000 + ms_elapsed = (ended_at.timestamp() - started_at.timestamp()) * 1000 logging.info( - "A mirror was first to finish during .osz2 aggregate request", + "Served beatmapset osz2 from mirror", extra={ "mirror_name": mirror.name, + "mirror_weight": mirror.weight, "beatmapset_id": beatmapset_id, "ms_elapsed": ms_elapsed, - "data_size": len(result), - "bad_data": ( - result - if not result.startswith(b"PK\x03\x04") or len(result) < 20_000 - else None + "data_size": ( + len(beatmap_zip_data) if beatmap_zip_data is not None else None ), }, ) - return result + return beatmap_zip_data diff --git a/app/adapters/beatmap_mirrors/osu_direct.py b/app/adapters/beatmap_mirrors/osu_direct.py index 209069f..986fe3f 100644 --- a/app/adapters/beatmap_mirrors/osu_direct.py +++ b/app/adapters/beatmap_mirrors/osu_direct.py @@ -1,5 +1,7 @@ import logging +import httpx + from app.adapters.beatmap_mirrors import BeatmapMirror @@ -12,6 +14,7 @@ async def fetch_beatmap_zip_data(self, beatmapset_id: int) -> bytes | None: logging.info(f"Fetching beatmapset osz2 from osu!direct: {beatmapset_id}") response = await self.http_client.get( f"{self.base_url}/d/{beatmapset_id}", + timeout=httpx.Timeout(None, connect=2), ) response.raise_for_status() return response.read() diff --git a/app/adapters/mysql.py b/app/adapters/mysql.py new file mode 100644 index 0000000..610c0e8 --- /dev/null +++ b/app/adapters/mysql.py @@ -0,0 +1,14 @@ +import urllib.parse + + +def create_dsn( + driver: str | None, + username: str, + password: str, + host: str, + port: int | None, + database: str, +) -> str: + driver_str = f"+{driver}" if driver else "" + passwd_str = urllib.parse.quote_plus(password) if password else "" + return f"mysql{driver_str}://{username}:{passwd_str}@{host}:{port}/{database}" diff --git a/app/api/v1/osz2_files.py b/app/api/v1/osz2_files.py index dbd1e37..8996569 100644 --- a/app/api/v1/osz2_files.py +++ b/app/api/v1/osz2_files.py @@ -17,7 +17,5 @@ async def download_beatmapset_osz2(beatmapset_id: int) -> Response: "Content-Disposition": f"attachment; filename={beatmapset_id}.osz", }, ) - if isinstance(beatmap_zip_data, mirror_aggregate.TimedOut): - return Response(status_code=408) return Response(status_code=404) diff --git a/app/init_api.py b/app/init_api.py index 3cc61b2..1dc157c 100644 --- a/app/init_api.py +++ b/app/init_api.py @@ -1,14 +1,26 @@ import logging +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from databases import Database from fastapi import FastAPI from fastapi import Request from fastapi import Response from starlette.middleware.base import RequestResponseEndpoint from app import settings +from app import state +from app.adapters import mysql from app.api import api_router +@asynccontextmanager +async def lifespan(app: FastAPI) -> AsyncIterator[None]: + await state.database.connect() + yield + await state.database.disconnect() + + def init_routes(app: FastAPI) -> FastAPI: app.include_router(api_router) return app @@ -29,15 +41,31 @@ async def http_middleware( return app +def init_db(app: FastAPI) -> FastAPI: + state.database = Database( + url=mysql.create_dsn( + driver="aiomysql", + username=settings.DB_USER, + password=settings.DB_PASS, + host=settings.DB_HOST, + port=settings.DB_PORT, + database=settings.DB_NAME, + ), + ) + return app + + def init_api() -> FastAPI: app = FastAPI( openapi_url="/openapi.json" if settings.APP_ENV != "production" else None, docs_url="/docs" if settings.APP_ENV != "production" else None, redoc_url="/redoc" if settings.APP_ENV != "production" else None, swagger_ui_oauth2_redirect_url=None, + lifespan=lifespan, ) app = init_routes(app) app = init_middleware(app) + app = init_db(app) return app diff --git a/app/repositories/__init__.py b/app/repositories/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/repositories/beatmap_mirror_requests.py b/app/repositories/beatmap_mirror_requests.py new file mode 100644 index 0000000..5f45798 --- /dev/null +++ b/app/repositories/beatmap_mirror_requests.py @@ -0,0 +1,88 @@ +import math +from datetime import datetime + +from pydantic import BaseModel + +from app import state + + +class BeatmapMirrorScore(BaseModel): + mirror_name: str + score: float + + +async def get_mirror_weight(mirror_name: str) -> int: + """Give the mirror a weighting based on its latency and failure rate.""" + p90_success_ms_latency = await state.database.fetch_val( + """\ + WITH request_latencies AS ( + SELECT (ended_at - started_at) * 1000 AS ms_elapsed, + PERCENT_RANK() OVER (ORDER BY ended_at - started_at) p + FROM beatmap_mirror_requests + WHERE started_at > NOW() - INTERVAL 1 HOUR + AND mirror_name = :mirror_name + AND success = 1 + ) + SELECT DISTINCT first_value(ms_elapsed) OVER ( + ORDER BY CASE WHEN p <= 0.9 THEN p END DESC + ) p90_success_ms_latency + FROM request_latencies + """, + {"mirror_name": mirror_name}, + ) + if p90_success_ms_latency is None: + return 1 + + failure_rate = await state.database.fetch_val( + """\ + SELECT AVG(success = 0) + FROM beatmap_mirror_requests + WHERE started_at > NOW() - INTERVAL 1 HOUR + AND mirror_name = :mirror_name + """, + {"mirror_name": mirror_name}, + ) + if failure_rate is None: + return 1 + + # https://www.desmos.com/calculator/0am8xnwxyo + latency_weight = 1000 / math.log(p90_success_ms_latency) + failure_weight = math.exp(-10 * failure_rate) + # TODO: integrate `mirror_cache_age` into the weight calculation + weight = max(1, int(latency_weight * failure_weight)) + return weight + + +async def create( + request_url: str, + api_key_id: str | None, + mirror_name: str, + success: bool, + started_at: datetime, + ended_at: datetime, + response_size: int, + response_error: str | None, +) -> None: + query = """\ + INSERT INTO beatmap_mirror_requests ( + request_url, api_key_id, mirror_name, success, started_at, + ended_at, response_size, response_error + ) + VALUES ( + :request_url, :api_key_id, :mirror_name, :success, :started_at, + :ended_at, :response_size, :response_error + ) + """ + await state.database.execute( + query=query, + values={ + "request_url": request_url, + "api_key_id": api_key_id, + "mirror_name": mirror_name, + "success": success, + "started_at": started_at, + "ended_at": ended_at, + "response_size": response_size, + "response_error": response_error, + }, + ) diff --git a/app/scheduling.py b/app/scheduling.py new file mode 100644 index 0000000..3929117 --- /dev/null +++ b/app/scheduling.py @@ -0,0 +1,46 @@ +import math + +from app.adapters.beatmap_mirrors import BeatmapMirror +from app.repositories import beatmap_mirror_requests + + +class DynamicWeightedRoundRobin: + def __init__(self, mirrors: list[BeatmapMirror]) -> None: + self.mirrors = mirrors + self.index = -1 + self.current_weight = 0 + self.max_weight = max(mirror.weight for mirror in mirrors) + self.gcd_weight = self._calculate_gcd( + [mirror.weight for mirror in mirrors], + ) + + @staticmethod + def _calculate_gcd(weights: list[int]) -> int: + gcd = weights[0] + for weight in weights[1:]: + gcd = math.gcd(gcd, weight) + return gcd + + async def update_all_mirror_and_selector_weights(self) -> None: + for beatmap_mirror in self.mirrors: + beatmap_mirror.weight = await beatmap_mirror_requests.get_mirror_weight( + beatmap_mirror.name, + ) + + self.max_weight = max(mirror.weight for mirror in self.mirrors) + self.gcd_weight = self._calculate_gcd( + [mirror.weight for mirror in self.mirrors], + ) + + def select_mirror(self) -> BeatmapMirror: + while True: + self.index = (self.index + 1) % len(self.mirrors) + if self.index == 0: + self.current_weight -= self.gcd_weight + if self.current_weight <= 0: + self.current_weight = self.max_weight + if self.current_weight == 0: + raise RuntimeError("All mirrors have 0 weight.") + + if self.mirrors[self.index].weight >= self.current_weight: + return self.mirrors[self.index] diff --git a/app/settings.py b/app/settings.py index c60b96b..e4d41fb 100644 --- a/app/settings.py +++ b/app/settings.py @@ -17,3 +17,9 @@ def read_bool(s: str) -> bool: OSU_API_V2_CLIENT_ID = os.environ["OSU_API_V2_CLIENT_ID"] OSU_API_V2_CLIENT_SECRET = os.environ["OSU_API_V2_CLIENT_SECRET"] + +DB_USER = os.environ["DB_USER"] +DB_PASS = os.environ["DB_PASS"] +DB_HOST = os.environ["DB_HOST"] +DB_PORT = int(os.environ["DB_PORT"]) +DB_NAME = os.environ["DB_NAME"] diff --git a/app/state.py b/app/state.py new file mode 100644 index 0000000..7839803 --- /dev/null +++ b/app/state.py @@ -0,0 +1,3 @@ +from databases import Database + +database: Database diff --git a/chart/values.yaml b/chart/values.yaml index baea3a6..19bb544 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -17,10 +17,10 @@ apps: path: /_health port: 80 initialDelaySeconds: 10 - periodSeconds: 10 - timeoutSeconds: 3 + periodSeconds: 3 + timeoutSeconds: 4 successThreshold: 1 - failureThreshold: 3 + failureThreshold: 5 resources: limits: cpu: 400m diff --git a/requirements.txt b/requirements.txt index a0f29cf..6c8246d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ +cryptography +databases[aiomysql] fastapi httpx python-dotenv