diff --git a/vocode/streaming/streaming_conversation.py b/vocode/streaming/streaming_conversation.py index bb1327080..dffb15420 100644 --- a/vocode/streaming/streaming_conversation.py +++ b/vocode/streaming/streaming_conversation.py @@ -687,9 +687,11 @@ def create_state_manager(self) -> ConversationStateManager: async def start(self, mark_ready: Optional[Callable[[], Awaitable[None]]] = None): self.transcriber.start() + self.transcriber.streaming_conversation = self self.transcriptions_worker.start() self.agent_responses_worker.start() self.synthesis_results_worker.start() + self.synthesizer.streaming_conversation = self self.output_device.start() if self.filler_audio_worker is not None: self.filler_audio_worker.start() diff --git a/vocode/streaming/synthesizer/azure_synthesizer.py b/vocode/streaming/synthesizer/azure_synthesizer.py index eee831e29..11c4c0364 100644 --- a/vocode/streaming/synthesizer/azure_synthesizer.py +++ b/vocode/streaming/synthesizer/azure_synthesizer.py @@ -32,6 +32,10 @@ _AZURE_INSIDE_VOICE_REGEX = r"]*>(.*?)<\/voice>" +class AzureSynthesizerException(Exception): + pass + + class WordBoundaryEventPool: def __init__(self): self.events = [] @@ -231,6 +235,15 @@ def get_message_up_to( return ssml_fragment.split(">")[-1] return message + async def _check_stream_for_errors(self, audio_data_stream: speechsdk.AudioDataStream): + if ( + audio_data_stream.cancellation_details + and audio_data_stream.cancellation_details.reason == speechsdk.CancellationReason.Error + ): + raise AzureSynthesizerException( + f"Azure Synthesizer Error: {audio_data_stream.cancellation_details.error_details}" + ) + async def create_speech_uncached( self, message: BaseMessage, @@ -259,6 +272,8 @@ async def chunk_generator( self.thread_pool_executor, lambda: audio_data_stream.read_data(audio_buffer), ) + + await self._check_stream_for_errors(audio_data_stream) if filled_size != chunk_size: yield SynthesisResult.ChunkResult(chunk_transform(audio_buffer[offset:]), True) return diff --git a/vocode/streaming/synthesizer/base_synthesizer.py b/vocode/streaming/synthesizer/base_synthesizer.py index f6e70aca0..b41c30db7 100644 --- a/vocode/streaming/synthesizer/base_synthesizer.py +++ b/vocode/streaming/synthesizer/base_synthesizer.py @@ -4,7 +4,18 @@ import math import os import wave -from typing import Any, AsyncGenerator, Callable, Generic, List, Optional, Tuple, TypeVar, Union +from typing import ( + TYPE_CHECKING, + Any, + AsyncGenerator, + Callable, + Generic, + List, + Optional, + Tuple, + TypeVar, + Union, +) import aiohttp from loguru import logger @@ -24,6 +35,9 @@ from vocode.streaming.utils.create_task import asyncio_create_task from vocode.streaming.utils.worker import QueueConsumer +if TYPE_CHECKING: + from vocode.streaming.streaming_conversation import StreamingConversation + FILLER_PHRASES = [ BaseMessage(text="Um..."), BaseMessage(text="Uh..."), @@ -224,6 +238,8 @@ def create_synthesis_result(self, chunk_size) -> SynthesisResult: class BaseSynthesizer(Generic[SynthesizerConfigType]): + streaming_conversation: "StreamingConversation" + def __init__( self, synthesizer_config: SynthesizerConfigType, diff --git a/vocode/streaming/synthesizer/eleven_labs_synthesizer.py b/vocode/streaming/synthesizer/eleven_labs_synthesizer.py index 1ccd43547..8beb31bf4 100644 --- a/vocode/streaming/synthesizer/eleven_labs_synthesizer.py +++ b/vocode/streaming/synthesizer/eleven_labs_synthesizer.py @@ -16,6 +16,10 @@ STREAMED_CHUNK_SIZE = 16000 * 2 // 4 # 1/8 of a second of 16kHz audio with 16-bit samples +class ElevenlabsException(Exception): + pass + + class ElevenLabsSynthesizer(BaseSynthesizer[ElevenLabsSynthesizerConfig]): def __init__( self, @@ -144,8 +148,9 @@ async def get_chunks( if not stream.is_success: error = await stream.aread() - logger.error(f"ElevenLabs API failed: {stream.status_code} {error.decode('utf-8')}") - raise Exception(f"ElevenLabs API returned {stream.status_code} status code") + raise ElevenlabsException( + f"ElevenLabs API returned {stream.status_code} status code and the following details: {error.decode('utf-8')}" + ) async for chunk in stream.aiter_bytes(chunk_size): if self.upsample: chunk = self._resample_chunk( diff --git a/vocode/streaming/synthesizer/play_ht_synthesizer.py b/vocode/streaming/synthesizer/play_ht_synthesizer.py index 05405af9d..46c361bc4 100644 --- a/vocode/streaming/synthesizer/play_ht_synthesizer.py +++ b/vocode/streaming/synthesizer/play_ht_synthesizer.py @@ -12,6 +12,10 @@ TTS_ENDPOINT = "https://play.ht/api/v2/tts/stream" +class PlayHTV1APIError(Exception): + pass + + class PlayHtSynthesizer(BaseSynthesizer[PlayHtSynthesizerConfig]): def __init__( self, @@ -78,15 +82,23 @@ async def create_speech_uncached( timeout=ClientTimeout(total=15), ) if not response.ok: - raise Exception(f"Play.ht API error status code {response.status}") + response_json = await response.json() + if response_json and "error_message" in response_json: + error_message = response_json["error_message"] + else: + error_message = ( + f"Request to Play.HT failed with status code {str(response.status)}" + ) + + raise PlayHTV1APIError( + f"Play.ht API error status code {response.status}", + error_message, + ) if response.status == 429 and attempt < max_backoff_retries - 1: await asyncio.sleep(backoff_retry_delay) backoff_retry_delay *= 2 # Exponentially increase delay continue - if not response.ok: - raise Exception(f"Play.ht API error status code {response.status}") - if self.experimental_streaming: return SynthesisResult( self.experimental_mp3_streaming_output_generator( diff --git a/vocode/streaming/synthesizer/rime_synthesizer.py b/vocode/streaming/synthesizer/rime_synthesizer.py index f776da4a8..b00cd872b 100644 --- a/vocode/streaming/synthesizer/rime_synthesizer.py +++ b/vocode/streaming/synthesizer/rime_synthesizer.py @@ -25,6 +25,10 @@ WAV_HEADER_LENGTH = 44 +class RimeError(Exception): + pass + + class RimeSynthesizer(BaseSynthesizer[RimeSynthesizerConfig]): def __init__( self, @@ -73,7 +77,7 @@ async def create_speech_uncached( timeout=aiohttp.ClientTimeout(total=15), ) as response: if not response.ok: - raise Exception(f"Rime API error: {response.status}, {await response.text()}") + raise RimeError(f"Rime API error: {response.status}, {await response.text()}") data = json.loads(await response.text()) audio_content = data.get("audioContent") @@ -117,8 +121,9 @@ async def get_chunks( ) if not stream.is_success: error = await stream.aread() - logger.error(f"Rime API failed: {stream.status_code} {error.decode('utf-8')}") - raise Exception(f"Rime API returned {stream.status_code} status code") + raise RimeError( + f"Rime API returned {stream.status_code} status code with the following details: {error.decode('utf-8')}" + ) async for chunk in stream.aiter_bytes(chunk_size): chunk_queue.put_nowait(chunk) except asyncio.CancelledError: diff --git a/vocode/streaming/transcriber/base_transcriber.py b/vocode/streaming/transcriber/base_transcriber.py index edd1009d4..2f05b2844 100644 --- a/vocode/streaming/transcriber/base_transcriber.py +++ b/vocode/streaming/transcriber/base_transcriber.py @@ -3,18 +3,22 @@ import asyncio import audioop from abc import ABC, abstractmethod -from typing import Generic, Optional, TypeVar, Union +from typing import TYPE_CHECKING, Generic, Optional, TypeVar, Union from vocode.streaming.models.audio import AudioEncoding from vocode.streaming.models.transcriber import TranscriberConfig, Transcription from vocode.streaming.utils.speed_manager import SpeedManager from vocode.streaming.utils.worker import AbstractWorker, AsyncWorker, ThreadAsyncWorker +if TYPE_CHECKING: + from vocode.streaming.streaming_conversation import StreamingConversation + TranscriberConfigType = TypeVar("TranscriberConfigType", bound=TranscriberConfig) class AbstractTranscriber(Generic[TranscriberConfigType], AbstractWorker[bytes]): consumer: AbstractWorker[Transcription] + streaming_conversation: "StreamingConversation" def __init__(self, transcriber_config: TranscriberConfigType): AbstractWorker.__init__(self)