Skip to content

Commit

Permalink
implement exotel with vocode
Browse files Browse the repository at this point in the history
  • Loading branch information
akshaykumar-ak committed Jul 16, 2024
1 parent 3dc1d49 commit 2bbe9fd
Show file tree
Hide file tree
Showing 13 changed files with 505 additions and 9 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ tiktoken = "^0.7.0"
uvicorn = "^0.30.0"
websockets = "^12.0"
nltk = "^3.8.1"
xmltodict = "^0.13.0"

# LLM Providers
groq = { version = "^0.9.0", optional = true }
Expand Down
1 change: 1 addition & 0 deletions vocode/streaming/agent/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class AgentInput(TypedModel, type=AgentInputType.BASE.value): # type: ignore
conversation_id: str
vonage_uuid: Optional[str]
twilio_sid: Optional[str]
exotel_sid: Optional[str]
agent_response_tracker: Optional[asyncio.Event] = None

class Config:
Expand Down
37 changes: 35 additions & 2 deletions vocode/streaming/models/telephony.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
DEFAULT_SAMPLING_RATE,
VONAGE_AUDIO_ENCODING,
VONAGE_CHUNK_SIZE,
VONAGE_SAMPLING_RATE,
VONAGE_SAMPLING_RATE, EXOTEL_AUDIO_ENCODING, EXOTEL_CHUNK_SIZE,
)


Expand All @@ -37,6 +37,15 @@ class VonageConfig(TelephonyProviderConfig):
private_key: str


class ExotelConfig(TelephonyProviderConfig):
    """Credentials and settings needed to talk to the Exotel API.

    ``account_sid`` and ``subdomain`` are used to build the Exotel REST URL,
    ``api_key`` / ``api_token`` are the HTTP basic-auth credentials, and
    ``app_id`` identifies the Exotel call flow that a created call is
    connected to.
    """

    account_sid: str
    subdomain: str
    api_key: str
    api_token: str
    # The default is None, so the annotation must allow None as well.
    app_id: Optional[str] = None
    # NOTE(review): not read anywhere in the visible code — confirm intended use.
    extra_params: Optional[Dict[str, Any]] = {}


class CallEntity(BaseModel):
phone_number: str

Expand Down Expand Up @@ -89,6 +98,7 @@ class CallConfigType(str, Enum):
BASE = "call_config_base"
TWILIO = "call_config_twilio"
VONAGE = "call_config_vonage"
EXOTEL = "call_config_exotel"


PhoneCallDirection = Literal["inbound", "outbound"]
Expand Down Expand Up @@ -137,6 +147,29 @@ def default_synthesizer_config():
)


class ExotelCallConfig(BaseCallConfig, type=CallConfigType.EXOTEL.value):
    """Call configuration for a phone call handled through Exotel."""

    exotel_config: ExotelConfig
    exotel_sid: str

    @staticmethod
    def default_transcriber_config():
        """Return the Deepgram transcriber settings used by default for Exotel audio."""
        endpointing = PunctuationEndpointingConfig()
        transcriber_config = DeepgramTranscriberConfig(
            sampling_rate=DEFAULT_SAMPLING_RATE,
            audio_encoding=EXOTEL_AUDIO_ENCODING,
            chunk_size=EXOTEL_CHUNK_SIZE,
            model="phonecall",
            tier="nova",
            endpointing_config=endpointing,
        )
        return transcriber_config

    @staticmethod
    def default_synthesizer_config():
        """Return the Azure synthesizer settings used by default for Exotel audio."""
        synthesizer_config = AzureSynthesizerConfig(
            sampling_rate=DEFAULT_SAMPLING_RATE,
            audio_encoding=EXOTEL_AUDIO_ENCODING,
        )
        return synthesizer_config


class VonageCallConfig(BaseCallConfig, type=CallConfigType.VONAGE.value): # type: ignore
vonage_config: VonageConfig
vonage_uuid: str
Expand All @@ -161,4 +194,4 @@ def default_synthesizer_config():
)


TelephonyConfig = Union[TwilioConfig, VonageConfig]
TelephonyConfig = Union[TwilioConfig, VonageConfig, ExotelConfig]
138 changes: 138 additions & 0 deletions vocode/streaming/output_device/exotel_output_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from __future__ import annotations

import asyncio
import base64
import json
from typing import List, Optional, Union

from fastapi import WebSocket
from fastapi.websockets import WebSocketState
from loguru import logger
from pydantic import BaseModel

