new ipc pipes.
boocmp committed Jun 6, 2024
1 parent 9191cc7 commit 1b5a219
Showing 13 changed files with 320 additions and 122 deletions.
6 changes: 4 additions & 2 deletions bentofile.yaml
@@ -5,8 +5,10 @@ include:
- "runners/__init__.py"
- "runners/audio_transcriber.py"
- "utils/google_streaming/google_streaming_api_pb2.py"
- "utils/npipe/__init__.py"
- "utils/npipe/_posix.py"
- "utils/ipc/__init__.py"
- "utils/ipc/client.py"
- "utils/ipc/messages.py"
- "utils/ipc/server.py"
- "utils/config/config.py"
- "utils/service_key/brave_service_key.py"
- "configuration.yaml"
2 changes: 0 additions & 2 deletions env/docker/Dockerfile
@@ -75,8 +75,6 @@ RUN chmod +x /home/bentoml/bento/env/docker/entrypoint.sh

USER bentoml

RUN mkdir -p /home/bentoml/tmp/channels

ENTRYPOINT [ "/home/bentoml/bento/env/docker/entrypoint.sh" ]


1 change: 1 addition & 0 deletions env/python/requirements.txt
@@ -6,3 +6,4 @@ asyncio
pydantic
pydantic-settings
six
msgspec
2 changes: 1 addition & 1 deletion src/configuration.yaml
@@ -2,5 +2,5 @@ runners:
timeout: 900
resources:
cpu: 4
nvidia.com/gpu: 1
nvidia.com/gpu: 1
workers_per_resource: 10
1 change: 1 addition & 0 deletions src/requirements.txt
@@ -6,3 +6,4 @@ asyncio
pydantic
pydantic-settings
six
msgspec
8 changes: 8 additions & 0 deletions src/service.py
@@ -1,10 +1,12 @@
import io
import os

import bentoml
from bentoml.io import JSON, File

from stt_api import app, runner_audio_transcriber

from utils.ipc import server

svc = bentoml.Service(
"stt",
@@ -13,6 +15,12 @@

svc.mount_asgi_app(app)

@svc.on_deployment
def on_deployment():
if not os.fork():
server.start_ipc_server()


@svc.api(input=File(), output=JSON())
async def process_audio(input_file: io.BytesIO):
transcript = await runner_audio_transcriber.transcribe_audio.async_run(input_file)
28 changes: 14 additions & 14 deletions src/stt_api.py
@@ -9,15 +9,16 @@
from fastapi.encoders import jsonable_encoder

import utils.google_streaming.google_streaming_api_pb2 as speech
from utils.npipe import AsyncChannelWriter, AsyncChannelReader
from utils.service_key.brave_service_key import check_stt_request

import utils.ipc as ipc

runner_audio_transcriber = bentoml.Runner(
AudioTranscriber,
name="audio_transcriber",
)


class RecongitionEvent:
def __init__(self):
self._event = speech.SpeechRecognitionEvent()
@@ -52,13 +53,13 @@ async def handleUpstream(

try:
mic_data = bytes()
async with await AsyncChannelWriter.open(pair) as pipe:
async for chunk in request.stream():
if len(chunk) == 0:
break
mic_data += chunk
text = await runner_audio_transcriber.async_run(io.BytesIO(mic_data))
await pipe.write(text["text"] + '\n')
async with ipc.client.Publisher(pair) as pipe:
async for chunk in request.stream():
if len(chunk) == 0:
break
mic_data += chunk
text = await runner_audio_transcriber.async_run(io.BytesIO(mic_data))
await pipe.push(ipc.messages.Text(text["text"], False))

except Exception as e:
return JSONResponse(content = jsonable_encoder({ "status" : "exception", "exception" : str(e) }) )
@@ -76,19 +77,18 @@ async def handleDownstream(

async def handleStream(pair):
try:
async with await AsyncChannelReader.open(pair) as pipe:
async with ipc.client.Subscriber(pair) as pipe:
while True:
text = await pipe.readline()
if not text:
r = await pipe.pull()
if not r:
break
text = text.strip('\n')
if output == "pb":
event = RecongitionEvent()
event.add_text(text)
event.add_text(r.text)

yield event.to_bytes()
else:
yield json.dumps({ "text" : text })
yield json.dumps({ "text" : r.text })
except Exception as e:
yield json.dumps({ "exception" : str(e)})

3 changes: 3 additions & 0 deletions src/utils/ipc/__init__.py
@@ -0,0 +1,3 @@
from . import messages
from . import client
from . import server
124 changes: 124 additions & 0 deletions src/utils/ipc/client.py
@@ -0,0 +1,124 @@
import asyncio

if __name__ == '__main__':
import messages
else:
from . import messages

HOST, PORT = "localhost", 3015

class Publisher:
def __init__(self, pair):
self._pair: str = pair
self._reader: asyncio.StreamReader = None
self._writer: asyncio.StreamWriter = None

def __del__(self):
if self._writer:
self._writer.close()

async def __aenter__(self):
await self._open(self._pair)
return self

async def __aexit__(self, *_):
if self._writer:
self._writer.close()

async def push(self, r: messages.Text):
await messages.send_request(self._writer, r)

async def _open(self, pair: str, timeout: float = 10.0):
async def op():
self._reader, self._writer = await asyncio.open_connection(HOST, PORT)
await messages.send_request(self._writer, messages.Publish(pair))
ready = await messages.receive_request(self._reader)
if not isinstance(ready, messages.Ready):
raise asyncio.InvalidStateError()

await asyncio.wait_for(op(), timeout)


class Subscriber:
def __init__(self, pair: str):
self._pair: str = pair
self._reader: asyncio.StreamReader = None
self._writer: asyncio.StreamWriter = None

def __del__(self):
if self._writer:
self._writer.close()

async def __aenter__(self):
await self._open(self._pair)
return self

async def __aexit__(self, *_):
if self._writer:
self._writer.close()

async def pull(self) -> messages.Text:
try:
r = await messages.receive_request(self._reader)
except asyncio.IncompleteReadError:
return None

if not isinstance(r, messages.Text):
raise asyncio.InvalidStateError()
return r

async def _open(self, pair: str, timeout: float = 10.0):
async def op():
self._reader, self._writer = await asyncio.open_connection(HOST, PORT)
await messages.send_request(self._writer, messages.Subscribe(pair))
ready = await messages.receive_request(self._reader)
if not isinstance(ready, messages.Ready):
raise asyncio.InvalidStateError()

await asyncio.wait_for(op(), timeout)

if __name__ == '__main__':
async def publisher(pair):
try:
async def op():
async with Publisher(pair) as pub:
for i in range(0, 30):
await pub.push(messages.Text(f"{pair} -> {i}", False))
await asyncio.sleep(1)
await asyncio.wait_for(op(), 3)
except Exception as e:
print(e)
pass

async def subscriber(pair):

try:
async with Subscriber(pair) as sub:
while True:
r = await sub.pull()
if r is None:
break
print(r)

except asyncio.IncompleteReadError:
pass
except Exception as e:
print(e)
pass

async def batch(pair):
try:
await asyncio.gather(subscriber(pair), publisher(pair))
except Exception as e:
print(e)
pass

async def main():
tasks = []
for i in range(20):
tasks.append(asyncio.create_task(batch(str(i))))

for t in tasks:
await t

asyncio.run(main())
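
The matching server.py is listed in bentofile.yaml but its diff is not loaded on this page. For orientation only, here is a minimal sketch of a relay that would satisfy the handshake these clients perform: read a Publish or Subscribe request, answer with Ready, then forward Text frames from the publisher to the subscriber of the same pair. The entry point is named start_ipc_server to match the call in service.py, but the body and internal names are assumptions, not the committed implementation.

# Hypothetical sketch only -- not the committed server.py.
import asyncio

if __name__ == "__main__":
    import messages
else:
    from . import messages

HOST, PORT = "localhost", 3015

_queues: dict[str, asyncio.Queue] = {}

async def _handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    try:
        hello = await messages.receive_request(reader)
        queue = _queues.setdefault(hello.pair, asyncio.Queue())
        await messages.send_request(writer, messages.Ready())
        if isinstance(hello, messages.Publish):
            while True:  # forward every frame the publisher sends
                await queue.put(await messages.receive_request(reader))
        elif isinstance(hello, messages.Subscribe):
            while True:  # drain the pair's queue towards the subscriber
                await messages.send_request(writer, await queue.get())
    except asyncio.IncompleteReadError:
        pass  # peer closed its end of the connection
    finally:
        writer.close()

def start_ipc_server():
    async def serve():
        server = await asyncio.start_server(_handle, HOST, PORT)
        async with server:
            await server.serve_forever()
    asyncio.run(serve())

if __name__ == "__main__":
    start_ipc_server()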
33 changes: 33 additions & 0 deletions src/utils/ipc/messages.py
@@ -0,0 +1,33 @@
import asyncio
import msgspec

class Publish(msgspec.Struct, tag = True):
pair: str

class Subscribe(msgspec.Struct, tag = True):
pair: str

class Ready(msgspec.Struct, tag = True):
pass

class Text(msgspec.Struct, tag = True):
text: str
final: bool

Request = Publish | Subscribe | Ready | Text

RequestEncoder = msgspec.msgpack.Encoder()
RequestDecoder = msgspec.msgpack.Decoder(Request)

async def receive_request(reader: asyncio.StreamReader) -> Request:
length = int.from_bytes(await reader.readexactly(4), "big")
if length > 1024:
raise asyncio.InvalidStateError()
data = await reader.readexactly(length)
return RequestDecoder.decode(data)

async def send_request(writer, req):
data = RequestEncoder.encode(req)
writer.write(len(data).to_bytes(4, "big"))
writer.write(data)
await writer.drain()
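
For reference, the wire format used by send_request/receive_request above is a 4-byte big-endian length prefix followed by a msgpack-encoded Request. A standalone round trip through that framing might look like this (assuming src/ is on the import path so the module loads as utils.ipc.messages, as in stt_api.py):

# Illustration only: encode a Text frame and decode it again by hand,
# mirroring the length-prefixed layout of send_request/receive_request.
from utils.ipc import messages  # assumes src/ is on sys.path

payload = messages.RequestEncoder.encode(messages.Text("hello", False))
frame = len(payload).to_bytes(4, "big") + payload

length = int.from_bytes(frame[:4], "big")
decoded = messages.RequestDecoder.decode(frame[4:4 + length])
assert decoded == messages.Text("hello", False)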
