diff --git a/vocode/streaming/agent/chat_gpt_agent.py b/vocode/streaming/agent/chat_gpt_agent.py index 11e7c6586..4d54eb54a 100644 --- a/vocode/streaming/agent/chat_gpt_agent.py +++ b/vocode/streaming/agent/chat_gpt_agent.py @@ -151,26 +151,13 @@ async def _create_openai_stream_with_fallback( ) self.apply_model_fallback(chat_parameters) stream = await self.openai_client.chat.completions.create(**chat_parameters) - except Exception as e: - logger.error( - f"Error while hitting OpenAI with chat_parameters: {chat_parameters}", - exc_info=True, - ) - raise e return stream async def _create_openai_stream(self, chat_parameters: Dict[str, Any]) -> AsyncGenerator: if self.agent_config.llm_fallback is not None and self.openai_client.max_retries == 0: stream = await self._create_openai_stream_with_fallback(chat_parameters) else: - try: - stream = await self.openai_client.chat.completions.create(**chat_parameters) - except Exception as e: - logger.error( - f"Error while hitting OpenAI with chat_parameters: {chat_parameters}", - exc_info=True, - ) - raise e + stream = await self.openai_client.chat.completions.create(**chat_parameters) return stream def should_backchannel(self, human_input: str) -> bool: diff --git a/vocode/streaming/output_device/twilio_output_device.py b/vocode/streaming/output_device/twilio_output_device.py index 74e0e9c07..00926e9b0 100644 --- a/vocode/streaming/output_device/twilio_output_device.py +++ b/vocode/streaming/output_device/twilio_output_device.py @@ -14,7 +14,7 @@ from vocode.streaming.output_device.abstract_output_device import AbstractOutputDevice from vocode.streaming.output_device.audio_chunk import AudioChunk, ChunkState from vocode.streaming.telephony.constants import DEFAULT_AUDIO_ENCODING, DEFAULT_SAMPLING_RATE -from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log +from vocode.streaming.utils.create_task import asyncio_create_task from vocode.streaming.utils.worker import InterruptibleEvent @@ -95,12 +95,8 @@ async def _process_mark_messages(self): self.interruptible_event.is_interruptible = False async def _run_loop(self): - send_twilio_messages_task = asyncio_create_task_with_done_error_log( - self._send_twilio_messages() - ) - process_mark_messages_task = asyncio_create_task_with_done_error_log( - self._process_mark_messages() - ) + send_twilio_messages_task = asyncio_create_task(self._send_twilio_messages()) + process_mark_messages_task = asyncio_create_task(self._process_mark_messages()) await asyncio.gather(send_twilio_messages_task, process_mark_messages_task) def _send_audio_chunk_and_mark(self, chunk: bytes, chunk_id: str): diff --git a/vocode/streaming/streaming_conversation.py b/vocode/streaming/streaming_conversation.py index 58ce8d167..52e862ca6 100644 --- a/vocode/streaming/streaming_conversation.py +++ b/vocode/streaming/streaming_conversation.py @@ -61,7 +61,7 @@ enumerate_async_iter, get_chunk_size_per_second, ) -from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log +from vocode.streaming.utils.create_task import asyncio_create_task from vocode.streaming.utils.events_manager import EventsManager from vocode.streaming.utils.speed_manager import SpeedManager from vocode.streaming.utils.state_manager import ConversationStateManager @@ -710,7 +710,7 @@ async def start(self, mark_ready: Optional[Callable[[], Awaitable[None]]] = None self.agent.start() initial_message = self.agent.get_agent_config().initial_message if initial_message: - asyncio_create_task_with_done_error_log( + asyncio_create_task( self.send_initial_message(initial_message), ) else: @@ -719,11 +719,11 @@ async def start(self, mark_ready: Optional[Callable[[], Awaitable[None]]] = None if mark_ready: await mark_ready() self.is_terminated.clear() - self.check_for_idle_task = asyncio_create_task_with_done_error_log( + self.check_for_idle_task = asyncio_create_task( self.check_for_idle(), ) if len(self.events_manager.subscriptions) > 0: - self.events_task = asyncio_create_task_with_done_error_log( + self.events_task = asyncio_create_task( self.events_manager.start(), ) diff --git a/vocode/streaming/synthesizer/base_synthesizer.py b/vocode/streaming/synthesizer/base_synthesizer.py index 5038cf266..bdbd6d7d5 100644 --- a/vocode/streaming/synthesizer/base_synthesizer.py +++ b/vocode/streaming/synthesizer/base_synthesizer.py @@ -21,7 +21,7 @@ from vocode.streaming.telephony.constants import MULAW_SILENCE_BYTE, PCM_SILENCE_BYTE from vocode.streaming.utils import convert_wav, get_chunk_size_per_second from vocode.streaming.utils.async_requester import AsyncRequestor -from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log +from vocode.streaming.utils.create_task import asyncio_create_task FILLER_PHRASES = [ BaseMessage(text="Um..."), @@ -428,7 +428,7 @@ async def send_chunks(): miniaudio_worker.consume_nonblocking(None) # sentinel try: - asyncio_create_task_with_done_error_log(send_chunks(), reraise_cancelled=True) + asyncio_create_task(send_chunks(), reraise_cancelled=True) # Await the output queue of the MiniaudioWorker and yield the wav chunks in another loop while True: diff --git a/vocode/streaming/synthesizer/eleven_labs_synthesizer.py b/vocode/streaming/synthesizer/eleven_labs_synthesizer.py index 714b5bf72..1ccd43547 100644 --- a/vocode/streaming/synthesizer/eleven_labs_synthesizer.py +++ b/vocode/streaming/synthesizer/eleven_labs_synthesizer.py @@ -10,7 +10,7 @@ from vocode.streaming.models.message import BaseMessage from vocode.streaming.models.synthesizer import ElevenLabsSynthesizerConfig from vocode.streaming.synthesizer.base_synthesizer import BaseSynthesizer, SynthesisResult -from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log +from vocode.streaming.utils.create_task import asyncio_create_task ELEVEN_LABS_BASE_URL = "https://api.elevenlabs.io/v1/" STREAMED_CHUNK_SIZE = 16000 * 2 // 4 # 1/8 of a second of 16kHz audio with 16-bit samples @@ -97,7 +97,7 @@ async def create_speech_uncached( body["model_id"] = self.model_id chunk_queue: asyncio.Queue[Optional[bytes]] = asyncio.Queue() - asyncio_create_task_with_done_error_log( + asyncio_create_task( self.get_chunks(url, headers, body, chunk_size, chunk_queue), ) diff --git a/vocode/streaming/synthesizer/play_ht_synthesizer_v2.py b/vocode/streaming/synthesizer/play_ht_synthesizer_v2.py index 6db09ef7d..1e58b70fb 100644 --- a/vocode/streaming/synthesizer/play_ht_synthesizer_v2.py +++ b/vocode/streaming/synthesizer/play_ht_synthesizer_v2.py @@ -18,7 +18,7 @@ ) from vocode.streaming.synthesizer.synthesizer_utils import split_text from vocode.streaming.utils import generate_from_async_iter_with_lookahead, generate_with_is_last -from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log +from vocode.streaming.utils.create_task import asyncio_create_task PLAY_HT_ON_PREM_ADDR = os.environ.get("VOCODE_PLAYHT_ON_PREM_ADDR", None) PLAY_HT_V2_MAX_CHARS = 200 @@ -86,7 +86,7 @@ async def create_speech_uncached( self.total_chars += len(message.text) chunk_queue: asyncio.Queue[Optional[bytes]] = asyncio.Queue() - asyncio_create_task_with_done_error_log( + asyncio_create_task( self.get_chunks( message, chunk_size, diff --git a/vocode/streaming/utils/create_task.py b/vocode/streaming/utils/create_task.py index 9554734d0..38778d0bf 100644 --- a/vocode/streaming/utils/create_task.py +++ b/vocode/streaming/utils/create_task.py @@ -6,25 +6,12 @@ tasks_registry = [] -def log_if_exception(reraise_cancelled: bool, future: asyncio.Future) -> None: - try: - if exc := future.exception(): - logger.exception( - f"Vocode wrapped logger; exception raised by task {future}: {exc}", - exc_info=exc, - ) - except asyncio.CancelledError: - if reraise_cancelled: - raise - - -def asyncio_create_task_with_done_error_log( +def asyncio_create_task( *args, reraise_cancelled: bool = False, **kwargs, ) -> asyncio.Task: task = asyncio.create_task(*args, **kwargs) tasks_registry.append(task) - task.add_done_callback(functools.partial(log_if_exception, reraise_cancelled)) task.add_done_callback(lambda t: tasks_registry.remove(t)) return task diff --git a/vocode/streaming/utils/worker.py b/vocode/streaming/utils/worker.py index cdbff41ac..16c52a655 100644 --- a/vocode/streaming/utils/worker.py +++ b/vocode/streaming/utils/worker.py @@ -7,7 +7,7 @@ import janus from loguru import logger -from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log +from vocode.streaming.utils.create_task import asyncio_create_task WorkerInputType = TypeVar("WorkerInputType") @@ -23,7 +23,7 @@ def __init__( self.output_queue = output_queue def start(self) -> asyncio.Task: - self.worker_task = asyncio_create_task_with_done_error_log( + self.worker_task = asyncio_create_task( self._run_loop(), ) if not self.worker_task: @@ -60,7 +60,7 @@ def __init__( def start(self) -> asyncio.Task: self.worker_thread = threading.Thread(target=self._run_loop) self.worker_thread.start() - self.worker_task = asyncio_create_task_with_done_error_log( + self.worker_task = asyncio_create_task( self.run_thread_forwarding(), ) if not self.worker_task: @@ -224,7 +224,7 @@ async def _run_loop(self): if item.is_interrupted(): continue self.interruptible_event = item - self.current_task = asyncio_create_task_with_done_error_log( + self.current_task = asyncio_create_task( self.process(item), reraise_cancelled=True, )