Skip to content

Commit

Permalink
Remove catch-all exception logger for asyncio tasks (#605)
Browse files Browse the repository at this point in the history
* remove error log from exception for asyncio tasks

* remove log error on chatgpt query
  • Loading branch information
adnaans committed Jul 5, 2024
1 parent 41f38bc commit c580d13
Show file tree
Hide file tree
Showing 8 changed files with 19 additions and 49 deletions.
15 changes: 1 addition & 14 deletions vocode/streaming/agent/chat_gpt_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,26 +151,13 @@ async def _create_openai_stream_with_fallback(
)
self.apply_model_fallback(chat_parameters)
stream = await self.openai_client.chat.completions.create(**chat_parameters)
except Exception as e:
logger.error(
f"Error while hitting OpenAI with chat_parameters: {chat_parameters}",
exc_info=True,
)
raise e
return stream

async def _create_openai_stream(self, chat_parameters: Dict[str, Any]) -> AsyncGenerator:
if self.agent_config.llm_fallback is not None and self.openai_client.max_retries == 0:
stream = await self._create_openai_stream_with_fallback(chat_parameters)
else:
try:
stream = await self.openai_client.chat.completions.create(**chat_parameters)
except Exception as e:
logger.error(
f"Error while hitting OpenAI with chat_parameters: {chat_parameters}",
exc_info=True,
)
raise e
stream = await self.openai_client.chat.completions.create(**chat_parameters)
return stream

def should_backchannel(self, human_input: str) -> bool:
Expand Down
10 changes: 3 additions & 7 deletions vocode/streaming/output_device/twilio_output_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from vocode.streaming.output_device.abstract_output_device import AbstractOutputDevice
from vocode.streaming.output_device.audio_chunk import AudioChunk, ChunkState
from vocode.streaming.telephony.constants import DEFAULT_AUDIO_ENCODING, DEFAULT_SAMPLING_RATE
from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log
from vocode.streaming.utils.create_task import asyncio_create_task
from vocode.streaming.utils.worker import InterruptibleEvent


Expand Down Expand Up @@ -95,12 +95,8 @@ async def _process_mark_messages(self):
self.interruptible_event.is_interruptible = False

async def _run_loop(self):
send_twilio_messages_task = asyncio_create_task_with_done_error_log(
self._send_twilio_messages()
)
process_mark_messages_task = asyncio_create_task_with_done_error_log(
self._process_mark_messages()
)
send_twilio_messages_task = asyncio_create_task(self._send_twilio_messages())
process_mark_messages_task = asyncio_create_task(self._process_mark_messages())
await asyncio.gather(send_twilio_messages_task, process_mark_messages_task)

def _send_audio_chunk_and_mark(self, chunk: bytes, chunk_id: str):
Expand Down
8 changes: 4 additions & 4 deletions vocode/streaming/streaming_conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
enumerate_async_iter,
get_chunk_size_per_second,
)
from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log
from vocode.streaming.utils.create_task import asyncio_create_task
from vocode.streaming.utils.events_manager import EventsManager
from vocode.streaming.utils.speed_manager import SpeedManager
from vocode.streaming.utils.state_manager import ConversationStateManager
Expand Down Expand Up @@ -710,7 +710,7 @@ async def start(self, mark_ready: Optional[Callable[[], Awaitable[None]]] = None
self.agent.start()
initial_message = self.agent.get_agent_config().initial_message
if initial_message:
asyncio_create_task_with_done_error_log(
asyncio_create_task(
self.send_initial_message(initial_message),
)
else:
Expand All @@ -719,11 +719,11 @@ async def start(self, mark_ready: Optional[Callable[[], Awaitable[None]]] = None
if mark_ready:
await mark_ready()
self.is_terminated.clear()
self.check_for_idle_task = asyncio_create_task_with_done_error_log(
self.check_for_idle_task = asyncio_create_task(
self.check_for_idle(),
)
if len(self.events_manager.subscriptions) > 0:
self.events_task = asyncio_create_task_with_done_error_log(
self.events_task = asyncio_create_task(
self.events_manager.start(),
)

Expand Down
4 changes: 2 additions & 2 deletions vocode/streaming/synthesizer/base_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from vocode.streaming.telephony.constants import MULAW_SILENCE_BYTE, PCM_SILENCE_BYTE
from vocode.streaming.utils import convert_wav, get_chunk_size_per_second
from vocode.streaming.utils.async_requester import AsyncRequestor
from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log
from vocode.streaming.utils.create_task import asyncio_create_task

FILLER_PHRASES = [
BaseMessage(text="Um..."),
Expand Down Expand Up @@ -428,7 +428,7 @@ async def send_chunks():
miniaudio_worker.consume_nonblocking(None) # sentinel

try:
asyncio_create_task_with_done_error_log(send_chunks(), reraise_cancelled=True)
asyncio_create_task(send_chunks(), reraise_cancelled=True)

# Await the output queue of the MiniaudioWorker and yield the wav chunks in another loop
while True:
Expand Down
4 changes: 2 additions & 2 deletions vocode/streaming/synthesizer/eleven_labs_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from vocode.streaming.models.message import BaseMessage
from vocode.streaming.models.synthesizer import ElevenLabsSynthesizerConfig
from vocode.streaming.synthesizer.base_synthesizer import BaseSynthesizer, SynthesisResult
from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log
from vocode.streaming.utils.create_task import asyncio_create_task

ELEVEN_LABS_BASE_URL = "https://api.elevenlabs.io/v1/"
STREAMED_CHUNK_SIZE = 16000 * 2 // 4 # 1/8 of a second of 16kHz audio with 16-bit samples
Expand Down Expand Up @@ -97,7 +97,7 @@ async def create_speech_uncached(
body["model_id"] = self.model_id

chunk_queue: asyncio.Queue[Optional[bytes]] = asyncio.Queue()
asyncio_create_task_with_done_error_log(
asyncio_create_task(
self.get_chunks(url, headers, body, chunk_size, chunk_queue),
)

Expand Down
4 changes: 2 additions & 2 deletions vocode/streaming/synthesizer/play_ht_synthesizer_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
)
from vocode.streaming.synthesizer.synthesizer_utils import split_text
from vocode.streaming.utils import generate_from_async_iter_with_lookahead, generate_with_is_last
from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log
from vocode.streaming.utils.create_task import asyncio_create_task

PLAY_HT_ON_PREM_ADDR = os.environ.get("VOCODE_PLAYHT_ON_PREM_ADDR", None)
PLAY_HT_V2_MAX_CHARS = 200
Expand Down Expand Up @@ -86,7 +86,7 @@ async def create_speech_uncached(

self.total_chars += len(message.text)
chunk_queue: asyncio.Queue[Optional[bytes]] = asyncio.Queue()
asyncio_create_task_with_done_error_log(
asyncio_create_task(
self.get_chunks(
message,
chunk_size,
Expand Down
15 changes: 1 addition & 14 deletions vocode/streaming/utils/create_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,12 @@
tasks_registry = []


def log_if_exception(reraise_cancelled: bool, future: asyncio.Future) -> None:
try:
if exc := future.exception():
logger.exception(
f"Vocode wrapped logger; exception raised by task {future}: {exc}",
exc_info=exc,
)
except asyncio.CancelledError:
if reraise_cancelled:
raise


def asyncio_create_task_with_done_error_log(
def asyncio_create_task(
*args,
reraise_cancelled: bool = False,
**kwargs,
) -> asyncio.Task:
task = asyncio.create_task(*args, **kwargs)
tasks_registry.append(task)
task.add_done_callback(functools.partial(log_if_exception, reraise_cancelled))
task.add_done_callback(lambda t: tasks_registry.remove(t))
return task
8 changes: 4 additions & 4 deletions vocode/streaming/utils/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import janus
from loguru import logger

from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log
from vocode.streaming.utils.create_task import asyncio_create_task

WorkerInputType = TypeVar("WorkerInputType")

Expand All @@ -23,7 +23,7 @@ def __init__(
self.output_queue = output_queue

def start(self) -> asyncio.Task:
self.worker_task = asyncio_create_task_with_done_error_log(
self.worker_task = asyncio_create_task(
self._run_loop(),
)
if not self.worker_task:
Expand Down Expand Up @@ -60,7 +60,7 @@ def __init__(
def start(self) -> asyncio.Task:
self.worker_thread = threading.Thread(target=self._run_loop)
self.worker_thread.start()
self.worker_task = asyncio_create_task_with_done_error_log(
self.worker_task = asyncio_create_task(
self.run_thread_forwarding(),
)
if not self.worker_task:
Expand Down Expand Up @@ -224,7 +224,7 @@ async def _run_loop(self):
if item.is_interrupted():
continue
self.interruptible_event = item
self.current_task = asyncio_create_task_with_done_error_log(
self.current_task = asyncio_create_task(
self.process(item),
reraise_cancelled=True,
)
Expand Down

0 comments on commit c580d13

Please sign in to comment.