diff --git a/vocode/streaming/synthesizer/cartesia_synthesizer.py b/vocode/streaming/synthesizer/cartesia_synthesizer.py index 959ca5ba5..e9387253e 100644 --- a/vocode/streaming/synthesizer/cartesia_synthesizer.py +++ b/vocode/streaming/synthesizer/cartesia_synthesizer.py @@ -1,5 +1,6 @@ import asyncio import hashlib +from typing import List, Tuple from loguru import logger @@ -90,6 +91,8 @@ def __init__( self.client = self.cartesia_tts(api_key=self.api_key) self.ws = None self.ctx = None + self.ctx_message = BaseMessage(text="") + self.ctx_timestamps: List[Tuple[str, float, float]] = [] self.no_more_inputs_task = None self.no_more_inputs_lock = asyncio.Lock() @@ -99,10 +102,14 @@ async def initialize_ws(self): async def initialize_ctx(self, is_first_text_chunk: bool): if self.ctx is None or self.ctx.is_closed(): + self.ctx_message = BaseMessage(text="") + self.ctx_timestamps = [] if self.ws: self.ctx = self.ws.context() else: if is_first_text_chunk: + self.ctx_message = BaseMessage(text="") + self.ctx_timestamps = [] if self.no_more_inputs_task: self.no_more_inputs_task.cancel() await self.ctx.no_more_inputs() @@ -144,6 +151,7 @@ async def create_speech_uncached( voice_id=self.voice_id, continue_=not is_sole_text_chunk, output_format=self.output_format, + add_timestamps=True, _experimental_voice_controls=self._experimental_voice_controls, ) if not is_sole_text_chunk: @@ -159,12 +167,20 @@ async def chunk_generator(context): try: async for event in context.receive(): audio = event.get("audio") - buffer.extend(audio) - while len(buffer) >= chunk_size: - yield SynthesisResult.ChunkResult( - chunk=buffer[:chunk_size], is_last_chunk=False - ) - buffer = buffer[chunk_size:] + word_timestamps = event.get("word_timestamps") + if word_timestamps: + words = word_timestamps["words"] + start_times = word_timestamps["start"] + end_times = word_timestamps["end"] + for word, start, end in zip(words, start_times, end_times): + self.ctx_timestamps.append((word, start, end)) + if audio: + buffer.extend(audio) + while len(buffer) >= chunk_size: + yield SynthesisResult.ChunkResult( + chunk=buffer[:chunk_size], is_last_chunk=False + ) + buffer = buffer[chunk_size:] except Exception as e: logger.info( f"Caught error while receiving audio chunks from CartesiaSynthesizer: {e}" @@ -180,11 +196,28 @@ async def chunk_generator(context): buffer.extend(b"\x00\x00" * padding_size) # 0 is silence in s16le yield SynthesisResult.ChunkResult(chunk=buffer, is_last_chunk=True) + self.ctx_message.text += transcript + + def get_message_cutoff_ctx(message, seconds, words_per_minute=150): + if seconds: + closest_index = 0 + if len(self.ctx_timestamps) > 0: + for index, word_timestamp in enumerate(self.ctx_timestamps): + _word, start, end = word_timestamp + closest_index = index + if end >= seconds: + break + if closest_index: + # Check if they're less than 2 seconds apart, fall back to words per minute otherwise + if self.ctx_timestamps[closest_index][2] - seconds < 2: + return " ".join( + [word for word, *_ in self.ctx_timestamps[: closest_index + 1]] + ) + return self.get_message_cutoff_from_voice_speed(message, seconds, words_per_minute) + return SynthesisResult( chunk_generator=chunk_generator(self.ctx), - get_message_up_to=lambda seconds: self.get_message_cutoff_from_voice_speed( - message, seconds - ), + get_message_up_to=lambda seconds: get_message_cutoff_ctx(self.ctx_message, seconds), ) @classmethod diff --git a/vocode/streaming/utils/async_requester.py b/vocode/streaming/utils/async_requester.py index 5b23875cc..d3577f590 100644 --- a/vocode/streaming/utils/async_requester.py +++ b/vocode/streaming/utils/async_requester.py @@ -1,17 +1,22 @@ +from typing import Optional + import aiohttp import httpx +from aiohttp import BaseConnector +from pydantic import BaseModel from vocode.streaming.utils.singleton import Singleton class AsyncRequestor(Singleton): - def __init__(self): - self.session = aiohttp.ClientSession() + def __init__(self, connector: Optional[BaseConnector] = None): + self.session = aiohttp.ClientSession(connector=connector) self.async_client = httpx.AsyncClient() + self.connector = connector def get_session(self): if self.session.closed: - self.session = aiohttp.ClientSession() + self.session = aiohttp.ClientSession(connector=self.connector) return self.session def get_client(self):