Skip to content

Commit

Permalink
Fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
boocmp committed Apr 24, 2024
1 parent 20a3112 commit d72768e
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 31 deletions.
8 changes: 0 additions & 8 deletions src/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,7 @@

svc.mount_asgi_app(app)

"""
@svc.api(input=File(), output=JSON())
async def process_audio(input_file: io.BytesIO):
transcript = await runner_audio_transcriber.transcribe_audio.async_run(input_file)
return transcript
@svc.api(input=File(), output=JSON())
async def process_audio_data(input_file: io.BytesIO):
transcript = await runner_audio_transcriber.transcribe_audio_data.async_run(input_file.read())
return transcript
"""
36 changes: 21 additions & 15 deletions src/stt_api.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.encoders import jsonable_encoder
import utils.google_streaming.google_streaming_api_pb2 as speech
import bentoml
from utils.npipe import AsyncNamedPipe
import io
from runners.audio_transcriber import AudioTranscriber
import aiofiles
import json

runner_audio_transcriber = bentoml.Runner(
AudioTranscriber,
Expand Down Expand Up @@ -36,33 +39,36 @@ async def handleUpstream(pair: str, request: Request):
mic_data = bytes()

async with await AsyncNamedPipe.create(pair) as pipe:
async for chunk in request.stream():
mic_data += chunk
text = await runner_audio_transcriber.async_run(io.BytesIO(mic_data))
await pipe.write(text["text"] + '\n')
#async with aiofiles.open("/tmp/mic", "wb") as mic:
async for chunk in request.stream():
#await mic.write(chunk)
mic_data += chunk
text = await runner_audio_transcriber.async_run(io.BytesIO(mic_data))
await pipe.write(text["text"] + '\n')

except Exception as e:
print("up :", e)
return "exception" + str(e)
return JSONResponse(content = jsonable_encoder({ "status" : "exception", "exception" : str(e) }) )

return ""
return JSONResponse(content = jsonable_encoder({ "status" : "ok" }))

@app.get("/down")
async def handleDownstream(pair:str):
async def handleDownstream(pair: str, output: str = "pb"):
async def handleStream(pair):
try:
async with await AsyncNamedPipe.open(pair) as pipe:
while True:
text = await pipe.readline()
if not text:
break
event = RecongitionEvent()
event.add_text(text.strip('\n'))

yield event.to_bytes()
text = text.strip('\n')
if output == "pb":
event = RecongitionEvent()
event.add_text(text)

yield event.to_bytes()
else:
yield json.dumps({ "text" : text })
except Exception as e:
print("down :", e)
yield str(e)
yield json.dumps({ "exception" : str(e)})

return StreamingResponse(handleStream(pair))
26 changes: 18 additions & 8 deletions src/utils/npipe/_posix.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,43 @@
import asyncio
import aiofiles
import aiofiles.os
from contextlib import contextmanager

class AsyncNamedPipe:
def __init__(self, pipe):
def __init__(self, pipe, path, creator = False):
self._pipe = pipe
self._path = path
self._creator = creator

async def create(path: str, timeout: int = 10000):
loop = asyncio.get_running_loop()
path = f"/tmp/{path}"
try:
await loop.run_in_executor(None, os.mkfifo,f"/tmp/{path}")
await loop.run_in_executor(None, os.mkfifo, path)
except:
pass

try:
async with asyncio.timeout(timeout / 1000.0):
fd = await aiofiles.open(f"/tmp/{path}", "w")
return AsyncNamedPipe(fd)
fd = await aiofiles.open(path, "w")
return AsyncNamedPipe(fd, path, True)

except asyncio.TimeoutError:
await aiofiles.os.unlink(f"/tmp/{path}")
await aiofiles.os.unlink(path)
raise Exception("No consumer")

async def open(path: str, timeout: int = 1000):
pipe = None

path = f"/tmp/{path}"
try:
async with asyncio.timeout(timeout / 1000.0):
pipe = await aiofiles.open(f"/tmp/{path}", "r")
while not await aiofiles.ospath.exists(path):
await asyncio.sleep(0.1)
pipe = await aiofiles.open(path, "r")
except asyncio.TimeoutError:
raise Exception("No producer")

return AsyncNamedPipe(pipe)
return AsyncNamedPipe(pipe, path)

async def __aenter__(self):
return self
Expand All @@ -58,4 +65,7 @@ async def readline(self):


async def close(self):
if self._creator:
await aiofiles.os.unlink(self._path)

return await self._pipe.close()

0 comments on commit d72768e

Please sign in to comment.