Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add StreamingConversation to transcriber and synthesizer and custom provider errors #626

Merged
merged 8 commits into from
Jul 29, 2024
2 changes: 2 additions & 0 deletions vocode/streaming/streaming_conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,9 +687,11 @@ def create_state_manager(self) -> ConversationStateManager:

async def start(self, mark_ready: Optional[Callable[[], Awaitable[None]]] = None):
self.transcriber.start()
self.transcriber.streaming_conversation = self
self.transcriptions_worker.start()
self.agent_responses_worker.start()
self.synthesis_results_worker.start()
self.synthesizer.streaming_conversation = self
self.output_device.start()
if self.filler_audio_worker is not None:
self.filler_audio_worker.start()
Expand Down
15 changes: 15 additions & 0 deletions vocode/streaming/synthesizer/azure_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
_AZURE_INSIDE_VOICE_REGEX = r"<voice[^>]*>(.*?)<\/voice>"


class AzureSynthesizerException(Exception):
    """Error raised when an Azure synthesis audio stream was cancelled with an error reason."""

    pass


class WordBoundaryEventPool:
def __init__(self):
self.events = []
Expand Down Expand Up @@ -231,6 +235,15 @@ def get_message_up_to(
return ssml_fragment.split(">")[-1]
return message

async def _check_stream_for_errors(self, audio_data_stream: speechsdk.AudioDataStream):
    """Raise if the given Azure audio stream was cancelled because of an error.

    Returns without doing anything when the stream has no cancellation
    details, or when the cancellation reason is anything other than
    ``speechsdk.CancellationReason.Error``.

    Args:
        audio_data_stream: The stream whose cancellation details are inspected.

    Raises:
        AzureSynthesizerException: The stream was cancelled with reason
            ``Error``; the message carries the reported error details.
    """
    details = audio_data_stream.cancellation_details
    # Nothing to report when the stream carries no cancellation details.
    if not details:
        return
    # Only error cancellations are escalated; other reasons are ignored.
    if details.reason != speechsdk.CancellationReason.Error:
        return
    raise AzureSynthesizerException(f"Azure Synthesizer Error: {details.error_details}")

async def create_speech_uncached(
self,
message: BaseMessage,
Expand Down Expand Up @@ -259,6 +272,8 @@ async def chunk_generator(
self.thread_pool_executor,
lambda: audio_data_stream.read_data(audio_buffer),
)

await self._check_stream_for_errors(audio_data_stream)
if filled_size != chunk_size:
yield SynthesisResult.ChunkResult(chunk_transform(audio_buffer[offset:]), True)
return
Expand Down
18 changes: 17 additions & 1 deletion vocode/streaming/synthesizer/base_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,18 @@
import math
import os
import wave
from typing import Any, AsyncGenerator, Callable, Generic, List, Optional, Tuple, TypeVar, Union
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
Callable,
Generic,
List,
Optional,
Tuple,
TypeVar,
Union,
)

import aiohttp
from loguru import logger
Expand All @@ -24,6 +35,9 @@
from vocode.streaming.utils.create_task import asyncio_create_task
from vocode.streaming.utils.worker import QueueConsumer

if TYPE_CHECKING:
from vocode.streaming.streaming_conversation import StreamingConversation

FILLER_PHRASES = [
BaseMessage(text="Um..."),
BaseMessage(text="Uh..."),
Expand Down Expand Up @@ -224,6 +238,8 @@ def create_synthesis_result(self, chunk_size) -> SynthesisResult:


class BaseSynthesizer(Generic[SynthesizerConfigType]):
streaming_conversation: "StreamingConversation"

def __init__(
self,
synthesizer_config: SynthesizerConfigType,
Expand Down
9 changes: 7 additions & 2 deletions vocode/streaming/synthesizer/eleven_labs_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
STREAMED_CHUNK_SIZE = 16000 * 2 // 4 # 8000 bytes: 1/4 of a second of 16kHz audio with 16-bit samples (assuming mono)


class ElevenlabsException(Exception):
    """Error raised when the ElevenLabs API responds unsuccessfully.

    The message carries the HTTP status code and the decoded response body.
    """

    pass


class ElevenLabsSynthesizer(BaseSynthesizer[ElevenLabsSynthesizerConfig]):
def __init__(
self,
Expand Down Expand Up @@ -144,8 +148,9 @@ async def get_chunks(

if not stream.is_success:
error = await stream.aread()
logger.error(f"ElevenLabs API failed: {stream.status_code} {error.decode('utf-8')}")
raise Exception(f"ElevenLabs API returned {stream.status_code} status code")
raise ElevenlabsException(
f"ElevenLabs API returned {stream.status_code} status code and the following details: {error.decode('utf-8')}"
)
async for chunk in stream.aiter_bytes(chunk_size):
if self.upsample:
chunk = self._resample_chunk(
Expand Down
20 changes: 16 additions & 4 deletions vocode/streaming/synthesizer/play_ht_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
TTS_ENDPOINT = "https://play.ht/api/v2/tts/stream"


class PlayHTV1APIError(Exception):
    """Error raised when a Play.ht TTS request returns a non-OK response.

    Raised with two arguments: a summary containing the HTTP status code and
    an error message (the response's ``error_message`` field when present).
    """

    pass


class PlayHtSynthesizer(BaseSynthesizer[PlayHtSynthesizerConfig]):
def __init__(
self,
Expand Down Expand Up @@ -78,15 +82,23 @@ async def create_speech_uncached(
timeout=ClientTimeout(total=15),
)
if not response.ok:
raise Exception(f"Play.ht API error status code {response.status}")
response_json = await response.json()
if response_json and "error_message" in response_json:
error_message = response_json["error_message"]
else:
error_message = (
f"Request to Play.HT failed with status code {str(response.status)}"
)

raise PlayHTV1APIError(
f"Play.ht API error status code {response.status}",
error_message,
)
if response.status == 429 and attempt < max_backoff_retries - 1:
await asyncio.sleep(backoff_retry_delay)
backoff_retry_delay *= 2 # Exponentially increase delay
continue

if not response.ok:
raise Exception(f"Play.ht API error status code {response.status}")

if self.experimental_streaming:
return SynthesisResult(
self.experimental_mp3_streaming_output_generator(
Expand Down
11 changes: 8 additions & 3 deletions vocode/streaming/synthesizer/rime_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
WAV_HEADER_LENGTH = 44


class RimeError(Exception):
    """Error raised when the Rime API returns an unsuccessful response.

    The message carries the HTTP status code and the response text.
    """

    pass


class RimeSynthesizer(BaseSynthesizer[RimeSynthesizerConfig]):
def __init__(
self,
Expand Down Expand Up @@ -73,7 +77,7 @@ async def create_speech_uncached(
timeout=aiohttp.ClientTimeout(total=15),
) as response:
if not response.ok:
raise Exception(f"Rime API error: {response.status}, {await response.text()}")
raise RimeError(f"Rime API error: {response.status}, {await response.text()}")
data = json.loads(await response.text())

audio_content = data.get("audioContent")
Expand Down Expand Up @@ -117,8 +121,9 @@ async def get_chunks(
)
if not stream.is_success:
error = await stream.aread()
logger.error(f"Rime API failed: {stream.status_code} {error.decode('utf-8')}")
raise Exception(f"Rime API returned {stream.status_code} status code")
raise RimeError(
f"Rime API returned {stream.status_code} status code with the following details: {error.decode('utf-8')}"
)
async for chunk in stream.aiter_bytes(chunk_size):
chunk_queue.put_nowait(chunk)
except asyncio.CancelledError:
Expand Down
6 changes: 5 additions & 1 deletion vocode/streaming/transcriber/base_transcriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,22 @@
import asyncio
import audioop
from abc import ABC, abstractmethod
from typing import Generic, Optional, TypeVar, Union
from typing import TYPE_CHECKING, Generic, Optional, TypeVar, Union

from vocode.streaming.models.audio import AudioEncoding
from vocode.streaming.models.transcriber import TranscriberConfig, Transcription
from vocode.streaming.utils.speed_manager import SpeedManager
from vocode.streaming.utils.worker import AbstractWorker, AsyncWorker, ThreadAsyncWorker

if TYPE_CHECKING:
vocode-petern marked this conversation as resolved.
Show resolved Hide resolved
from vocode.streaming.streaming_conversation import StreamingConversation

TranscriberConfigType = TypeVar("TranscriberConfigType", bound=TranscriberConfig)


class AbstractTranscriber(Generic[TranscriberConfigType], AbstractWorker[bytes]):
consumer: AbstractWorker[Transcription]
streaming_conversation: "StreamingConversation"

def __init__(self, transcriber_config: TranscriberConfigType):
AbstractWorker.__init__(self)
Expand Down
Loading