from vocode.streaming.output_device.abstract_output_device import AbstractOutputDevice
from vocode.streaming.output_device.audio_chunk import AudioChunk, ChunkState
from vocode.streaming.telephony.constants import DEFAULT_SAMPLING_RATE, EXOTEL_AUDIO_ENCODING
from vocode.streaming.utils.create_task import asyncio_create_task
from vocode.streaming.utils.dtmf_utils import DTMFToneGenerator, KeypadEntry
from vocode.streaming.utils.worker import InterruptibleEvent


class ChunkFinishedMarkMessage(BaseModel):
    """Mark message carrying the ID of the audio chunk it refers to."""

    chunk_id: str


# Union of every mark message type the output device accepts; currently only
# one member, kept as a Union so further mark message types can be added.
MarkMessage = Union[ChunkFinishedMarkMessage]  # space for more mark messages


class ExotelOutputDevice(AbstractOutputDevice):
    """Output device that forwards audio to an Exotel websocket as JSON events.

    Outgoing events (``media``, ``mark``, ``clear``) are serialized with
    ``json.dumps`` and buffered on an internal queue, which
    ``_send_exotel_messages`` drains to the websocket. Every audio chunk is
    followed by a ``mark`` event named after the chunk ID;
    ``_process_mark_messages`` pairs the mark messages handed in through
    ``enqueue_mark_message`` with the queued chunks and flags each chunk as
    played or interrupted.
    """

    def __init__(self, ws: Optional[WebSocket] = None, stream_sid: Optional[str] = None):
        super().__init__(
            sampling_rate=DEFAULT_SAMPLING_RATE,
            audio_encoding=EXOTEL_AUDIO_ENCODING,
        )
        self.ws = ws
        self.stream_sid = stream_sid
        self.active = True

        # Serialized JSON events waiting to be written to the websocket.
        self._exotel_events_queue: asyncio.Queue[str] = asyncio.Queue()
        # Mark messages handed in through enqueue_mark_message.
        self._mark_message_queue: asyncio.Queue[MarkMessage] = asyncio.Queue()
        # Audio chunks that were sent but whose mark has not been processed yet.
        self._unprocessed_audio_chunks_queue: asyncio.Queue[InterruptibleEvent[AudioChunk]] = (
            asyncio.Queue()
        )

    def _enqueue_exotel_event(self, event: dict) -> None:
        """Serialize ``event`` to JSON and buffer it for the websocket sender."""
        self._exotel_events_queue.put_nowait(json.dumps(event))

    def _enqueue_media_event(self, audio: bytes) -> None:
        """Buffer a ``media`` event whose payload is ``audio`` encoded as base64."""
        encoded_audio = base64.b64encode(audio).decode("utf-8")
        self._enqueue_exotel_event(
            {
                "event": "media",
                "stream_sid": self.stream_sid,
                "media": {"payload": encoded_audio},
            }
        )

    def consume_nonblocking(self, item: InterruptibleEvent[AudioChunk]):
        """Queue an audio chunk for sending, or mark it interrupted if it already was."""
        if item.is_interrupted():
            interrupted_chunk = item.payload
            interrupted_chunk.on_interrupt()
            interrupted_chunk.state = ChunkState.INTERRUPTED
            return

        self._send_audio_chunk_and_mark(
            chunk=item.payload.data,
            chunk_id=str(item.payload.chunk_id),
        )
        self._unprocessed_audio_chunks_queue.put_nowait(item)

    def interrupt(self):
        """Queue a ``clear`` event for the stream."""
        self._send_clear_message()

    def enqueue_mark_message(self, mark_message: MarkMessage):
        """Hand a mark message to the mark-processing loop."""
        self._mark_message_queue.put_nowait(mark_message)

    def send_dtmf_tones(self, keypad_entries: List[KeypadEntry]):
        """Generate a DTMF tone for each keypad entry and queue it as a media event."""
        generator = DTMFToneGenerator()
        for entry in keypad_entries:
            logger.info(f"Sending DTMF tone {entry.value}")
            tone = generator.generate(
                entry,
                sampling_rate=self.sampling_rate,
                audio_encoding=self.audio_encoding,
            )
            self._enqueue_media_event(tone)

    async def _send_exotel_messages(self):
        """Drain the event queue to the websocket until cancelled or disconnected."""
        while True:
            try:
                outgoing = await self._exotel_events_queue.get()
            except asyncio.CancelledError:
                return
            if self.ws.application_state == WebSocketState.DISCONNECTED:
                return
            await self.ws.send_text(outgoing)

    async def _process_mark_messages(self):
        """Pair each mark message with the next unprocessed chunk and update its state."""
        while True:
            try:
                # mark messages are tagged with the chunk ID that is attached to the audio chunk
                # but they are guaranteed to come in the same order as the audio chunks, and we
                # don't need to build resiliency there
                mark = await self._mark_message_queue.get()
                pending_event = await self._unprocessed_audio_chunks_queue.get()
            except asyncio.CancelledError:
                return

            self.interruptible_event = pending_event
            chunk = pending_event.payload

            # A mismatch is only logged; the chunk is still processed below.
            if mark.chunk_id != str(chunk.chunk_id):
                logger.error(
                    f"Received a mark message out of order with chunk ID {mark.chunk_id}"
                )

            if pending_event.is_interrupted():
                chunk.on_interrupt()
                chunk.state = ChunkState.INTERRUPTED
            else:
                chunk.on_play()
                chunk.state = ChunkState.PLAYED
                self.interruptible_event.is_interruptible = False

    async def _run_loop(self):
        """Run the websocket sender and the mark processor concurrently."""
        sender_task = asyncio_create_task(self._send_exotel_messages())
        marks_task = asyncio_create_task(self._process_mark_messages())
        await asyncio.gather(sender_task, marks_task)

    def _send_audio_chunk_and_mark(self, chunk: bytes, chunk_id: str):
        """Queue a media event for ``chunk`` followed by a mark event named ``chunk_id``."""
        self._enqueue_media_event(chunk)
        self._enqueue_exotel_event(
            {
                "event": "mark",
                "stream_sid": self.stream_sid,
                "mark": {
                    "name": chunk_id,
                },
            }
        )

    def _send_clear_message(self):
        """Queue a ``clear`` event for the current stream."""
        self._enqueue_exotel_event(
            {
                "event": "clear",
                "stream_sid": self.stream_sid,
            }
        )
