From 3e8bf28013ee6ef62e4dfa875f21ae784c6771f3 Mon Sep 17 00:00:00 2001 From: adnaans Date: Wed, 3 Jul 2024 14:10:34 -0700 Subject: [PATCH 1/2] remove error log from exception for asyncio tasks --- .../output_device/twilio_output_device.py | 4 +-- .../output_device/vonage_output_device.py | 4 +-- .../output_device/websocket_output_device.py | 4 +-- vocode/streaming/streaming_conversation.py | 10 +++--- .../streaming/synthesizer/base_synthesizer.py | 4 +-- .../synthesizer/cartesia_synthesizer.py | 31 +++++++------------ .../synthesizer/eleven_labs_synthesizer.py | 4 +-- .../synthesizer/play_ht_synthesizer_v2.py | 4 +-- .../conversation/twilio_phone_conversation.py | 4 +-- vocode/streaming/utils/create_task.py | 15 +-------- vocode/streaming/utils/worker.py | 8 ++--- 11 files changed, 36 insertions(+), 56 deletions(-) diff --git a/vocode/streaming/output_device/twilio_output_device.py b/vocode/streaming/output_device/twilio_output_device.py index 565ec65f1..e9e115e79 100644 --- a/vocode/streaming/output_device/twilio_output_device.py +++ b/vocode/streaming/output_device/twilio_output_device.py @@ -10,7 +10,7 @@ from vocode.streaming.output_device.base_output_device import BaseOutputDevice from vocode.streaming.telephony.constants import DEFAULT_AUDIO_ENCODING, DEFAULT_SAMPLING_RATE -from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log +from vocode.streaming.utils.create_task import asyncio_create_task class TwilioOutputDevice(BaseOutputDevice): @@ -20,7 +20,7 @@ def __init__(self, ws: Optional[WebSocket] = None, stream_sid: Optional[str] = N self.stream_sid = stream_sid self.active = True self.queue: asyncio.Queue[str] = asyncio.Queue() - self.process_task = asyncio_create_task_with_done_error_log(self.process()) + self.process_task = asyncio_create_task(self.process()) async def process(self): while self.active: diff --git a/vocode/streaming/output_device/vonage_output_device.py b/vocode/streaming/output_device/vonage_output_device.py index 76aef2b12..c77071cc7 100644 --- a/vocode/streaming/output_device/vonage_output_device.py +++ b/vocode/streaming/output_device/vonage_output_device.py @@ -12,7 +12,7 @@ VONAGE_CHUNK_SIZE, VONAGE_SAMPLING_RATE, ) -from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log +from vocode.streaming.utils.create_task import asyncio_create_task class VonageOutputDevice(BaseOutputDevice): @@ -25,7 +25,7 @@ def __init__( self.ws = ws self.active = True self.queue: asyncio.Queue[bytes] = asyncio.Queue() - self.process_task = asyncio_create_task_with_done_error_log(self.process()) + self.process_task = asyncio_create_task(self.process()) self.output_to_speaker = output_to_speaker if output_to_speaker: self.output_speaker = BlockingSpeakerOutput.from_default_device( diff --git a/vocode/streaming/output_device/websocket_output_device.py b/vocode/streaming/output_device/websocket_output_device.py index ca0133c14..9d6cbb86a 100644 --- a/vocode/streaming/output_device/websocket_output_device.py +++ b/vocode/streaming/output_device/websocket_output_device.py @@ -8,7 +8,7 @@ from vocode.streaming.models.transcript import TranscriptEvent from vocode.streaming.models.websocket import AudioMessage, TranscriptMessage from vocode.streaming.output_device.base_output_device import BaseOutputDevice -from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log +from vocode.streaming.utils.create_task import asyncio_create_task class WebsocketOutputDevice(BaseOutputDevice): @@ -20,7 +20,7 @@ def __init__(self, ws: WebSocket, sampling_rate: int, audio_encoding: AudioEncod def start(self): self.active = True - self.process_task = asyncio_create_task_with_done_error_log(self.process()) + self.process_task = asyncio_create_task(self.process()) def mark_closed(self): self.active = False diff --git a/vocode/streaming/streaming_conversation.py b/vocode/streaming/streaming_conversation.py index aadf6bfa6..8085ca43c 100644 --- a/vocode/streaming/streaming_conversation.py +++ b/vocode/streaming/streaming_conversation.py @@ -58,7 +58,7 @@ from vocode.streaming.transcriber.base_transcriber import BaseTranscriber from vocode.streaming.transcriber.deepgram_transcriber import DeepgramTranscriber from vocode.streaming.utils import create_conversation_id, get_chunk_size_per_second -from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log +from vocode.streaming.utils.create_task import asyncio_create_task from vocode.streaming.utils.events_manager import EventsManager from vocode.streaming.utils.speed_manager import SpeedManager from vocode.streaming.utils.state_manager import ConversationStateManager @@ -707,7 +707,7 @@ async def start(self, mark_ready: Optional[Callable[[], Awaitable[None]]] = None self.agent.start() initial_message = self.agent.get_agent_config().initial_message if initial_message: - asyncio_create_task_with_done_error_log( + asyncio_create_task( self.send_initial_message(initial_message), ) else: @@ -716,11 +716,11 @@ async def start(self, mark_ready: Optional[Callable[[], Awaitable[None]]] = None if mark_ready: await mark_ready() self.active = True - self.check_for_idle_task = asyncio_create_task_with_done_error_log( + self.check_for_idle_task = asyncio_create_task( self.check_for_idle(), ) if len(self.events_manager.subscriptions) > 0: - self.events_task = asyncio_create_task_with_done_error_log( + self.events_task = asyncio_create_task( self.events_manager.start(), ) @@ -911,7 +911,7 @@ async def get_chunks( first_chunk_span = self._maybe_create_first_chunk_span(synthesis_result, message) chunk_queue: asyncio.Queue[Optional[SynthesisResult.ChunkResult]] = asyncio.Queue() - get_chunks_task = asyncio_create_task_with_done_error_log( + get_chunks_task = asyncio_create_task( get_chunks(chunk_queue, synthesis_result.chunk_generator), ) first = True diff --git a/vocode/streaming/synthesizer/base_synthesizer.py b/vocode/streaming/synthesizer/base_synthesizer.py index e3e86661f..eb37cd740 100644 --- a/vocode/streaming/synthesizer/base_synthesizer.py +++ b/vocode/streaming/synthesizer/base_synthesizer.py @@ -21,7 +21,7 @@ from vocode.streaming.telephony.constants import MULAW_SILENCE_BYTE, PCM_SILENCE_BYTE from vocode.streaming.utils import convert_wav, get_chunk_size_per_second from vocode.streaming.utils.async_requester import AsyncRequestor -from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log +from vocode.streaming.utils.create_task import asyncio_create_task FILLER_PHRASES = [ BaseMessage(text="Um..."), @@ -414,7 +414,7 @@ async def send_chunks(): miniaudio_worker.consume_nonblocking(None) # sentinel try: - asyncio_create_task_with_done_error_log(send_chunks(), reraise_cancelled=True) + asyncio_create_task(send_chunks(), reraise_cancelled=True) # Await the output queue of the MiniaudioWorker and yield the wav chunks in another loop while True: diff --git a/vocode/streaming/synthesizer/cartesia_synthesizer.py b/vocode/streaming/synthesizer/cartesia_synthesizer.py index 9e1392e6a..5cf186ac0 100644 --- a/vocode/streaming/synthesizer/cartesia_synthesizer.py +++ b/vocode/streaming/synthesizer/cartesia_synthesizer.py @@ -1,13 +1,12 @@ +import hashlib import io import wave -import hashlib from vocode import getenv from vocode.streaming.models.audio import AudioEncoding, SamplingRate from vocode.streaming.models.message import BaseMessage from vocode.streaming.models.synthesizer import CartesiaSynthesizerConfig from vocode.streaming.synthesizer.base_synthesizer import BaseSynthesizer, SynthesisResult -from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log class CartesiaSynthesizer(BaseSynthesizer[CartesiaSynthesizerConfig]): @@ -21,16 +20,13 @@ def __init__( try: from cartesia.tts import AsyncCartesiaTTS except ImportError as e: - raise ImportError( - f"Missing required dependancies for CartesiaSynthesizer" - ) from e - + raise ImportError(f"Missing required dependancies for CartesiaSynthesizer") from e + self.cartesia_tts = AsyncCartesiaTTS - + self.api_key = synthesizer_config.api_key or getenv("CARTESIA_API_KEY") if not self.api_key: raise ValueError("Missing Cartesia API key") - if synthesizer_config.audio_encoding == AudioEncoding.LINEAR16: self.channel_width = 2 @@ -55,16 +51,13 @@ def __init__( self.output_format = "pcm_16000" self.sampling_rate = 16000 else: - raise ValueError( - f"Unsupported audio encoding {synthesizer_config.audio_encoding}" - ) + raise ValueError(f"Unsupported audio encoding {synthesizer_config.audio_encoding}") self.num_channels = 1 self.model_id = synthesizer_config.model_id self.voice_id = synthesizer_config.voice_id self.client = self.cartesia_tts(api_key=self.api_key) self.voice_embedding = self.client.get_voice_embedding(voice_id=self.voice_id) - async def create_speech_uncached( self, @@ -78,17 +71,17 @@ async def create_speech_uncached( voice=self.voice_embedding, stream=True, model_id=self.model_id, - data_rtype='bytes', - output_format=self.output_format + data_rtype="bytes", + output_format=self.output_format, ) audio_file = io.BytesIO() - with wave.open(audio_file, 'wb') as wav_file: + with wave.open(audio_file, "wb") as wav_file: wav_file.setnchannels(self.num_channels) wav_file.setsampwidth(self.channel_width) wav_file.setframerate(self.sampling_rate) async for chunk in generator: - wav_file.writeframes(chunk['audio']) + wav_file.writeframes(chunk["audio"]) audio_file.seek(0) result = self.create_synthesis_result_from_wav( @@ -99,7 +92,7 @@ async def create_speech_uncached( ) return result - + @classmethod def get_voice_identifier(cls, synthesizer_config: CartesiaSynthesizerConfig): hashed_api_key = hashlib.sha256(f"{synthesizer_config.api_key}".encode("utf-8")).hexdigest() @@ -109,6 +102,6 @@ def get_voice_identifier(cls, synthesizer_config: CartesiaSynthesizerConfig): hashed_api_key, str(synthesizer_config.voice_id), str(synthesizer_config.model_id), - synthesizer_config.audio_encoding + synthesizer_config.audio_encoding, ) - ) \ No newline at end of file + ) diff --git a/vocode/streaming/synthesizer/eleven_labs_synthesizer.py b/vocode/streaming/synthesizer/eleven_labs_synthesizer.py index 714b5bf72..1ccd43547 100644 --- a/vocode/streaming/synthesizer/eleven_labs_synthesizer.py +++ b/vocode/streaming/synthesizer/eleven_labs_synthesizer.py @@ -10,7 +10,7 @@ from vocode.streaming.models.message import BaseMessage from vocode.streaming.models.synthesizer import ElevenLabsSynthesizerConfig from vocode.streaming.synthesizer.base_synthesizer import BaseSynthesizer, SynthesisResult -from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log +from vocode.streaming.utils.create_task import asyncio_create_task ELEVEN_LABS_BASE_URL = "https://api.elevenlabs.io/v1/" STREAMED_CHUNK_SIZE = 16000 * 2 // 4 # 1/8 of a second of 16kHz audio with 16-bit samples @@ -97,7 +97,7 @@ async def create_speech_uncached( body["model_id"] = self.model_id chunk_queue: asyncio.Queue[Optional[bytes]] = asyncio.Queue() - asyncio_create_task_with_done_error_log( + asyncio_create_task( self.get_chunks(url, headers, body, chunk_size, chunk_queue), ) diff --git a/vocode/streaming/synthesizer/play_ht_synthesizer_v2.py b/vocode/streaming/synthesizer/play_ht_synthesizer_v2.py index 6db09ef7d..1e58b70fb 100644 --- a/vocode/streaming/synthesizer/play_ht_synthesizer_v2.py +++ b/vocode/streaming/synthesizer/play_ht_synthesizer_v2.py @@ -18,7 +18,7 @@ ) from vocode.streaming.synthesizer.synthesizer_utils import split_text from vocode.streaming.utils import generate_from_async_iter_with_lookahead, generate_with_is_last -from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log +from vocode.streaming.utils.create_task import asyncio_create_task PLAY_HT_ON_PREM_ADDR = os.environ.get("VOCODE_PLAYHT_ON_PREM_ADDR", None) PLAY_HT_V2_MAX_CHARS = 200 @@ -86,7 +86,7 @@ async def create_speech_uncached( self.total_chars += len(message.text) chunk_queue: asyncio.Queue[Optional[bytes]] = asyncio.Queue() - asyncio_create_task_with_done_error_log( + asyncio_create_task( self.get_chunks( message, chunk_size, diff --git a/vocode/streaming/telephony/conversation/twilio_phone_conversation.py b/vocode/streaming/telephony/conversation/twilio_phone_conversation.py index 6145d53b0..9b5acb5fb 100644 --- a/vocode/streaming/telephony/conversation/twilio_phone_conversation.py +++ b/vocode/streaming/telephony/conversation/twilio_phone_conversation.py @@ -33,7 +33,7 @@ ) from vocode.streaming.transcriber.abstract_factory import AbstractTranscriberFactory from vocode.streaming.utils import create_utterance_id -from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log +from vocode.streaming.utils.create_task import asyncio_create_task from vocode.streaming.utils.events_manager import EventsManager from vocode.streaming.utils.state_manager import TwilioPhoneConversationStateManager @@ -218,7 +218,7 @@ async def send_speech_to_output( clear_message_lock = asyncio.Lock() - asyncio_create_task_with_done_error_log( + asyncio_create_task( self._send_chunks( utterance_id, synthesis_result.chunk_generator, diff --git a/vocode/streaming/utils/create_task.py b/vocode/streaming/utils/create_task.py index 9554734d0..38778d0bf 100644 --- a/vocode/streaming/utils/create_task.py +++ b/vocode/streaming/utils/create_task.py @@ -6,25 +6,12 @@ tasks_registry = [] -def log_if_exception(reraise_cancelled: bool, future: asyncio.Future) -> None: - try: - if exc := future.exception(): - logger.exception( - f"Vocode wrapped logger; exception raised by task {future}: {exc}", - exc_info=exc, - ) - except asyncio.CancelledError: - if reraise_cancelled: - raise - - -def asyncio_create_task_with_done_error_log( +def asyncio_create_task( *args, reraise_cancelled: bool = False, **kwargs, ) -> asyncio.Task: task = asyncio.create_task(*args, **kwargs) tasks_registry.append(task) - task.add_done_callback(functools.partial(log_if_exception, reraise_cancelled)) task.add_done_callback(lambda t: tasks_registry.remove(t)) return task diff --git a/vocode/streaming/utils/worker.py b/vocode/streaming/utils/worker.py index 80021b580..f2c08d88e 100644 --- a/vocode/streaming/utils/worker.py +++ b/vocode/streaming/utils/worker.py @@ -7,7 +7,7 @@ import janus from loguru import logger -from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log +from vocode.streaming.utils.create_task import asyncio_create_task WorkerInputType = TypeVar("WorkerInputType") @@ -23,7 +23,7 @@ def __init__( self.output_queue = output_queue def start(self) -> asyncio.Task: - self.worker_task = asyncio_create_task_with_done_error_log( + self.worker_task = asyncio_create_task( self._run_loop(), ) if not self.worker_task: @@ -60,7 +60,7 @@ def __init__( def start(self) -> asyncio.Task: self.worker_thread = threading.Thread(target=self._run_loop) self.worker_thread.start() - self.worker_task = asyncio_create_task_with_done_error_log( + self.worker_task = asyncio_create_task( self.run_thread_forwarding(), ) if not self.worker_task: @@ -224,7 +224,7 @@ async def _run_loop(self): if item.is_interrupted(): continue self.interruptible_event = item - self.current_task = asyncio_create_task_with_done_error_log( + self.current_task = asyncio_create_task( self.process(item), reraise_cancelled=True, ) From 980eb8398a08c0935e97365c791f31e53f9ccaa6 Mon Sep 17 00:00:00 2001 From: adnaans Date: Wed, 3 Jul 2024 14:11:22 -0700 Subject: [PATCH 2/2] remove log error on chatgpt query --- vocode/streaming/agent/chat_gpt_agent.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/vocode/streaming/agent/chat_gpt_agent.py b/vocode/streaming/agent/chat_gpt_agent.py index 047aa6c90..769a39284 100644 --- a/vocode/streaming/agent/chat_gpt_agent.py +++ b/vocode/streaming/agent/chat_gpt_agent.py @@ -149,26 +149,13 @@ async def _create_openai_stream_with_fallback( ) self.apply_model_fallback(chat_parameters) stream = await self.openai_client.chat.completions.create(**chat_parameters) - except Exception as e: - logger.error( - f"Error while hitting OpenAI with chat_parameters: {chat_parameters}", - exc_info=True, - ) - raise e return stream async def _create_openai_stream(self, chat_parameters: Dict[str, Any]) -> AsyncGenerator: if self.agent_config.llm_fallback is not None and self.openai_client.max_retries == 0: stream = await self._create_openai_stream_with_fallback(chat_parameters) else: - try: - stream = await self.openai_client.chat.completions.create(**chat_parameters) - except Exception as e: - logger.error( - f"Error while hitting OpenAI with chat_parameters: {chat_parameters}", - exc_info=True, - ) - raise e + stream = await self.openai_client.chat.completions.create(**chat_parameters) return stream def should_backchannel(self, human_input: str) -> bool: