Skip to content

Commit

Permalink
Custom provider errors and add StreamingConversation to transcriber a…
Browse files Browse the repository at this point in the history
…nd synthesizer (#626)
  • Loading branch information
adnaans authored Jul 29, 2024
1 parent eaafc1b commit d8f8aca
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 11 deletions.
2 changes: 2 additions & 0 deletions vocode/streaming/streaming_conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,9 +687,11 @@ def create_state_manager(self) -> ConversationStateManager:

async def start(self, mark_ready: Optional[Callable[[], Awaitable[None]]] = None):
self.transcriber.start()
self.transcriber.streaming_conversation = self
self.transcriptions_worker.start()
self.agent_responses_worker.start()
self.synthesis_results_worker.start()
self.synthesizer.streaming_conversation = self
self.output_device.start()
if self.filler_audio_worker is not None:
self.filler_audio_worker.start()
Expand Down
15 changes: 15 additions & 0 deletions vocode/streaming/synthesizer/azure_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
_AZURE_INSIDE_VOICE_REGEX = r"<voice[^>]*>(.*?)<\/voice>"


class AzureSynthesizerException(Exception):
pass


class WordBoundaryEventPool:
def __init__(self):
self.events = []
Expand Down Expand Up @@ -231,6 +235,15 @@ def get_message_up_to(
return ssml_fragment.split(">")[-1]
return message

async def _check_stream_for_errors(self, audio_data_stream: speechsdk.AudioDataStream):
if (
audio_data_stream.cancellation_details
and audio_data_stream.cancellation_details.reason == speechsdk.CancellationReason.Error
):
raise AzureSynthesizerException(
f"Azure Synthesizer Error: {audio_data_stream.cancellation_details.error_details}"
)

async def create_speech_uncached(
self,
message: BaseMessage,
Expand Down Expand Up @@ -259,6 +272,8 @@ async def chunk_generator(
self.thread_pool_executor,
lambda: audio_data_stream.read_data(audio_buffer),
)

await self._check_stream_for_errors(audio_data_stream)
if filled_size != chunk_size:
yield SynthesisResult.ChunkResult(chunk_transform(audio_buffer[offset:]), True)
return
Expand Down
18 changes: 17 additions & 1 deletion vocode/streaming/synthesizer/base_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,18 @@
import math
import os
import wave
from typing import Any, AsyncGenerator, Callable, Generic, List, Optional, Tuple, TypeVar, Union
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
Callable,
Generic,
List,
Optional,
Tuple,
TypeVar,
Union,
)

import aiohttp
from loguru import logger
Expand All @@ -24,6 +35,9 @@
from vocode.streaming.utils.create_task import asyncio_create_task
from vocode.streaming.utils.worker import QueueConsumer

if TYPE_CHECKING:
from vocode.streaming.streaming_conversation import StreamingConversation

FILLER_PHRASES = [
BaseMessage(text="Um..."),
BaseMessage(text="Uh..."),
Expand Down Expand Up @@ -224,6 +238,8 @@ def create_synthesis_result(self, chunk_size) -> SynthesisResult:


class BaseSynthesizer(Generic[SynthesizerConfigType]):
streaming_conversation: "StreamingConversation"

def __init__(
self,
synthesizer_config: SynthesizerConfigType,
Expand Down
9 changes: 7 additions & 2 deletions vocode/streaming/synthesizer/eleven_labs_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
STREAMED_CHUNK_SIZE = 16000 * 2 // 4 # 1/8 of a second of 16kHz audio with 16-bit samples


class ElevenlabsException(Exception):
pass


class ElevenLabsSynthesizer(BaseSynthesizer[ElevenLabsSynthesizerConfig]):
def __init__(
self,
Expand Down Expand Up @@ -144,8 +148,9 @@ async def get_chunks(

if not stream.is_success:
error = await stream.aread()
logger.error(f"ElevenLabs API failed: {stream.status_code} {error.decode('utf-8')}")
raise Exception(f"ElevenLabs API returned {stream.status_code} status code")
raise ElevenlabsException(
f"ElevenLabs API returned {stream.status_code} status code and the following details: {error.decode('utf-8')}"
)
async for chunk in stream.aiter_bytes(chunk_size):
if self.upsample:
chunk = self._resample_chunk(
Expand Down
20 changes: 16 additions & 4 deletions vocode/streaming/synthesizer/play_ht_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
TTS_ENDPOINT = "https://play.ht/api/v2/tts/stream"


class PlayHTV1APIError(Exception):
pass


class PlayHtSynthesizer(BaseSynthesizer[PlayHtSynthesizerConfig]):
def __init__(
self,
Expand Down Expand Up @@ -78,15 +82,23 @@ async def create_speech_uncached(
timeout=ClientTimeout(total=15),
)
if not response.ok:
raise Exception(f"Play.ht API error status code {response.status}")
response_json = await response.json()
if response_json and "error_message" in response_json:
error_message = response_json["error_message"]
else:
error_message = (
f"Request to Play.HT failed with status code {str(response.status)}"
)

raise PlayHTV1APIError(
f"Play.ht API error status code {response.status}",
error_message,
)
if response.status == 429 and attempt < max_backoff_retries - 1:
await asyncio.sleep(backoff_retry_delay)
backoff_retry_delay *= 2 # Exponentially increase delay
continue

if not response.ok:
raise Exception(f"Play.ht API error status code {response.status}")

if self.experimental_streaming:
return SynthesisResult(
self.experimental_mp3_streaming_output_generator(
Expand Down
11 changes: 8 additions & 3 deletions vocode/streaming/synthesizer/rime_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
WAV_HEADER_LENGTH = 44


class RimeError(Exception):
pass


class RimeSynthesizer(BaseSynthesizer[RimeSynthesizerConfig]):
def __init__(
self,
Expand Down Expand Up @@ -73,7 +77,7 @@ async def create_speech_uncached(
timeout=aiohttp.ClientTimeout(total=15),
) as response:
if not response.ok:
raise Exception(f"Rime API error: {response.status}, {await response.text()}")
raise RimeError(f"Rime API error: {response.status}, {await response.text()}")
data = json.loads(await response.text())

audio_content = data.get("audioContent")
Expand Down Expand Up @@ -117,8 +121,9 @@ async def get_chunks(
)
if not stream.is_success:
error = await stream.aread()
logger.error(f"Rime API failed: {stream.status_code} {error.decode('utf-8')}")
raise Exception(f"Rime API returned {stream.status_code} status code")
raise RimeError(
f"Rime API returned {stream.status_code} status code with the following details: {error.decode('utf-8')}"
)
async for chunk in stream.aiter_bytes(chunk_size):
chunk_queue.put_nowait(chunk)
except asyncio.CancelledError:
Expand Down
6 changes: 5 additions & 1 deletion vocode/streaming/transcriber/base_transcriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,22 @@
import asyncio
import audioop
from abc import ABC, abstractmethod
from typing import Generic, Optional, TypeVar, Union
from typing import TYPE_CHECKING, Generic, Optional, TypeVar, Union

from vocode.streaming.models.audio import AudioEncoding
from vocode.streaming.models.transcriber import TranscriberConfig, Transcription
from vocode.streaming.utils.speed_manager import SpeedManager
from vocode.streaming.utils.worker import AbstractWorker, AsyncWorker, ThreadAsyncWorker

if TYPE_CHECKING:
from vocode.streaming.streaming_conversation import StreamingConversation

TranscriberConfigType = TypeVar("TranscriberConfigType", bound=TranscriberConfig)


class AbstractTranscriber(Generic[TranscriberConfigType], AbstractWorker[bytes]):
consumer: AbstractWorker[Transcription]
streaming_conversation: "StreamingConversation"

def __init__(self, transcriber_config: TranscriberConfigType):
AbstractWorker.__init__(self)
Expand Down

0 comments on commit d8f8aca

Please sign in to comment.