Skip to content

Commit

Permalink
Merge pull request #707 from vocodedev/main
Browse files Browse the repository at this point in the history
Update vocodehq-public
  • Loading branch information
ajar98 authored Sep 20, 2024
2 parents 4f429e9 + a40c59b commit f211ffe
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 12 deletions.
51 changes: 42 additions & 9 deletions vocode/streaming/synthesizer/cartesia_synthesizer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import hashlib
from typing import List, Tuple

from loguru import logger

Expand Down Expand Up @@ -90,6 +91,8 @@ def __init__(
self.client = self.cartesia_tts(api_key=self.api_key)
self.ws = None
self.ctx = None
self.ctx_message = BaseMessage(text="")
self.ctx_timestamps: List[Tuple[str, float, float]] = []
self.no_more_inputs_task = None
self.no_more_inputs_lock = asyncio.Lock()

Expand All @@ -99,10 +102,14 @@ async def initialize_ws(self):

async def initialize_ctx(self, is_first_text_chunk: bool):
if self.ctx is None or self.ctx.is_closed():
self.ctx_message = BaseMessage(text="")
self.ctx_timestamps = []
if self.ws:
self.ctx = self.ws.context()
else:
if is_first_text_chunk:
self.ctx_message = BaseMessage(text="")
self.ctx_timestamps = []
if self.no_more_inputs_task:
self.no_more_inputs_task.cancel()
await self.ctx.no_more_inputs()
Expand Down Expand Up @@ -144,6 +151,7 @@ async def create_speech_uncached(
voice_id=self.voice_id,
continue_=not is_sole_text_chunk,
output_format=self.output_format,
add_timestamps=True,
_experimental_voice_controls=self._experimental_voice_controls,
)
if not is_sole_text_chunk:
Expand All @@ -159,12 +167,20 @@ async def chunk_generator(context):
try:
async for event in context.receive():
audio = event.get("audio")
buffer.extend(audio)
while len(buffer) >= chunk_size:
yield SynthesisResult.ChunkResult(
chunk=buffer[:chunk_size], is_last_chunk=False
)
buffer = buffer[chunk_size:]
word_timestamps = event.get("word_timestamps")
if word_timestamps:
words = word_timestamps["words"]
start_times = word_timestamps["start"]
end_times = word_timestamps["end"]
for word, start, end in zip(words, start_times, end_times):
self.ctx_timestamps.append((word, start, end))
if audio:
buffer.extend(audio)
while len(buffer) >= chunk_size:
yield SynthesisResult.ChunkResult(
chunk=buffer[:chunk_size], is_last_chunk=False
)
buffer = buffer[chunk_size:]
except Exception as e:
logger.info(
f"Caught error while receiving audio chunks from CartesiaSynthesizer: {e}"
Expand All @@ -180,11 +196,28 @@ async def chunk_generator(context):
buffer.extend(b"\x00\x00" * padding_size) # 0 is silence in s16le
yield SynthesisResult.ChunkResult(chunk=buffer, is_last_chunk=True)

self.ctx_message.text += transcript

def get_message_cutoff_ctx(message, seconds, words_per_minute=150):
if seconds:
closest_index = 0
if len(self.ctx_timestamps) > 0:
for index, word_timestamp in enumerate(self.ctx_timestamps):
_word, start, end = word_timestamp
closest_index = index
if end >= seconds:
break
if closest_index:
# Check if they're less than 2 seconds apart, fall back to words per minute otherwise
if self.ctx_timestamps[closest_index][2] - seconds < 2:
return " ".join(
[word for word, *_ in self.ctx_timestamps[: closest_index + 1]]
)
return self.get_message_cutoff_from_voice_speed(message, seconds, words_per_minute)

return SynthesisResult(
chunk_generator=chunk_generator(self.ctx),
get_message_up_to=lambda seconds: self.get_message_cutoff_from_voice_speed(
message, seconds
),
get_message_up_to=lambda seconds: get_message_cutoff_ctx(self.ctx_message, seconds),
)

@classmethod
Expand Down
11 changes: 8 additions & 3 deletions vocode/streaming/utils/async_requester.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
from typing import Optional

import aiohttp
import httpx
from aiohttp import BaseConnector
from pydantic import BaseModel

from vocode.streaming.utils.singleton import Singleton


class AsyncRequestor(Singleton):
def __init__(self):
self.session = aiohttp.ClientSession()
def __init__(self, connector: Optional[BaseConnector] = None):
self.session = aiohttp.ClientSession(connector=connector)
self.async_client = httpx.AsyncClient()
self.connector = connector

def get_session(self):
if self.session.closed:
self.session = aiohttp.ClientSession()
self.session = aiohttp.ClientSession(connector=self.connector)
return self.session

def get_client(self):
Expand Down

0 comments on commit f211ffe

Please sign in to comment.