1 change: 1 addition & 0 deletions vocode/streaming/streaming_conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ async def process(self, transcription: Transcription):
conversation_id=self.conversation.id,
vonage_uuid=getattr(self.conversation, "vonage_uuid", None),
twilio_sid=getattr(self.conversation, "twilio_sid", None),
exotel_sid=getattr(self.conversation, "exotel_sid", None),
agent_response_tracker=agent_response_tracker,
),
)
Expand Down
81 changes: 81 additions & 0 deletions vocode/streaming/telephony/client/exotel_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import os
from typing import Dict, Optional

import aiohttp
import xmltodict
from loguru import logger
from vocode.streaming.models.telephony import ExotelConfig
from vocode.streaming.telephony.client.abstract_telephony_client import AbstractTelephonyClient
from vocode.streaming.utils.async_requester import AsyncRequestor


class ExotelBadRequestException(ValueError):
    """Raised when Exotel answers a create-call request with HTTP 400."""


class ExotelException(ValueError):
    """Raised when Exotel fails a create-call request with a non-400 error status."""


class ExotelClient(AbstractTelephonyClient):
    """Telephony client that creates calls through Exotel's HTTP API.

    Credentials come from the supplied ``ExotelConfig`` or, when none is
    given, from the ``EXOTEL_*`` environment variables.
    """

    def __init__(
        self,
        base_url: str,
        maybe_exotel_config: Optional[ExotelConfig] = None,
    ):
        """Store the Exotel configuration and build the basic-auth credentials.

        Args:
            base_url: Base URL passed through to ``AbstractTelephonyClient``.
            maybe_exotel_config: Explicit configuration; when omitted, the
                configuration is read from ``EXOTEL_ACCOUNT_SID``,
                ``EXOTEL_SUBDOMAIN``, ``EXOTEL_API_KEY``, ``EXOTEL_API_TOKEN``
                and ``EXOTEL_APP_ID``.

        Raises:
            KeyError: If no configuration is given and one of the environment
                variables is missing.
        """
        self.exotel_config = maybe_exotel_config or ExotelConfig(
            account_sid=os.environ["EXOTEL_ACCOUNT_SID"],
            subdomain=os.environ["EXOTEL_SUBDOMAIN"],
            api_key=os.environ["EXOTEL_API_KEY"],
            api_token=os.environ["EXOTEL_API_TOKEN"],
            app_id=os.environ["EXOTEL_APP_ID"],
        )
        self.auth = aiohttp.BasicAuth(
            login=self.exotel_config.api_key,
            password=self.exotel_config.api_token,
        )
        super().__init__(base_url=base_url)

    def get_telephony_config(self):
        """Return the Exotel configuration this client was built with."""
        return self.exotel_config

    @staticmethod
    def create_call_exotel(base_url, conversation_id, is_outbound: bool = False):
        """Return the websocket URL for a conversation.

        ``is_outbound`` is accepted but currently unused.
        """
        return {"url": f"wss://{base_url}/connect_call/{conversation_id}"}

    async def create_call(
        self,
        conversation_id: str,
        to_phone: str,
        from_phone: str,
        record: bool = False,  # currently no-op
        digits: Optional[str] = None,  # currently no-op
        telephony_params: Optional[Dict[str, str]] = None,
    ) -> str:
        """Ask Exotel to connect ``to_phone`` to the configured call flow.

        Args:
            conversation_id: Sent to Exotel as ``CustomField``.
            to_phone: Sent to Exotel as ``From``.
            from_phone: Sent to Exotel as ``CallerId``.
            record: Currently ignored.
            digits: Currently ignored.
            telephony_params: Currently ignored.

        Returns:
            The call SID from Exotel's XML response.

        Raises:
            ExotelBadRequestException: If Exotel responds with HTTP 400.
            ExotelException: If Exotel responds with any other error status.
        """
        account_sid = self.exotel_config.account_sid
        data = {
            'From': to_phone,
            'CallerId': from_phone,
            'Url': f'http://my.exotel.com/{account_sid}/exoml/start_voice/{self.exotel_config.app_id}',
            'CustomField': conversation_id
        }
        async with AsyncRequestor().get_session().post(
            f'https://{self.exotel_config.subdomain}/v1/Accounts/{account_sid}/Calls/connect',
            auth=self.auth,
            data=data
        ) as response:
            if not response.ok:
                if response.status == 400:
                    # This endpoint's success body is parsed as XML below, so read
                    # the error body as text: response.json() would fail on a
                    # non-JSON content type and hide the bad-request error.
                    logger.warning(
                        f"Failed to create call: {response.status} {response.reason} {await response.text()}"
                    )
                    raise ExotelBadRequestException(
                        "Telephony provider rejected call; this is usually due to a bad/malformed number. "
                    )
                else:
                    raise ExotelException(
                        f"Exotel failed to create call: {response.status} {response.reason}"
                    )
            xml_data = await response.text()
            exotel_response = xmltodict.parse(xml_data)
            call_sid = exotel_response['TwilioResponse']['Call']['Sid']
            return call_sid

    async def end_call(self, twilio_sid):
        """Do nothing; ending a call is not implemented for Exotel."""
        pass
