From 00ff02990517b9a35d9ee6f63d7ae085d120ae95 Mon Sep 17 00:00:00 2001 From: adnaans Date: Wed, 10 Jul 2024 17:56:47 -0700 Subject: [PATCH 1/7] add streaming conversation to synthesizer and transcriber --- vocode/streaming/streaming_conversation.py | 2 ++ .../streaming/synthesizer/base_synthesizer.py | 18 +++++++++++++++++- .../streaming/transcriber/base_transcriber.py | 6 +++++- 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/vocode/streaming/streaming_conversation.py b/vocode/streaming/streaming_conversation.py index f6fc02500..c52c8042e 100644 --- a/vocode/streaming/streaming_conversation.py +++ b/vocode/streaming/streaming_conversation.py @@ -686,9 +686,11 @@ def create_state_manager(self) -> ConversationStateManager: async def start(self, mark_ready: Optional[Callable[[], Awaitable[None]]] = None): self.transcriber.start() + self.transcriber.streaming_conversation = self self.transcriptions_worker.start() self.agent_responses_worker.start() self.synthesis_results_worker.start() + self.synthesizer.streaming_conversation = self self.output_device.start() if self.filler_audio_worker is not None: self.filler_audio_worker.start() diff --git a/vocode/streaming/synthesizer/base_synthesizer.py b/vocode/streaming/synthesizer/base_synthesizer.py index 8ccd6aad2..a4d1478e2 100644 --- a/vocode/streaming/synthesizer/base_synthesizer.py +++ b/vocode/streaming/synthesizer/base_synthesizer.py @@ -4,7 +4,18 @@ import math import os import wave -from typing import Any, AsyncGenerator, Callable, Generic, List, Optional, Tuple, TypeVar, Union +from typing import ( + TYPE_CHECKING, + Any, + AsyncGenerator, + Callable, + Generic, + List, + Optional, + Tuple, + TypeVar, + Union, +) import aiohttp from loguru import logger @@ -24,6 +35,9 @@ from vocode.streaming.utils.create_task import asyncio_create_task from vocode.streaming.utils.worker import QueueConsumer +if TYPE_CHECKING: + from vocode.streaming.streaming_conversation import StreamingConversation + FILLER_PHRASES = [ BaseMessage(text="Um..."), BaseMessage(text="Uh..."), @@ -224,6 +238,8 @@ def create_synthesis_result(self, chunk_size) -> SynthesisResult: class BaseSynthesizer(Generic[SynthesizerConfigType]): + streaming_conversation: "StreamingConversation" + def __init__( self, synthesizer_config: SynthesizerConfigType, diff --git a/vocode/streaming/transcriber/base_transcriber.py b/vocode/streaming/transcriber/base_transcriber.py index beea852b1..782e9dadf 100644 --- a/vocode/streaming/transcriber/base_transcriber.py +++ b/vocode/streaming/transcriber/base_transcriber.py @@ -3,18 +3,22 @@ import asyncio import audioop from abc import ABC, abstractmethod -from typing import Generic, Optional, TypeVar, Union +from typing import TYPE_CHECKING, Generic, Optional, TypeVar, Union from vocode.streaming.models.audio import AudioEncoding from vocode.streaming.models.transcriber import TranscriberConfig, Transcription from vocode.streaming.utils.speed_manager import SpeedManager from vocode.streaming.utils.worker import AbstractWorker, AsyncWorker, ThreadAsyncWorker +if TYPE_CHECKING: + from vocode.streaming.streaming_conversation import StreamingConversation + TranscriberConfigType = TypeVar("TranscriberConfigType", bound=TranscriberConfig) class AbstractTranscriber(Generic[TranscriberConfigType], AbstractWorker[bytes]): consumer: AbstractWorker[Transcription] + streaming_conversation: "StreamingConversation" def __init__(self, transcriber_config: TranscriberConfigType): AbstractWorker.__init__(self) From 786f4046f95fa5f32c0d1251858d371a3bd5654d Mon Sep 17 00:00:00 2001 From: adnaans Date: Wed, 10 Jul 2024 22:51:42 -0700 Subject: [PATCH 2/7] add custom elevenlabs and play ht v1 errors --- .../synthesizer/eleven_labs_synthesizer.py | 8 +++++++- .../synthesizer/play_ht_synthesizer.py | 20 +++++++++++++++++-- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/vocode/streaming/synthesizer/eleven_labs_synthesizer.py b/vocode/streaming/synthesizer/eleven_labs_synthesizer.py index 1ccd43547..201c80f80 100644 --- a/vocode/streaming/synthesizer/eleven_labs_synthesizer.py +++ b/vocode/streaming/synthesizer/eleven_labs_synthesizer.py @@ -16,6 +16,10 @@ STREAMED_CHUNK_SIZE = 16000 * 2 // 4 # 1/8 of a second of 16kHz audio with 16-bit samples +class ElevenlabsException(Exception): + pass + + class ElevenLabsSynthesizer(BaseSynthesizer[ElevenLabsSynthesizerConfig]): def __init__( self, @@ -145,7 +149,9 @@ async def get_chunks( if not stream.is_success: error = await stream.aread() logger.error(f"ElevenLabs API failed: {stream.status_code} {error.decode('utf-8')}") - raise Exception(f"ElevenLabs API returned {stream.status_code} status code") + raise ElevenlabsException( + f"ElevenLabs API returned {stream.status_code} status code and the following details: {error.decode('utf-8')}" + ) async for chunk in stream.aiter_bytes(chunk_size): if self.upsample: chunk = self._resample_chunk( diff --git a/vocode/streaming/synthesizer/play_ht_synthesizer.py b/vocode/streaming/synthesizer/play_ht_synthesizer.py index 05405af9d..80baae42a 100644 --- a/vocode/streaming/synthesizer/play_ht_synthesizer.py +++ b/vocode/streaming/synthesizer/play_ht_synthesizer.py @@ -12,6 +12,10 @@ TTS_ENDPOINT = "https://play.ht/api/v2/tts/stream" +class PlayHTV1APIError(Exception): + pass + + class PlayHtSynthesizer(BaseSynthesizer[PlayHtSynthesizerConfig]): def __init__( self, @@ -78,14 +82,26 @@ async def create_speech_uncached( timeout=ClientTimeout(total=15), ) if not response.ok: - raise Exception(f"Play.ht API error status code {response.status}") + raise PlayHTV1APIError( + f"Play.ht API error status code {response.status}", + (await response.json()).get( + "error_message", + f"Request to Play.HT failed with status code {str(response.status)}", + ), + ) if response.status == 429 and attempt < max_backoff_retries - 1: await asyncio.sleep(backoff_retry_delay) backoff_retry_delay *= 2 # Exponentially increase delay continue if not response.ok: - raise Exception(f"Play.ht API error status code {response.status}") + raise PlayHTV1APIError( + f"Play.ht API error status code {response.status}", + (await response.json()).get( + "error_message", + f"Request to Play.HT failed with status code {str(response.status)}", + ), + ) if self.experimental_streaming: return SynthesisResult( From 971ac42e0779b08985158f551e9015c82f80d8ae Mon Sep 17 00:00:00 2001 From: adnaans Date: Wed, 10 Jul 2024 23:28:32 -0700 Subject: [PATCH 3/7] remove extra log error for elevenlabs --- vocode/streaming/synthesizer/eleven_labs_synthesizer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/vocode/streaming/synthesizer/eleven_labs_synthesizer.py b/vocode/streaming/synthesizer/eleven_labs_synthesizer.py index 201c80f80..8beb31bf4 100644 --- a/vocode/streaming/synthesizer/eleven_labs_synthesizer.py +++ b/vocode/streaming/synthesizer/eleven_labs_synthesizer.py @@ -148,7 +148,6 @@ async def get_chunks( if not stream.is_success: error = await stream.aread() - logger.error(f"ElevenLabs API failed: {stream.status_code} {error.decode('utf-8')}") raise ElevenlabsException( f"ElevenLabs API returned {stream.status_code} status code and the following details: {error.decode('utf-8')}" ) From 99d472d272be5bf4c8c24251d4ce2131d006a247 Mon Sep 17 00:00:00 2001 From: adnaans Date: Wed, 10 Jul 2024 23:28:46 -0700 Subject: [PATCH 4/7] use custom errors for rime --- vocode/streaming/synthesizer/rime_synthesizer.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/vocode/streaming/synthesizer/rime_synthesizer.py b/vocode/streaming/synthesizer/rime_synthesizer.py index f776da4a8..b00cd872b 100644 --- a/vocode/streaming/synthesizer/rime_synthesizer.py +++ b/vocode/streaming/synthesizer/rime_synthesizer.py @@ -25,6 +25,10 @@ WAV_HEADER_LENGTH = 44 +class RimeError(Exception): + pass + + class RimeSynthesizer(BaseSynthesizer[RimeSynthesizerConfig]): def __init__( self, @@ -73,7 +77,7 @@ async def create_speech_uncached( timeout=aiohttp.ClientTimeout(total=15), ) as response: if not response.ok: - raise Exception(f"Rime API error: {response.status}, {await response.text()}") + raise RimeError(f"Rime API error: {response.status}, {await response.text()}") data = json.loads(await response.text()) audio_content = data.get("audioContent") @@ -117,8 +121,9 @@ async def get_chunks( ) if not stream.is_success: error = await stream.aread() - logger.error(f"Rime API failed: {stream.status_code} {error.decode('utf-8')}") - raise Exception(f"Rime API returned {stream.status_code} status code") + raise RimeError( + f"Rime API returned {stream.status_code} status code with the following details: {error.decode('utf-8')}" + ) async for chunk in stream.aiter_bytes(chunk_size): chunk_queue.put_nowait(chunk) except asyncio.CancelledError: From bf2fb439372a5a6c7f7a0c115b66d0007059ec3d Mon Sep 17 00:00:00 2001 From: adnaans Date: Fri, 26 Jul 2024 15:43:36 -0700 Subject: [PATCH 5/7] add custom errors for azure --- vocode/streaming/synthesizer/azure_synthesizer.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/vocode/streaming/synthesizer/azure_synthesizer.py b/vocode/streaming/synthesizer/azure_synthesizer.py index eee831e29..11c4c0364 100644 --- a/vocode/streaming/synthesizer/azure_synthesizer.py +++ b/vocode/streaming/synthesizer/azure_synthesizer.py @@ -32,6 +32,10 @@ _AZURE_INSIDE_VOICE_REGEX = r"]*>(.*?)<\/voice>" +class AzureSynthesizerException(Exception): + pass + + class WordBoundaryEventPool: def __init__(self): self.events = [] @@ -231,6 +235,15 @@ def get_message_up_to( return ssml_fragment.split(">")[-1] return message + async def _check_stream_for_errors(self, audio_data_stream: speechsdk.AudioDataStream): + if ( + audio_data_stream.cancellation_details + and audio_data_stream.cancellation_details.reason == speechsdk.CancellationReason.Error + ): + raise AzureSynthesizerException( + f"Azure Synthesizer Error: {audio_data_stream.cancellation_details.error_details}" + ) + async def create_speech_uncached( self, message: BaseMessage, @@ -259,6 +272,8 @@ async def chunk_generator( self.thread_pool_executor, lambda: audio_data_stream.read_data(audio_buffer), ) + + await self._check_stream_for_errors(audio_data_stream) if filled_size != chunk_size: yield SynthesisResult.ChunkResult(chunk_transform(audio_buffer[offset:]), True) return From 720eff688643910895f81c4dec15cea26ebbef2a Mon Sep 17 00:00:00 2001 From: adnaans Date: Mon, 29 Jul 2024 11:53:43 -0700 Subject: [PATCH 6/7] add failsafe for no json in playht response --- .../synthesizer/play_ht_synthesizer.py | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/vocode/streaming/synthesizer/play_ht_synthesizer.py b/vocode/streaming/synthesizer/play_ht_synthesizer.py index 80baae42a..f80f2fcd6 100644 --- a/vocode/streaming/synthesizer/play_ht_synthesizer.py +++ b/vocode/streaming/synthesizer/play_ht_synthesizer.py @@ -82,27 +82,21 @@ async def create_speech_uncached( timeout=ClientTimeout(total=15), ) if not response.ok: + response_json = await response.json() + if response_json and "error_message" in response_json: + message = response_json["error_message"] + else: + message = f"Request to Play.HT failed with status code {str(response.status)}" + raise PlayHTV1APIError( f"Play.ht API error status code {response.status}", - (await response.json()).get( - "error_message", - f"Request to Play.HT failed with status code {str(response.status)}", - ), + message, ) if response.status == 429 and attempt < max_backoff_retries - 1: await asyncio.sleep(backoff_retry_delay) backoff_retry_delay *= 2 # Exponentially increase delay continue - if not response.ok: - raise PlayHTV1APIError( - f"Play.ht API error status code {response.status}", - (await response.json()).get( - "error_message", - f"Request to Play.HT failed with status code {str(response.status)}", - ), - ) - if self.experimental_streaming: return SynthesisResult( self.experimental_mp3_streaming_output_generator( From 290e5f590bbc90ef9eabd8dbcae22b016709ec79 Mon Sep 17 00:00:00 2001 From: adnaans Date: Mon, 29 Jul 2024 12:07:13 -0700 Subject: [PATCH 7/7] rename error message var --- vocode/streaming/synthesizer/play_ht_synthesizer.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/vocode/streaming/synthesizer/play_ht_synthesizer.py b/vocode/streaming/synthesizer/play_ht_synthesizer.py index f80f2fcd6..46c361bc4 100644 --- a/vocode/streaming/synthesizer/play_ht_synthesizer.py +++ b/vocode/streaming/synthesizer/play_ht_synthesizer.py @@ -84,13 +84,15 @@ async def create_speech_uncached( if not response.ok: response_json = await response.json() if response_json and "error_message" in response_json: - message = response_json["error_message"] + error_message = response_json["error_message"] else: - message = f"Request to Play.HT failed with status code {str(response.status)}" + error_message = ( + f"Request to Play.HT failed with status code {str(response.status)}" + ) raise PlayHTV1APIError( f"Play.ht API error status code {response.status}", - message, + error_message, ) if response.status == 429 and attempt < max_backoff_retries - 1: await asyncio.sleep(backoff_retry_delay)