Skip to content

Commit

Permalink
makes some variables private and also makes the chunk id coming back …
Browse files Browse the repository at this point in the history
…from the mark match the incoming audio chunk
  • Loading branch information
ajar98 committed Jun 27, 2024
1 parent 29cceca commit 418db81
Showing 1 changed file with 16 additions and 14 deletions.
30 changes: 16 additions & 14 deletions vocode/streaming/output_device/twilio_output_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,18 @@ def __init__(self, ws: Optional[WebSocket] = None, stream_sid: Optional[str] = N
self.stream_sid = stream_sid
self.active = True

self.twilio_events_queue: asyncio.Queue[str] = asyncio.Queue()
self.mark_message_queue: asyncio.Queue[MarkMessage] = asyncio.Queue()
self.unprocessed_audio_chunks_queue: asyncio.Queue[InterruptibleEvent[AudioChunk]] = (
self._twilio_events_queue: asyncio.Queue[str] = asyncio.Queue()
self._mark_message_queue: asyncio.Queue[MarkMessage] = asyncio.Queue()
self._unprocessed_audio_chunks_queue: asyncio.Queue[InterruptibleEvent[AudioChunk]] = (
asyncio.Queue()
)

def consume_nonblocking(self, item: InterruptibleEvent[AudioChunk]):
if not item.is_interrupted():
self._send_audio_chunk_and_mark(item.payload.data)
self.unprocessed_audio_chunks_queue.put_nowait(item)
self._send_audio_chunk_and_mark(
chunk=item.payload.data, chunk_id=str(item.payload.chunk_id)
)
self._unprocessed_audio_chunks_queue.put_nowait(item)
else:
audio_chunk = item.payload
audio_chunk.on_interrupt()
Expand All @@ -50,12 +52,12 @@ def interrupt(self):
self._send_clear_message()

def enqueue_mark_message(self, mark_message: MarkMessage):
self.mark_message_queue.put_nowait(mark_message)
self._mark_message_queue.put_nowait(mark_message)

async def _send_twilio_messages(self):
while True:
try:
twilio_event = await self.twilio_events_queue.get()
twilio_event = await self._twilio_events_queue.get()
except asyncio.CancelledError:
return
if self.ws.application_state == WebSocketState.DISCONNECTED:
Expand All @@ -68,8 +70,8 @@ async def _process_mark_messages(self):
# mark messages are tagged with the chunk ID that is attached to the audio chunk
# but they are guaranteed to come in the same order as the audio chunks, and we
# don't need to build resiliency there
await self.mark_message_queue.get()
item = await self.unprocessed_audio_chunks_queue.get()
await self._mark_message_queue.get()
item = await self._unprocessed_audio_chunks_queue.get()
except asyncio.CancelledError:
return

Expand All @@ -95,25 +97,25 @@ async def _run_loop(self):
)
await asyncio.gather(send_twilio_messages_task, process_mark_messages_task)

def _send_audio_chunk_and_mark(self, chunk: bytes):
def _send_audio_chunk_and_mark(self, chunk: bytes, chunk_id: str):
media_message = {
"event": "media",
"streamSid": self.stream_sid,
"media": {"payload": base64.b64encode(chunk).decode("utf-8")},
}
self.twilio_events_queue.put_nowait(json.dumps(media_message))
self._twilio_events_queue.put_nowait(json.dumps(media_message))
mark_message = {
"event": "mark",
"streamSid": self.stream_sid,
"mark": {
"name": str(uuid.uuid4()),
"name": chunk_id,
},
}
self.twilio_events_queue.put_nowait(json.dumps(mark_message))
self._twilio_events_queue.put_nowait(json.dumps(mark_message))

def _send_clear_message(self):
clear_message = {
"event": "clear",
"streamSid": self.stream_sid,
}
self.twilio_events_queue.put_nowait(json.dumps(clear_message))
self._twilio_events_queue.put_nowait(json.dumps(clear_message))

0 comments on commit 418db81

Please sign in to comment.