diff --git a/README.md b/README.md index 14a869f..a54d7cf 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,3 @@ # stt:git [![pypi_status](https://img.shields.io/badge/BentoML-1.1.10-informational)](https://pypi.org/project/BentoML) -[![documentation_status](https://readthedocs.org/projects/bentoml/badge/?version=latest)](https://docs.bentoml.com/) -[![join_slack](https://badgen.net/badge/Join/BentoML%20Slack/cyan?icon=slack)](https://l.bentoml.com/join-slack-swagger) -[![BentoML GitHub Repo](https://img.shields.io/github/stars/bentoml/bentoml?style=social)](https://github.com/bentoml/BentoML) -[![Twitter Follow](https://img.shields.io/twitter/follow/bentomlai?label=Follow%20BentoML&style=social)](https://twitter.com/bentomlai) - -This is a Machine Learning Service created with BentoML. -| InferenceAPI | Input | Output | -| ------------ | ----- | ------ | -| POST [`/process_audio`](#operations-Service_APIs-stt__process_audio) | BytesIOFile | JSON | - - - - -## Help - -* [📖 Documentation](https://docs.bentoml.com/en/latest/): Learn how to use BentoML. -* [💬 Community](https://l.bentoml.com/join-slack-swagger): Join the BentoML Slack community. -* [🐛 GitHub Issues](https://github.com/bentoml/BentoML/issues): Report bugs and feature requests. -* Tip: you can also [customize this README](https://docs.bentoml.com/en/latest/concepts/bento.html#description). diff --git a/apis/openapi.yaml b/apis/openapi.yaml index 6fdf51e..3461303 100644 --- a/apis/openapi.yaml +++ b/apis/openapi.yaml @@ -46,31 +46,11 @@ info: contact: email: contact@bentoml.com name: BentoML Team - description: "# stt:None\n\n[![pypi_status](https://img.shields.io/badge/BentoML-1.1.10-informational)](https://pypi.org/project/BentoML)\n\ - [![documentation_status](https://readthedocs.org/projects/bentoml/badge/?version=latest)](https://docs.bentoml.com/)\n\ - [![join_slack](https://badgen.net/badge/Join/BentoML%20Slack/cyan?icon=slack)](https://l.bentoml.com/join-slack-swagger)\n\ - [![BentoML GitHub Repo](https://img.shields.io/github/stars/bentoml/bentoml?style=social)](https://github.com/bentoml/BentoML)\n\ - [![Twitter Follow](https://img.shields.io/twitter/follow/bentomlai?label=Follow%20BentoML&style=social)](https://twitter.com/bentomlai)\n\ - \nThis is a Machine Learning Service created with BentoML.\n| InferenceAPI | Input\ - \ | Output |\n| ------------ | ----- | ------ |\n| POST [`/process_audio`](#operations-Service_APIs-stt__process_audio)\ - \ | BytesIOFile | JSON |\n\n\n\n\n## Help\n\n* [\U0001F4D6 Documentation](https://docs.bentoml.com/en/latest/):\ - \ Learn how to use BentoML.\n* [\U0001F4AC Community](https://l.bentoml.com/join-slack-swagger):\ - \ Join the BentoML Slack community.\n* [\U0001F41B GitHub Issues](https://github.com/bentoml/BentoML/issues):\ - \ Report bugs and feature requests.\n* Tip: you can also [customize this README](https://docs.bentoml.com/en/latest/concepts/bento.html#description).\n" + description: "# stt:None\n\n[![pypi_status](https://img.shields.io/badge/BentoML-1.1.10-informational)](https://pypi.org/project/BentoML)\n title: stt version: None openapi: 3.0.2 paths: - /healthz: - get: - description: Health check endpoint. Expecting an empty response with status - code 200 when the service is in health state. The /healthz - endpoint is deprecated. (since Kubernetes v1.16) - responses: - '200': - description: Successful Response - tags: - - Infrastructure /livez: get: description: Health check endpoint for Kubernetes. Healthy endpoint responses diff --git a/bento.yaml b/bento.yaml index 7d4f523..c9b7107 100644 --- a/bento.yaml +++ b/bento.yaml @@ -18,7 +18,7 @@ apis: docker: distro: debian python_version: '3.11' - cuda_version: 11.6.2 + cuda_version: 12.4.1 env: BENTOML_CONFIG: src/configuration.yaml system_packages: null diff --git a/bentofile.yaml b/bentofile.yaml index 646ebee..0a0ad04 100644 --- a/bentofile.yaml +++ b/bentofile.yaml @@ -11,6 +11,6 @@ include: python: requirements_txt: "requirements.txt" docker: - cuda_version: "11.6.2" + cuda_version: "12.4.1" env: BENTOML_CONFIG: "src/configuration.yaml" diff --git a/env/docker/Dockerfile b/env/docker/Dockerfile index 0e06669..4dccf5f 100644 --- a/env/docker/Dockerfile +++ b/env/docker/Dockerfile @@ -5,7 +5,7 @@ # =========================================== # Block SETUP_BENTO_BASE_IMAGE -FROM nvidia/cuda:11.2.2-cudnn8-runtime-ubuntu20.04 as base-container +FROM nvidia/cuda:12.4.1-devel-ubuntu22.04 as base-container ENV LANG=C.UTF-8 @@ -27,22 +27,6 @@ RUN set -eux && \ apt-get install -q -y --no-install-recommends --allow-remove-essential \ ca-certificates gnupg2 bash build-essential -RUN \ - set -eux && \ - apt-get install -y --no-install-recommends --allow-remove-essential software-properties-common && \ - # add deadsnakes ppa to install python - add-apt-repository ppa:deadsnakes/ppa && \ - apt-get update -y && \ - apt-get install -y --no-install-recommends --allow-remove-essential curl python3.11 python3.11-dev python3.11-distutils - -RUN ln -sf /usr/bin/python3.11 /usr/bin/python3 && \ - ln -sf /usr/bin/pip3.11 /usr/bin/pip3 - -RUN curl -O https://bootstrap.pypa.io/get-pip.py && \ - python3 get-pip.py && \ - rm -rf get-pip.py - - # Block SETUP_BENTO_USER ARG BENTO_USER=bentoml ARG BENTO_USER_UID=1034 @@ -53,12 +37,11 @@ ENV BENTOML_CONFIG=src/configuration.yaml ARG BENTO_PATH=/home/bentoml/bento ENV BENTO_PATH=$BENTO_PATH ENV BENTOML_HOME=/home/bentoml/ +ENV BENTOML_DO_NOT_TRACK=True RUN mkdir $BENTO_PATH && chown bentoml:bentoml $BENTO_PATH -R WORKDIR $BENTO_PATH - - # Block SETUP_BENTO_COMPONENTS COPY --chown=bentoml:bentoml ./env/python ./env/python/ # install python packages with install.sh diff --git a/env/python/install.sh b/env/python/install.sh index 9be19f2..b2b512f 100644 --- a/env/python/install.sh +++ b/env/python/install.sh @@ -16,18 +16,18 @@ BENTOML_VERSION=${BENTOML_VERSION:-1.1.10} # Install python packages, prefer installing the requirements.lock.txt file if it exist if [ -f "$REQUIREMENTS_LOCK" ]; then echo "Installing pip packages from 'requirements.lock.txt'.." - pip3 install -r "$REQUIREMENTS_LOCK" "${PIP_ARGS[@]}" + pip install -r "$REQUIREMENTS_LOCK" "${PIP_ARGS[@]}" else if [ -f "$REQUIREMENTS_TXT" ]; then echo "Installing pip packages from 'requirements.txt'.." - pip3 install -r "$REQUIREMENTS_TXT" "${PIP_ARGS[@]}" + pip install -r "$REQUIREMENTS_TXT" "${PIP_ARGS[@]}" fi fi # Install user-provided wheels if [ -d "$WHEELS_DIR" ]; then echo "Installing wheels packaged in Bento.." - pip3 install "$WHEELS_DIR"/*.whl "${PIP_ARGS[@]}" + pip install "$WHEELS_DIR"/*.whl "${PIP_ARGS[@]}" fi # Install the BentoML from PyPI if it's not already installed @@ -37,5 +37,5 @@ if python3 -c "import bentoml" &> /dev/null; then echo "WARNING: using BentoML version ${existing_bentoml_version}" fi else - pip3 install bentoml=="$BENTOML_VERSION" + pip install bentoml=="$BENTOML_VERSION" fi diff --git a/src/requirements.txt b/src/requirements.txt index 8ea0381..9071a74 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -1,5 +1,5 @@ -ctranslate2==3.24.0 -faster_whisper==0.10.1 +ctranslate2 +faster_whisper fastapi aiofiles asyncio \ No newline at end of file diff --git a/src/stt_api.py b/src/stt_api.py index b200917..961a6c8 100644 --- a/src/stt_api.py +++ b/src/stt_api.py @@ -3,7 +3,7 @@ from fastapi.encoders import jsonable_encoder import utils.google_streaming.google_streaming_api_pb2 as speech import bentoml -from utils.npipe import AsyncNamedPipe +from utils.npipe import AsyncChannelWriter, AsyncChannelReader import io from runners.audio_transcriber import AudioTranscriber import json @@ -30,14 +30,11 @@ def to_bytes(self): app = FastAPI() -clients = {} - @app.post("/up") async def handleUpstream(pair: str, request: Request): try: mic_data = bytes() - - async with await AsyncNamedPipe.create(pair) as pipe: + async with await AsyncChannelWriter.open(pair) as pipe: async for chunk in request.stream(): if len(chunk) == 0: break @@ -51,10 +48,10 @@ async def handleUpstream(pair: str, request: Request): return JSONResponse(content = jsonable_encoder({ "status" : "ok" })) @app.get("/down") -async def handleDownstream(pair: str, output: str = "pb"): +async def handleDownstream(pair: str, request: Request, output: str = "pb"): async def handleStream(pair): try: - async with await AsyncNamedPipe.open(pair) as pipe: + async with await AsyncChannelReader.open(pair) as pipe: while True: text = await pipe.readline() if not text: diff --git a/src/utils/npipe/__init__.py b/src/utils/npipe/__init__.py index f0c65d0..caaa808 100644 --- a/src/utils/npipe/__init__.py +++ b/src/utils/npipe/__init__.py @@ -1,6 +1 @@ -from os import name as _os_name - -if _os_name == "nt": - from ._win32 import AsyncNamedPipe -else: - from ._posix import AsyncNamedPipe \ No newline at end of file +from ._posix import AsyncChannelWriter, AsyncChannelReader \ No newline at end of file diff --git a/src/utils/npipe/_posix.py b/src/utils/npipe/_posix.py index 33a922d..1b7f06c 100644 --- a/src/utils/npipe/_posix.py +++ b/src/utils/npipe/_posix.py @@ -1,71 +1,92 @@ import os +import tempfile import asyncio import aiofiles import aiofiles.os -from contextlib import contextmanager +from pathlib import Path -class AsyncNamedPipe: - def __init__(self, pipe, path, creator = False): - self._pipe = pipe - self._path = path - self._creator = creator - async def create(path: str, timeout: int = 10000): +class AsyncChannelWriter: + def __init__(self, fd, dir: Path): + self._fd = fd + self._dir = dir + + async def open(pair: str, timeout: int = 10000): loop = asyncio.get_running_loop() - path = f"/tmp/{path}" - try: - await loop.run_in_executor(None, os.mkfifo, path) - except: - pass - + try: + dir = await loop.run_in_executor(None, tempfile.mkdtemp, None, pair + "-", Path.home() / "tmp" / "channels") + dir = Path(dir) + await loop.run_in_executor(None, os.mkfifo, dir / "pipe") async with asyncio.timeout(timeout / 1000.0): - fd = await aiofiles.open(path, "w") - return AsyncNamedPipe(fd, path, True) + fd = await aiofiles.open(dir / "pipe", "w") + return AsyncChannelWriter(fd, dir) except asyncio.TimeoutError: - await aiofiles.os.unlink(path) + await aiofiles.os.unlink(dir / "pipe") + await aiofiles.os.rmdir(dir) raise Exception("No consumer") - async def open(path: str, timeout: int = 1000): - pipe = None - path = f"/tmp/{path}" - try: - async with asyncio.timeout(timeout / 1000.0): - while not await aiofiles.ospath.exists(path): - await asyncio.sleep(0.1) - pipe = await aiofiles.open(path, "r") - except asyncio.TimeoutError: - raise Exception("No producer") - - return AsyncNamedPipe(pipe, path) + async def close(self): + await self._fd.close() + await aiofiles.os.unlink(self._dir / "pipe") + await aiofiles.os.rmdir(self._dir) async def __aenter__(self): return self async def __aexit__(self, *_): - return await self.close() + await self.close() def __bool__(self): - return self._pipe is not None + return self._fd is not None async def write(self, data): - await self._pipe.write(data) - await self._pipe.flush() + await self._fd.write(data) + await self._fd.flush() async def writeline(self, data): - await self._pipe.writelines([data]) - await self._pipe.flush() + await self._fd.writelines([data]) + await self._fd.flush() - async def read(self, size = -1): - return await self._pipe.read(size) - async def readline(self): - return await self._pipe.readline() +class AsyncChannelReader: + def __init__(self, fd): + self._fd = fd + async def open(pair: str, timeout: int = 10000): + fd = None + try: + async with asyncio.timeout(timeout / 1000.0): + channels = Path.home() / "tmp" / "channels" + dirs = None + while not dirs: + dirs = list(filter(lambda d: d.startswith(f"{pair}-"), await aiofiles.os.listdir(channels))) + if not dir: + await asyncio.sleep(0.1) + + path = channels / dirs[0] / "pipe" + while not await aiofiles.ospath.exists(path): + await asyncio.sleep(0.1) + + fd = await aiofiles.open(path, "r") + except asyncio.TimeoutError: + raise Exception("No producer") + + return AsyncChannelReader(fd) async def close(self): - if self._creator: - await aiofiles.os.unlink(self._path) + await self._fd.close() + + async def __aenter__(self): + return self + + async def __aexit__(self, *_): + await self.close() + + async def read(self, size = -1): + return await self._fd.read(size) + + async def readline(self): + return await self._fd.readline() - return await self._pipe.close() diff --git a/src/utils/npipe/_win32.py b/src/utils/npipe/_win32.py deleted file mode 100644 index 2b19d9b..0000000 --- a/src/utils/npipe/_win32.py +++ /dev/null @@ -1,64 +0,0 @@ -import asyncio -import win32pipe -import win32file -import pywintypes -import win32event - -class AsyncNamedPipe: - def __init__(self): - self._pipe = None - self._overlapped = pywintypes.OVERLAPPED() - - async def create(self, path): - self._pipe = win32pipe.CreateNamedPipe( - rf"\\.\pipe\{path}", - win32pipe.PIPE_ACCESS_DUPLEX | win32file.FILE_FLAG_OVERLAPPED, - win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT, - 1, 65536, 65536, - 0, - None) - self._overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None) - win32pipe.ConnectNamedPipe(self._pipe, self._overlapped) - await self._wait_for_overlapped(self._overlapped) - - async def open(self, path): - self._overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None) - self._pipe = win32file.CreateFile( - rf"\\.\pipe\{path}", - win32file.GENERIC_READ | win32file.GENERIC_WRITE, - 0, - None, - win32file.OPEN_EXISTING, - win32file.FILE_FLAG_OVERLAPPED, - None) - await self._wait_for_overlapped(self._overlapped) - - async def write(self, data): - overlapped = pywintypes.OVERLAPPED() - overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None) - win32file.WriteFile(self._pipe, data, overlapped) - await self._wait_for_overlapped(overlapped) - - async def read(self, timeout=1000): - overlapped = pywintypes.OVERLAPPED() - overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None) - buffer = win32file.AllocateReadBuffer(65536) - win32file.ReadFile(self._pipe, buffer, overlapped) - await self._wait_for_overlapped(overlapped, timeout) - n_bytes = win32file.GetOverlappedResult(self._pipe, overlapped, True) - return bytes(buffer[:n_bytes]) - - async def close(self): - win32file.CloseHandle(self._pipe) - self._pipe = None - - async def _wait_for_overlapped(self, overlapped, timeout=1000): - loop = asyncio.get_running_loop() - return await loop.run_in_executor(None, self._wait_for_overlapped_blocking, overlapped, timeout) - - def _wait_for_overlapped_blocking(self, overlapped, timeout): - result = win32event.WaitForSingleObject(overlapped.hEvent, timeout) - if result == win32event.WAIT_OBJECT_0: - _ = win32file.GetOverlappedResult(self._pipe, overlapped, True) - return True - return False \ No newline at end of file