3 changes: 3 additions & 0 deletions vocode/streaming/telephony/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@
VONAGE_CHUNK_SIZE = 640 # 20ms at 16kHz with 16bit samples
VONAGE_CONTENT_TYPE = "audio/l16;rate=16000"
PCM_SILENCE_BYTE = b"\x00"

# Exotel chunk size is one tenth of DEFAULT_SAMPLING_RATE.
# NOTE(review): if chunk size is counted in bytes, 16-bit LINEAR16 samples make
# this about 50 ms of audio rather than 100 ms — confirm the intended duration.
EXOTEL_CHUNK_SIZE = int(DEFAULT_SAMPLING_RATE / 10)
EXOTEL_AUDIO_ENCODING = AudioEncoding.LINEAR16
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from vocode.streaming.models.synthesizer import SynthesizerConfig
from vocode.streaming.models.telephony import PhoneCallDirection
from vocode.streaming.models.transcriber import TranscriberConfig
from vocode.streaming.output_device.exotel_output_device import ExotelOutputDevice
from vocode.streaming.output_device.twilio_output_device import TwilioOutputDevice
from vocode.streaming.output_device.vonage_output_device import VonageOutputDevice
from vocode.streaming.streaming_conversation import StreamingConversation
Expand All @@ -21,12 +22,12 @@
from vocode.streaming.utils.events_manager import EventsManager

TelephonyOutputDeviceType = TypeVar(
"TelephonyOutputDeviceType", bound=Union[TwilioOutputDevice, VonageOutputDevice]
"TelephonyOutputDeviceType", bound=Union[TwilioOutputDevice, VonageOutputDevice, ExotelOutputDevice]
)

LOW_INTERRUPT_SENSITIVITY_THRESHOLD = 0.9

TelephonyProvider = Literal["twilio", "vonage"]
TelephonyProvider = Literal["twilio", "vonage", "exotel"]


class AbstractPhoneConversation(StreamingConversation[TelephonyOutputDeviceType]):
Expand Down
Loading

0 comments on commit 2bbe9fd

Please sign in to comment.