Skip to content

Commit

Permalink
Sticky sessions.
Browse files Browse the repository at this point in the history
  • Loading branch information
boocmp committed Jun 3, 2024
1 parent 7a8c6e8 commit 1e94f33
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 5 deletions.
12 changes: 10 additions & 2 deletions src/stt_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import bentoml
from runners.audio_transcriber import AudioTranscriber

from fastapi import FastAPI, Request, Depends
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi import FastAPI, Request, Depends, Cookie
from fastapi.responses import StreamingResponse, JSONResponse, Response
from fastapi.encoders import jsonable_encoder

import utils.google_streaming.google_streaming_api_pb2 as speech
Expand Down Expand Up @@ -35,10 +35,16 @@ def to_bytes(self):

app = FastAPI()

@app.get("/sticky")
async def handleSticky(request:Request, response: Response, sticky = Cookie(None),):
pass


@app.post("/up")
async def handleUpstream(
pair: str,
request: Request,
sticky = Cookie(None),
is_valid_brave_key = Depends(check_stt_request)
):
if not is_valid_brave_key:
Expand All @@ -62,6 +68,8 @@ async def handleUpstream(
@app.get("/down")
async def handleDownstream(
pair: str,
request: Request,
sticky = Cookie(None),
output: str = "pb",
is_valid_brave_key = Depends(check_stt_request)
):
Expand Down
23 changes: 20 additions & 3 deletions src/utils/npipe/_posix.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,31 @@ def __init__(self, fd, dir: Path):
self._fd = fd
self._dir = dir

def __del__(self):
try:
asyncio.get_event_loop().create_task(self.close())
except:
pass

async def open(pair: str, timeout: int = 10000):
loop = asyncio.get_running_loop()

try:
dir = await loop.run_in_executor(None, tempfile.mkdtemp, None, pair + "-", Path.home() / "tmp" / "channels")
dir = tempfile.mkdtemp(None, pair + "-", Path.home() / "tmp" / "channels")
dir = Path(dir)
await loop.run_in_executor(None, os.mkfifo, dir / "pipe")
os.mkfifo(dir / "pipe")
async with asyncio.timeout(timeout / 1000.0):
fd = await aiofiles.open(dir / "pipe", "w")
return AsyncChannelWriter(fd, dir)

except asyncio.TimeoutError:
except:
await aiofiles.os.unlink(dir / "pipe")
await aiofiles.os.rmdir(dir)
raise Exception("No consumer")

async def close(self):
if not self._fd:
return
await self._fd.close()
await aiofiles.os.unlink(self._dir / "pipe")
await aiofiles.os.rmdir(self._dir)
Expand All @@ -54,6 +62,12 @@ class AsyncChannelReader:
def __init__(self, fd):
self._fd = fd

def __del__(self):
try:
asyncio.get_event_loop().create_task(self.close())
except:
pass

async def open(pair: str, timeout: int = 10000):
fd = None
try:
Expand All @@ -76,7 +90,10 @@ async def open(pair: str, timeout: int = 10000):
return AsyncChannelReader(fd)

async def close(self):
if not self._fd:
return
await self._fd.close()
self._fd = None

async def __aenter__(self):
return self
Expand Down

0 comments on commit 1e94f33

Please sign in to comment.