From 1e94f330e94369ede0f341013cba0edd377321c4 Mon Sep 17 00:00:00 2001 From: boocmp Date: Mon, 3 Jun 2024 15:30:51 +0700 Subject: [PATCH] Sticky sessions. --- src/stt_api.py | 12 ++++++++++-- src/utils/npipe/_posix.py | 23 ++++++++++++++++++++--- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/src/stt_api.py b/src/stt_api.py index 9209bdf..8a75027 100644 --- a/src/stt_api.py +++ b/src/stt_api.py @@ -4,8 +4,8 @@ import bentoml from runners.audio_transcriber import AudioTranscriber -from fastapi import FastAPI, Request, Depends -from fastapi.responses import StreamingResponse, JSONResponse +from fastapi import FastAPI, Request, Depends, Cookie +from fastapi.responses import StreamingResponse, JSONResponse, Response from fastapi.encoders import jsonable_encoder import utils.google_streaming.google_streaming_api_pb2 as speech @@ -35,10 +35,16 @@ def to_bytes(self): app = FastAPI() +@app.get("/sticky") +async def handleSticky(request:Request, response: Response, sticky = Cookie(None),): + pass + + @app.post("/up") async def handleUpstream( pair: str, request: Request, + sticky = Cookie(None), is_valid_brave_key = Depends(check_stt_request) ): if not is_valid_brave_key: @@ -62,6 +68,8 @@ async def handleUpstream( @app.get("/down") async def handleDownstream( pair: str, + request: Request, + sticky = Cookie(None), output: str = "pb", is_valid_brave_key = Depends(check_stt_request) ): diff --git a/src/utils/npipe/_posix.py b/src/utils/npipe/_posix.py index 1b7f06c..6bfbb71 100644 --- a/src/utils/npipe/_posix.py +++ b/src/utils/npipe/_posix.py @@ -11,23 +11,31 @@ def __init__(self, fd, dir: Path): self._fd = fd self._dir = dir + def __del__(self): + try: + asyncio.get_event_loop().create_task(self.close()) + except: + pass + async def open(pair: str, timeout: int = 10000): loop = asyncio.get_running_loop() try: - dir = await loop.run_in_executor(None, tempfile.mkdtemp, None, pair + "-", Path.home() / "tmp" / "channels") + dir = tempfile.mkdtemp(None, pair + "-", Path.home() / "tmp" / "channels") dir = Path(dir) - await loop.run_in_executor(None, os.mkfifo, dir / "pipe") + os.mkfifo(dir / "pipe") async with asyncio.timeout(timeout / 1000.0): fd = await aiofiles.open(dir / "pipe", "w") return AsyncChannelWriter(fd, dir) - except asyncio.TimeoutError: + except: await aiofiles.os.unlink(dir / "pipe") await aiofiles.os.rmdir(dir) raise Exception("No consumer") async def close(self): + if not self._fd: + return await self._fd.close() await aiofiles.os.unlink(self._dir / "pipe") await aiofiles.os.rmdir(self._dir) @@ -54,6 +62,12 @@ class AsyncChannelReader: def __init__(self, fd): self._fd = fd + def __del__(self): + try: + asyncio.get_event_loop().create_task(self.close()) + except: + pass + async def open(pair: str, timeout: int = 10000): fd = None try: @@ -76,7 +90,10 @@ async def open(pair: str, timeout: int = 10000): return AsyncChannelReader(fd) async def close(self): + if not self._fd: + return await self._fd.close() + self._fd = None async def __aenter__(self): return self