diff --git a/vocode/streaming/output_device/twilio_output_device.py b/vocode/streaming/output_device/twilio_output_device.py index 062d998dd..74e0e9c07 100644 --- a/vocode/streaming/output_device/twilio_output_device.py +++ b/vocode/streaming/output_device/twilio_output_device.py @@ -8,6 +8,7 @@ from fastapi import WebSocket from fastapi.websockets import WebSocketState +from loguru import logger from pydantic import BaseModel from vocode.streaming.output_device.abstract_output_device import AbstractOutputDevice @@ -70,7 +71,7 @@ async def _process_mark_messages(self): # mark messages are tagged with the chunk ID that is attached to the audio chunk # but they are guaranteed to come in the same order as the audio chunks, and we # don't need to build resiliency there - await self._mark_message_queue.get() + mark_message = await self._mark_message_queue.get() item = await self._unprocessed_audio_chunks_queue.get() except asyncio.CancelledError: return @@ -78,6 +79,11 @@ async def _process_mark_messages(self): self.interruptible_event = item audio_chunk = item.payload + if mark_message.chunk_id != str(audio_chunk.chunk_id): + logger.error( + f"Received a mark message out of order with chunk ID {mark_message.chunk_id}" + ) + if item.is_interrupted(): audio_chunk.on_interrupt() audio_chunk.state = ChunkState.INTERRUPTED diff --git a/vocode/streaming/output_device/vonage_output_device.py b/vocode/streaming/output_device/vonage_output_device.py index 8385bfaaf..53234e67c 100644 --- a/vocode/streaming/output_device/vonage_output_device.py +++ b/vocode/streaming/output_device/vonage_output_device.py @@ -36,5 +36,5 @@ async def play(self, chunk: bytes): subchunk = chunk[i : i + VONAGE_CHUNK_SIZE] if len(subchunk) % 2 == 1: subchunk += PCM_SILENCE_BYTE # pad with silence, Vonage goes crazy otherwise - if self.ws and self.ws.application_state == WebSocketState.DISCONNECTED: + if self.ws and self.ws.application_state != WebSocketState.DISCONNECTED: await self.ws.send_bytes(subchunk) diff --git a/vocode/streaming/streaming_conversation.py b/vocode/streaming/streaming_conversation.py index 0e84e4285..bdb563230 100644 --- a/vocode/streaming/streaming_conversation.py +++ b/vocode/streaming/streaming_conversation.py @@ -945,6 +945,7 @@ def _on_interrupt(): "on_interrupt", create_on_interrupt_callback(processed_event), ) + # Prevents the case where we send a chunk after the output device has been interrupted async with self.interrupt_lock: self.output_device.consume_nonblocking( InterruptibleEvent(