From 11e8d24eafefb9092e4c7a560995ed06ba180a29 Mon Sep 17 00:00:00 2001 From: Sauhard Jain Date: Mon, 29 Jul 2024 13:54:42 -0700 Subject: [PATCH] Improve Cartesia Synthesizer error handling (#663) * feat: upgrade cartesia to 1.0.7 and support for continuations * prevent errors * fix: review comments * Update vocode/streaming/models/synthesizer.py * fix: improve error handling * fix: intialize ctx * chore: lint fix --------- Co-authored-by: Ajay Raj --- .../synthesizer/cartesia_synthesizer.py | 41 ++++++++++++------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/vocode/streaming/synthesizer/cartesia_synthesizer.py b/vocode/streaming/synthesizer/cartesia_synthesizer.py index 445ab03a7..e9ea6ac21 100644 --- a/vocode/streaming/synthesizer/cartesia_synthesizer.py +++ b/vocode/streaming/synthesizer/cartesia_synthesizer.py @@ -1,5 +1,7 @@ import hashlib +from loguru import logger + from vocode import getenv from vocode.streaming.models.audio import AudioEncoding, SamplingRate from vocode.streaming.models.message import BaseMessage @@ -87,8 +89,15 @@ def __init__( async def initialize_ws(self): if self.ws is None: self.ws = await self.client.tts.websocket() + + async def initialize_ctx(self, is_first_text_chunk: bool): if self.ctx is None or self.ctx.is_closed(): - self.ctx = self.ws.context() + if self.ws: + self.ctx = self.ws.context() + else: + if is_first_text_chunk: + await self.ctx.no_more_inputs() + self.ctx = self.ws.context() async def create_speech_uncached( self, @@ -98,10 +107,7 @@ async def create_speech_uncached( is_sole_text_chunk: bool = False, ) -> SynthesisResult: await self.initialize_ws() - - if is_first_text_chunk and self.ws and self.ctx: - await self.ctx.no_more_inputs() - self.ctx = self.ws.context() + await self.initialize_ctx(is_first_text_chunk) transcript = message.text @@ -122,15 +128,22 @@ async def chunk_generator(context): buffer = bytearray() if context.is_closed(): return - async for event in context.receive(): - audio = event.get("audio") - buffer.extend(audio) - while len(buffer) >= chunk_size: - yield SynthesisResult.ChunkResult( - chunk=buffer[:chunk_size], is_last_chunk=False - ) - buffer = buffer[chunk_size:] - yield SynthesisResult.ChunkResult(chunk=buffer, is_last_chunk=True) + try: + async for event in context.receive(): + audio = event.get("audio") + buffer.extend(audio) + while len(buffer) >= chunk_size: + yield SynthesisResult.ChunkResult( + chunk=buffer[:chunk_size], is_last_chunk=False + ) + buffer = buffer[chunk_size:] + except Exception as e: + logger.info( + f"Caught error while receiving audio chunks from CartesiaSynthesizer: {e}" + ) + self.ctx._close() + if buffer: + yield SynthesisResult.ChunkResult(chunk=buffer, is_last_chunk=True) return SynthesisResult( chunk_generator=chunk_generator(self.ctx),