Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove catch-all exception logger for asyncio tasks #605

Merged
merged 3 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 1 addition & 14 deletions vocode/streaming/agent/chat_gpt_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,26 +151,13 @@ async def _create_openai_stream_with_fallback(
)
self.apply_model_fallback(chat_parameters)
stream = await self.openai_client.chat.completions.create(**chat_parameters)
except Exception as e:
logger.error(
f"Error while hitting OpenAI with chat_parameters: {chat_parameters}",
exc_info=True,
)
raise e
return stream

async def _create_openai_stream(self, chat_parameters: Dict[str, Any]) -> AsyncGenerator:
if self.agent_config.llm_fallback is not None and self.openai_client.max_retries == 0:
stream = await self._create_openai_stream_with_fallback(chat_parameters)
else:
try:
stream = await self.openai_client.chat.completions.create(**chat_parameters)
except Exception as e:
logger.error(
f"Error while hitting OpenAI with chat_parameters: {chat_parameters}",
exc_info=True,
)
raise e
stream = await self.openai_client.chat.completions.create(**chat_parameters)
return stream

def should_backchannel(self, human_input: str) -> bool:
Expand Down
10 changes: 3 additions & 7 deletions vocode/streaming/output_device/twilio_output_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from vocode.streaming.output_device.abstract_output_device import AbstractOutputDevice
from vocode.streaming.output_device.audio_chunk import AudioChunk, ChunkState
from vocode.streaming.telephony.constants import DEFAULT_AUDIO_ENCODING, DEFAULT_SAMPLING_RATE
from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log
from vocode.streaming.utils.create_task import asyncio_create_task
from vocode.streaming.utils.worker import InterruptibleEvent


Expand Down Expand Up @@ -95,12 +95,8 @@ async def _process_mark_messages(self):
self.interruptible_event.is_interruptible = False

async def _run_loop(self):
send_twilio_messages_task = asyncio_create_task_with_done_error_log(
self._send_twilio_messages()
)
process_mark_messages_task = asyncio_create_task_with_done_error_log(
self._process_mark_messages()
)
send_twilio_messages_task = asyncio_create_task(self._send_twilio_messages())
process_mark_messages_task = asyncio_create_task(self._process_mark_messages())
await asyncio.gather(send_twilio_messages_task, process_mark_messages_task)

