Skip to content

Commit

Permalink
Improve Cartesia Synthesizer error handling (#663)
Browse files Browse the repository at this point in the history
* feat: upgrade cartesia to 1.0.7 and support for continuations

* prevent errors

* fix: review comments

* Update vocode/streaming/models/synthesizer.py

* fix: improve error handling

* fix: intialize ctx

* chore: lint fix

---------

Co-authored-by: Ajay Raj <[email protected]>
  • Loading branch information
sauhardjain and ajar98 authored Jul 29, 2024
1 parent d8f8aca commit 11e8d24
Showing 1 changed file with 27 additions and 14 deletions.
41 changes: 27 additions & 14 deletions vocode/streaming/synthesizer/cartesia_synthesizer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import hashlib

from loguru import logger

from vocode import getenv
from vocode.streaming.models.audio import AudioEncoding, SamplingRate
from vocode.streaming.models.message import BaseMessage
Expand Down Expand Up @@ -87,8 +89,15 @@ def __init__(
async def initialize_ws(self):
if self.ws is None:
self.ws = await self.client.tts.websocket()

async def initialize_ctx(self, is_first_text_chunk: bool):
if self.ctx is None or self.ctx.is_closed():
self.ctx = self.ws.context()
if self.ws:
self.ctx = self.ws.context()
else:
if is_first_text_chunk:
await self.ctx.no_more_inputs()
self.ctx = self.ws.context()

async def create_speech_uncached(
self,
Expand All @@ -98,10 +107,7 @@ async def create_speech_uncached(
is_sole_text_chunk: bool = False,
) -> SynthesisResult:
await self.initialize_ws()

if is_first_text_chunk and self.ws and self.ctx:
await self.ctx.no_more_inputs()
self.ctx = self.ws.context()
await self.initialize_ctx(is_first_text_chunk)

transcript = message.text

Expand All @@ -122,15 +128,22 @@ async def chunk_generator(context):
buffer = bytearray()
if context.is_closed():
return
async for event in context.receive():
audio = event.get("audio")
buffer.extend(audio)
while len(buffer) >= chunk_size:
yield SynthesisResult.ChunkResult(
chunk=buffer[:chunk_size], is_last_chunk=False
)
buffer = buffer[chunk_size:]
yield SynthesisResult.ChunkResult(chunk=buffer, is_last_chunk=True)
try:
async for event in context.receive():
audio = event.get("audio")
buffer.extend(audio)
while len(buffer) >= chunk_size:
yield SynthesisResult.ChunkResult(
chunk=buffer[:chunk_size], is_last_chunk=False
)
buffer = buffer[chunk_size:]
except Exception as e:
logger.info(
f"Caught error while receiving audio chunks from CartesiaSynthesizer: {e}"
)
self.ctx._close()
if buffer:
yield SynthesisResult.ChunkResult(chunk=buffer, is_last_chunk=True)

return SynthesisResult(
chunk_generator=chunk_generator(self.ctx),
Expand Down

0 comments on commit 11e8d24

Please sign in to comment.