Skip to content

Commit

Permalink
Update vocodehq-public (#673)
Browse files Browse the repository at this point in the history
* Custom provider errors and add StreamingConversation to transcriber and synthesizer (#626)

* Improve Cartesia Synthesizer error handling (#663)

* feat: upgrade cartesia to 1.0.7 and support for continuations

* prevent errors

* fix: review comments

* Update vocode/streaming/models/synthesizer.py

* fix: improve error handling

* fix: intialize ctx

* chore: lint fix

---------

Co-authored-by: Ajay Raj <[email protected]>

* Update agents.mdx (#664)

* poetry version prerelease (#665)

---------

Co-authored-by: Adnaan Sachidanandan <[email protected]>
Co-authored-by: Sauhard Jain <[email protected]>
  • Loading branch information
3 people authored Aug 5, 2024
1 parent 1652eb1 commit 614d4ea
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 27 deletions.
2 changes: 1 addition & 1 deletion docs/agents.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ agent behavior:
- `language` sets the agent language (for more context see [Multilingual Agents](/multilingual))
- `initial_message` controls the agents first utterance.
- `initial_message_delay` adds a delay to the initial message from when the call begins
- `ask_if_human_present_on_idle` allows the agent to speak when there is more than 4s of silence on the call
- `ask_if_human_present_on_idle` allows the agent to speak when there is more than 15s of silence on the call
- `llm_temperature` controls the behavior of the underlying language model. Values can range from 0 to 1, with higher
values leading to more diverse and creative results. Lower values generate more consistent outputs.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "vocode"
version = "0.1.114a0"
version = "0.1.114a1"
description = "The all-in-one voice SDK"
authors = ["Ajay Raj <[email protected]>"]
license = "MIT License"
Expand Down
2 changes: 2 additions & 0 deletions vocode/streaming/streaming_conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,9 +687,11 @@ def create_state_manager(self) -> ConversationStateManager:

async def start(self, mark_ready: Optional[Callable[[], Awaitable[None]]] = None):
self.transcriber.start()
self.transcriber.streaming_conversation = self
self.transcriptions_worker.start()
self.agent_responses_worker.start()
self.synthesis_results_worker.start()
self.synthesizer.streaming_conversation = self
self.output_device.start()
if self.filler_audio_worker is not None:
self.filler_audio_worker.start()
Expand Down
15 changes: 15 additions & 0 deletions vocode/streaming/synthesizer/azure_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
_AZURE_INSIDE_VOICE_REGEX = r"<voice[^>]*>(.*?)<\/voice>"


class AzureSynthesizerException(Exception):
pass


class WordBoundaryEventPool:
def __init__(self):
self.events = []
Expand Down Expand Up @@ -231,6 +235,15 @@ def get_message_up_to(
return ssml_fragment.split(">")[-1]
return message

async def _check_stream_for_errors(self, audio_data_stream: speechsdk.AudioDataStream):
if (
audio_data_stream.cancellation_details
and audio_data_stream.cancellation_details.reason == speechsdk.CancellationReason.Error
):
raise AzureSynthesizerException(
f"Azure Synthesizer Error: {audio_data_stream.cancellation_details.error_details}"
)

async def create_speech_uncached(
self,
message: BaseMessage,
Expand Down Expand Up @@ -259,6 +272,8 @@ async def chunk_generator(
self.thread_pool_executor,
lambda: audio_data_stream.read_data(audio_buffer),
)

await self._check_stream_for_errors(audio_data_stream)
if filled_size != chunk_size:
yield SynthesisResult.ChunkResult(chunk_transform(audio_buffer[offset:]), True)
return
Expand Down
18 changes: 17 additions & 1 deletion vocode/streaming/synthesizer/base_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,18 @@
import math
import os
import wave
from typing import Any, AsyncGenerator, Callable, Generic, List, Optional, Tuple, TypeVar, Union
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
Callable,
Generic,
List,
Optional,
Tuple,
TypeVar,
Union,
)

import aiohttp
from loguru import logger
Expand All @@ -24,6 +35,9 @@
from vocode.streaming.utils.create_task import asyncio_create_task
from vocode.streaming.utils.worker import QueueConsumer

if TYPE_CHECKING:
from vocode.streaming.streaming_conversation import StreamingConversation

FILLER_PHRASES = [
BaseMessage(text="Um..."),
BaseMessage(text="Uh..."),
Expand Down Expand Up @@ -224,6 +238,8 @@ def create_synthesis_result(self, chunk_size) -> SynthesisResult:


class BaseSynthesizer(Generic[SynthesizerConfigType]):
streaming_conversation: "StreamingConversation"

def __init__(
self,
synthesizer_config: SynthesizerConfigType,
Expand Down
41 changes: 27 additions & 14 deletions vocode/streaming/synthesizer/cartesia_synthesizer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import hashlib

from loguru import logger

from vocode import getenv
from vocode.streaming.models.audio import AudioEncoding, SamplingRate
from vocode.streaming.models.message import BaseMessage
Expand Down Expand Up @@ -87,8 +89,15 @@ def __init__(
async def initialize_ws(self):
if self.ws is None:
self.ws = await self.client.tts.websocket()

async def initialize_ctx(self, is_first_text_chunk: bool):
if self.ctx is None or self.ctx.is_closed():
self.ctx = self.ws.context()
if self.ws:
self.ctx = self.ws.context()
else:
if is_first_text_chunk:
await self.ctx.no_more_inputs()
self.ctx = self.ws.context()

async def create_speech_uncached(
self,
Expand All @@ -98,10 +107,7 @@ async def create_speech_uncached(
is_sole_text_chunk: bool = False,
) -> SynthesisResult:
await self.initialize_ws()

if is_first_text_chunk and self.ws and self.ctx:
await self.ctx.no_more_inputs()
self.ctx = self.ws.context()
await self.initialize_ctx(is_first_text_chunk)

transcript = message.text

Expand All @@ -122,15 +128,22 @@ async def chunk_generator(context):
buffer = bytearray()
if context.is_closed():
return
async for event in context.receive():
audio = event.get("audio")
buffer.extend(audio)
while len(buffer) >= chunk_size:
yield SynthesisResult.ChunkResult(
chunk=buffer[:chunk_size], is_last_chunk=False
)
buffer = buffer[chunk_size:]
yield SynthesisResult.ChunkResult(chunk=buffer, is_last_chunk=True)
try:
async for event in context.receive():
audio = event.get("audio")
buffer.extend(audio)
while len(buffer) >= chunk_size:
yield SynthesisResult.ChunkResult(
chunk=buffer[:chunk_size], is_last_chunk=False
)
buffer = buffer[chunk_size:]
except Exception as e:
logger.info(
f"Caught error while receiving audio chunks from CartesiaSynthesizer: {e}"
)
self.ctx._close()
if buffer:
yield SynthesisResult.ChunkResult(chunk=buffer, is_last_chunk=True)

return SynthesisResult(
chunk_generator=chunk_generator(self.ctx),
Expand Down
9 changes: 7 additions & 2 deletions vocode/streaming/synthesizer/eleven_labs_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
STREAMED_CHUNK_SIZE = 16000 * 2 // 4 # 1/8 of a second of 16kHz audio with 16-bit samples


class ElevenlabsException(Exception):
pass


class ElevenLabsSynthesizer(BaseSynthesizer[ElevenLabsSynthesizerConfig]):
def __init__(
self,
Expand Down Expand Up @@ -144,8 +148,9 @@ async def get_chunks(

if not stream.is_success:
error = await stream.aread()
logger.error(f"ElevenLabs API failed: {stream.status_code} {error.decode('utf-8')}")
raise Exception(f"ElevenLabs API returned {stream.status_code} status code")
raise ElevenlabsException(
f"ElevenLabs API returned {stream.status_code} status code and the following details: {error.decode('utf-8')}"
)
async for chunk in stream.aiter_bytes(chunk_size):
if self.upsample:
chunk = self._resample_chunk(
Expand Down
20 changes: 16 additions & 4 deletions vocode/streaming/synthesizer/play_ht_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
TTS_ENDPOINT = "https://play.ht/api/v2/tts/stream"


class PlayHTV1APIError(Exception):
pass


class PlayHtSynthesizer(BaseSynthesizer[PlayHtSynthesizerConfig]):
def __init__(
self,
Expand Down Expand Up @@ -78,15 +82,23 @@ async def create_speech_uncached(
timeout=ClientTimeout(total=15),
)
if not response.ok:
raise Exception(f"Play.ht API error status code {response.status}")
response_json = await response.json()
if response_json and "error_message" in response_json:
error_message = response_json["error_message"]
else:
error_message = (
f"Request to Play.HT failed with status code {str(response.status)}"
)

raise PlayHTV1APIError(
f"Play.ht API error status code {response.status}",
error_message,
)
if response.status == 429 and attempt < max_backoff_retries - 1:
await asyncio.sleep(backoff_retry_delay)
backoff_retry_delay *= 2 # Exponentially increase delay
continue

if not response.ok:
raise Exception(f"Play.ht API error status code {response.status}")

if self.experimental_streaming:
return SynthesisResult(
self.experimental_mp3_streaming_output_generator(
Expand Down
11 changes: 8 additions & 3 deletions vocode/streaming/synthesizer/rime_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
WAV_HEADER_LENGTH = 44


class RimeError(Exception):
pass


class RimeSynthesizer(BaseSynthesizer[RimeSynthesizerConfig]):
def __init__(
self,
Expand Down Expand Up @@ -73,7 +77,7 @@ async def create_speech_uncached(
timeout=aiohttp.ClientTimeout(total=15),
) as response:
if not response.ok:
raise Exception(f"Rime API error: {response.status}, {await response.text()}")
raise RimeError(f"Rime API error: {response.status}, {await response.text()}")
data = json.loads(await response.text())

audio_content = data.get("audioContent")
Expand Down Expand Up @@ -117,8 +121,9 @@ async def get_chunks(
)
if not stream.is_success:
error = await stream.aread()
logger.error(f"Rime API failed: {stream.status_code} {error.decode('utf-8')}")
raise Exception(f"Rime API returned {stream.status_code} status code")
raise RimeError(
f"Rime API returned {stream.status_code} status code with the following details: {error.decode('utf-8')}"
)
async for chunk in stream.aiter_bytes(chunk_size):
chunk_queue.put_nowait(chunk)
except asyncio.CancelledError:
Expand Down
6 changes: 5 additions & 1 deletion vocode/streaming/transcriber/base_transcriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,22 @@
import asyncio
import audioop
from abc import ABC, abstractmethod
from typing import Generic, Optional, TypeVar, Union
from typing import TYPE_CHECKING, Generic, Optional, TypeVar, Union

from vocode.streaming.models.audio import AudioEncoding
from vocode.streaming.models.transcriber import TranscriberConfig, Transcription
from vocode.streaming.utils.speed_manager import SpeedManager
from vocode.streaming.utils.worker import AbstractWorker, AsyncWorker, ThreadAsyncWorker

if TYPE_CHECKING:
from vocode.streaming.streaming_conversation import StreamingConversation

TranscriberConfigType = TypeVar("TranscriberConfigType", bound=TranscriberConfig)


class AbstractTranscriber(Generic[TranscriberConfigType], AbstractWorker[bytes]):
consumer: AbstractWorker[Transcription]
streaming_conversation: "StreamingConversation"

def __init__(self, transcriber_config: TranscriberConfigType):
AbstractWorker.__init__(self)
Expand Down

0 comments on commit 614d4ea

Please sign in to comment.