def _send_audio_chunk_and_mark(self, chunk: bytes, chunk_id: str):
Expand Down
8 changes: 4 additions & 4 deletions vocode/streaming/streaming_conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
enumerate_async_iter,
get_chunk_size_per_second,
)
from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log
from vocode.streaming.utils.create_task import asyncio_create_task
from vocode.streaming.utils.events_manager import EventsManager
from vocode.streaming.utils.speed_manager import SpeedManager
from vocode.streaming.utils.state_manager import ConversationStateManager
Expand Down Expand Up @@ -710,7 +710,7 @@ async def start(self, mark_ready: Optional[Callable[[], Awaitable[None]]] = None
self.agent.start()
initial_message = self.agent.get_agent_config().initial_message
if initial_message:
asyncio_create_task_with_done_error_log(
asyncio_create_task(
self.send_initial_message(initial_message),
)
else:
Expand All @@ -719,11 +719,11 @@ async def start(self, mark_ready: Optional[Callable[[], Awaitable[None]]] = None
if mark_ready:
await mark_ready()
self.is_terminated.clear()
self.check_for_idle_task = asyncio_create_task_with_done_error_log(
self.check_for_idle_task = asyncio_create_task(
self.check_for_idle(),
)
if len(self.events_manager.subscriptions) > 0:
self.events_task = asyncio_create_task_with_done_error_log(
self.events_task = asyncio_create_task(
self.events_manager.start(),
)

Expand Down
4 changes: 2 additions & 2 deletions vocode/streaming/synthesizer/base_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from vocode.streaming.telephony.constants import MULAW_SILENCE_BYTE, PCM_SILENCE_BYTE
from vocode.streaming.utils import convert_wav, get_chunk_size_per_second
from vocode.streaming.utils.async_requester import AsyncRequestor
from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log
from vocode.streaming.utils.create_task import asyncio_create_task

FILLER_PHRASES = [
BaseMessage(text="Um..."),
Expand Down Expand Up @@ -428,7 +428,7 @@ async def send_chunks():
miniaudio_worker.consume_nonblocking(None) # sentinel

try:
asyncio_create_task_with_done_error_log(send_chunks(), reraise_cancelled=True)
asyncio_create_task(send_chunks(), reraise_cancelled=True)

# Await the output queue of the MiniaudioWorker and yield the wav chunks in another loop
while True:
Expand Down
4 changes: 2 additions & 2 deletions vocode/streaming/synthesizer/eleven_labs_synthesizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from vocode.streaming.models.message import BaseMessage
from vocode.streaming.models.synthesizer import ElevenLabsSynthesizerConfig
from vocode.streaming.synthesizer.base_synthesizer import BaseSynthesizer, SynthesisResult
from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log
from vocode.streaming.utils.create_task import asyncio_create_task

ELEVEN_LABS_BASE_URL = "https://api.elevenlabs.io/v1/"
STREAMED_CHUNK_SIZE = 16000 * 2 // 4 # 1/8 of a second of 16kHz audio with 16-bit samples
Expand Down Expand Up @@ -97,7 +97,7 @@ async def create_speech_uncached(
body["model_id"] = self.model_id

chunk_queue: asyncio.Queue[Optional[bytes]] = asyncio.Queue()
asyncio_create_task_with_done_error_log(
asyncio_create_task(
self.get_chunks(url, headers, body, chunk_size, chunk_queue),
)

Expand Down
4 changes: 2 additions & 2 deletions vocode/streaming/synthesizer/play_ht_synthesizer_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
)
from vocode.streaming.synthesizer.synthesizer_utils import split_text
from vocode.streaming.utils import generate_from_async_iter_with_lookahead, generate_with_is_last
from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log
from vocode.streaming.utils.create_task import asyncio_create_task

PLAY_HT_ON_PREM_ADDR = os.environ.get("VOCODE_PLAYHT_ON_PREM_ADDR", None)
PLAY_HT_V2_MAX_CHARS = 200
Expand Down Expand Up @@ -86,7 +86,7 @@ async def create_speech_uncached(

self.total_chars += len(message.text)
chunk_queue: asyncio.Queue[Optional[bytes]] = asyncio.Queue()
asyncio_create_task_with_done_error_log(
asyncio_create_task(
self.get_chunks(
message,
chunk_size,
Expand Down
15 changes: 1 addition & 14 deletions vocode/streaming/utils/create_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,12 @@
tasks_registry = []


def log_if_exception(reraise_cancelled: bool, future: asyncio.Future) -> None:
try:
if exc := future.exception():
logger.exception(
f"Vocode wrapped logger; exception raised by task {future}: {exc}",
exc_info=exc,
)
except asyncio.CancelledError:
if reraise_cancelled:
raise


def asyncio_create_task_with_done_error_log(
def asyncio_create_task(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is fine - i understand why we need tasks_registry, but I wonder if we audit the use-cases and eventually remove this method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The primary reason I removed this is because each exception log causes a duplicate error in sentry. And we can just catch them by the raised error. Is there a scenario where we actually need this method/functionality?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i believe we keep tasks in tasks_registry so that these tasks don't get garbage collected when they aren't assigned to some state in a worker, e.g.

i was thinking we would eventually remove it and replace it with asyncio.create_task

Copy link
Contributor Author

@adnaans adnaans Jul 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh sure, you're talking about the method in general, not the change I made. Yeah eventually should be good to change to asyncio.create_task

*args,
reraise_cancelled: bool = False,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would be a regression to remove this, right? remind me again what happens to tasks normally when they get cancelled

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not removing it though? It's just a method rename to not have "with done error log"

Copy link
Contributor Author

@adnaans adnaans Jul 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you referring to the reraise_cancelled part? That part didn't make sense to me because we only check for an asyncio.CancelledError when trying to log (which seems like would never happen). Has that occurred before?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I'm referring to the reraise_cancelled part

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reading back the code, this is safe - all reraise_cancelled = True does is make more visible a cancel error when it actually gets cancelled

**kwargs,
) -> asyncio.Task:
task = asyncio.create_task(*args, **kwargs)
tasks_registry.append(task)
task.add_done_callback(functools.partial(log_if_exception, reraise_cancelled))
task.add_done_callback(lambda t: tasks_registry.remove(t))
return task
8 changes: 4 additions & 4 deletions vocode/streaming/utils/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import janus
from loguru import logger

from vocode.streaming.utils.create_task import asyncio_create_task_with_done_error_log
from vocode.streaming.utils.create_task import asyncio_create_task

WorkerInputType = TypeVar("WorkerInputType")

Expand All @@ -23,7 +23,7 @@ def __init__(
self.output_queue = output_queue

def start(self) -> asyncio.Task:
self.worker_task = asyncio_create_task_with_done_error_log(
self.worker_task = asyncio_create_task(
self._run_loop(),
)
if not self.worker_task:
Expand Down Expand Up @@ -60,7 +60,7 @@ def __init__(
def start(self) -> asyncio.Task:
self.worker_thread = threading.Thread(target=self._run_loop)
self.worker_thread.start()
self.worker_task = asyncio_create_task_with_done_error_log(
self.worker_task = asyncio_create_task(
self.run_thread_forwarding(),
)
if not self.worker_task:
Expand Down Expand Up @@ -224,7 +224,7 @@ async def _run_loop(self):
if item.is_interrupted():
continue
self.interruptible_event = item
self.current_task = asyncio_create_task_with_done_error_log(
self.current_task = asyncio_create_task(
self.process(item),
reraise_cancelled=True,
)
Expand Down
Loading