-
Notifications
You must be signed in to change notification settings - Fork 493
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[EPD-333] ElevenLabs Threaded MP3 Streaming #316
Conversation
EPD-333 Support ElevenLabs Streaming
This may (significantly?) improve latency https://docs.elevenlabs.io/api-reference/text-to-speech-stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the client session PR is merged too, so you can terminate the worker in synthesizer.tear_down()
@@ -94,7 +94,7 @@ | |||
|
|||
|
|||
# These synthesizers stream output so they need to be traced within this file. | |||
STREAMING_SYNTHESIZERS = ["azure"] | |||
STREAMING_SYNTHESIZERS = ["azure", "elevenlabs"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
niceeee
tests/synthesizer/conftest.py
Outdated
@@ -38,23 +40,23 @@ def mock_eleven_labs_api(): | |||
|
|||
|
|||
@pytest.fixture(scope="module") | |||
def eleven_labs_synthesizer_with_api_key(): | |||
async def eleven_labs_synthesizer_with_api_key(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we rename all of these to _fixture
- then it makes more sense that we'd want to await it
vocode/streaming/utils/worker.py
Outdated
@@ -249,3 +255,37 @@ class InterruptibleAgentResponseWorker( | |||
InterruptibleWorker[InterruptibleAgentResponseEvent] | |||
): | |||
pass | |||
|
|||
class PydubWorker(ThreadAsyncWorker): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like this would be better inside of the synthesizer
directory
vocode/streaming/utils/worker.py
Outdated
@@ -84,7 +90,7 @@ def _run_loop(self): | |||
def terminate(self): | |||
return super().terminate() | |||
|
|||
|
|||
# ThreadedAsyncWorker with a run loop that exposes something |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unclear comment
@@ -44,6 +47,14 @@ def __init__( | |||
self.optimize_streaming_latency = synthesizer_config.optimize_streaming_latency | |||
self.words_per_minute = 150 | |||
|
|||
# Create a PydubWorker instance as an attribute | |||
self.pydub_worker = PydubWorker( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we're breaking on is_last
in the worker, shouldn't we make a new PydubWorker
on each create_speech
call?
synthesizer_config, asyncio.Queue(), asyncio.Queue() | ||
) | ||
# Start the PydubWorker and store the task | ||
self.pydub_worker_task = self.pydub_worker.start() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't need to store this task - worker.terminate
should kill it for you
|
||
return result | ||
|
||
session = aiohttp.ClientSession() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can use self.aiohttp_session
once my PR is done
|
||
return SynthesisResult( | ||
output_generator(response, session), # should be wav | ||
lambda _: "", # useless for now |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we have some estimation code for this: https://github.com/vocodedev/vocode-python/blob/4ce40fcfae6543d904f9b78b81f32aa77b3d9df0/vocode/streaming/synthesizer/base_synthesizer.py#L182-L188
let's use this for now and set some reasonable WPM
@@ -0,0 +1,41 @@ | |||
import miniaudio |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit filename (lowercase miniaudio)
vocode/streaming/utils/mp3_helper.py
Outdated
mp3_chunk = io.BytesIO(mp3_bytes) | ||
|
||
# Convert it to a wav chunk using miniaudio | ||
wav_chunk = miniaudio.decode(mp3_chunk.read(), nchannels=1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how come we need to make an mp3_chunk
in memory file if we're just going to read
from it?
@@ -104,9 +104,10 @@ class ElevenLabsSynthesizerConfig( | |||
): | |||
api_key: Optional[str] = None | |||
voice_id: Optional[str] = ELEVEN_LABS_ADAM_VOICE_ID | |||
optimize_streaming_latency: Optional[int] = 3 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we adding a default value here?
* initial work, still blocking * Add threaded mp3 worker * fix tests add todo * use miniaudio worker and fix sync issues * attempt to handle error from short chunks? * make streaming optional & refac miniaudio decoding * teardown the experimental worker * fix tests and mypy * resolve comments * use sentinel to fix /stream endpoint * potentially fix typing * Revert "potentially fix typing" This reverts commit fdd37d7. * forgot about __future__ * fix termination code --------- Co-authored-by: Ajay Raj <[email protected]>
how many latency do you win ? |
This PR adds the ability to stream audio from the ElevenLabs synthesizer API. It uses a threaded worker to convert the mp3 chunks from the API to wav chunks that can be consumed by the vocode streaming pipeline.