-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Switch to using the From header for device ID It appears the From header is actually more meaningingful for calls coming from Asterisk, as the Contact header does not distinguish between which device initiated a call like the From header does. * Working on calling * Make IP's variables * Fix call phone script * Test script for outgoing calls This provides a test script for outgoing calls based on the work originally started by Michael Hansen. Instructions for running the outgoing call test script can be found in the README.md file. I tried to make the acceptable coding for the OPUS codecs in the SIP messages more flexible, but the number may need to be changed back from 96 to 123 to work with Grandstream phones. I don't have a Grandstream yet to test with. * Fix some type checking issues * Fix converting env variable to int * Updates for Grandstream compatibility Modify codec negotiation to work with Grandstream Move example code out of call_phone.py * Formatting fixes * Fix unused imports * Fix example in README * Change SipHeader parsing Use SipEndpoint dataclass for parsing SIP headers * Formatting fixes * Fix to make mypy happy * Remove print statments and loop parameter * Various review fixes Prefer using guard clauses Use SipEndpoint attributes where appropriate Fix end quote for description * Cleanup RTP/C servers when call ends * Fix import sort * Minor codestyle improvements --------- Co-authored-by: Michael Hansen <[email protected]>
- Loading branch information
1 parent
84ded2e
commit c32b8f0
Showing
10 changed files
with
755 additions
and
66 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ tmp/ | |
htmlcov | ||
|
||
.projectile | ||
.env | ||
.venv/ | ||
venv/ | ||
.mypy_cache/ | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,19 @@ | ||
# VoIP Utils | ||
|
||
Voice over IP utilities for the [voip integration](https://www.home-assistant.io/integrations/voip/). | ||
|
||
## Test outgoing call | ||
Install dependencies from requirements_dev.txt | ||
|
||
Set environment variables for source and destination endpoints in .env file | ||
CALL_SRC_USER = "homeassistant" | ||
CALL_SRC_IP = "192.168.1.1" | ||
CALL_SRC_PORT = 5060 | ||
CALL_VIA_IP = "192.168.1.1" | ||
CALL_DEST_IP = "192.168.1.2" | ||
CALL_DEST_PORT = 5060 | ||
CALL_DEST_USER = "phone" | ||
|
||
Run script | ||
python call_example.py | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
import asyncio | ||
import logging | ||
import os | ||
import socket | ||
from functools import partial | ||
from pathlib import Path | ||
from typing import Any, Callable, Optional, Set | ||
|
||
from dotenv import load_dotenv | ||
|
||
from voip_utils.call_phone import VoipCallDatagramProtocol | ||
from voip_utils.sip import ( | ||
CallInfo, | ||
CallPhoneDatagramProtocol, | ||
SdpInfo, | ||
SipEndpoint, | ||
get_sip_endpoint, | ||
) | ||
from voip_utils.voip import RtcpDatagramProtocol, RtcpState, RtpDatagramProtocol | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
load_dotenv() | ||
|
||
|
||
def get_env_int(env_var: str, default_val: int) -> int: | ||
value = os.getenv(env_var) | ||
if value is None: | ||
return default_val | ||
try: | ||
return int(value) | ||
except ValueError: | ||
return default_val | ||
|
||
|
||
CALL_SRC_USER = os.getenv("CALL_SRC_USER") | ||
CALL_SRC_IP = os.getenv("CALL_SRC_IP", "127.0.0.1") | ||
CALL_SRC_PORT = get_env_int("CALL_SRC_PORT", 5060) | ||
CALL_VIA_IP = os.getenv("CALL_VIA_IP") | ||
CALL_DEST_IP = os.getenv("CALL_DEST_IP", "127.0.0.1") | ||
CALL_DEST_PORT = get_env_int("CALL_DEST_PORT", 5060) | ||
CALL_DEST_USER = os.getenv("CALL_DEST_USER") | ||
|
||
|
||
RATE = 16000 | ||
WIDTH = 2 | ||
CHANNELS = 1 | ||
RTP_AUDIO_SETTINGS = { | ||
"rate": RATE, | ||
"width": WIDTH, | ||
"channels": CHANNELS, | ||
"sleep_ratio": 0.99, | ||
} | ||
|
||
|
||
class PreRecordMessageProtocol(RtpDatagramProtocol): | ||
"""Plays a pre-recorded message on a loop.""" | ||
|
||
def __init__( | ||
self, | ||
file_name: str, | ||
opus_payload_type: int, | ||
message_delay: float = 1.0, | ||
loop_delay: float = 2.0, | ||
rtcp_state: RtcpState | None = None, | ||
) -> None: | ||
"""Set up RTP server.""" | ||
super().__init__( | ||
rate=RATE, | ||
width=WIDTH, | ||
channels=CHANNELS, | ||
opus_payload_type=opus_payload_type, | ||
rtcp_state=rtcp_state, | ||
) | ||
self.loop = asyncio.get_running_loop() | ||
self.file_name = file_name | ||
self.message_delay = message_delay | ||
self.loop_delay = loop_delay | ||
self._audio_task: asyncio.Task | None = None | ||
file_path = Path(__file__).parent / self.file_name | ||
self._audio_bytes: bytes = file_path.read_bytes() | ||
_LOGGER.debug("Created PreRecordMessageProtocol") | ||
|
||
def on_chunk(self, audio_bytes: bytes) -> None: | ||
"""Handle raw audio chunk.""" | ||
_LOGGER.debug("on_chunk") | ||
if self.transport is None: | ||
return | ||
|
||
if self._audio_task is None: | ||
self._audio_task = self.loop.create_task( | ||
self._play_message(), | ||
name="voip_not_connected", | ||
) | ||
|
||
async def _play_message(self) -> None: | ||
_LOGGER.debug("_play_message") | ||
self.send_audio( | ||
self._audio_bytes, | ||
self.rate, | ||
self.width, | ||
self.channels, | ||
self.addr, | ||
silence_before=self.message_delay, | ||
) | ||
|
||
await asyncio.sleep(self.loop_delay) | ||
|
||
# Allow message to play again - Only play once for testing | ||
# self._audio_task = None | ||
|
||
|
||
async def main() -> None: | ||
logging.basicConfig(level=logging.DEBUG) | ||
|
||
loop = asyncio.get_event_loop() | ||
source = get_sip_endpoint( | ||
host=CALL_SRC_IP, port=CALL_SRC_PORT, username=CALL_SRC_USER, description=None | ||
) | ||
destination = get_sip_endpoint( | ||
host=CALL_DEST_IP, | ||
port=CALL_DEST_PORT, | ||
username=CALL_DEST_USER, | ||
description=None, | ||
) | ||
|
||
# Find free RTP/RTCP ports | ||
rtp_port = 0 | ||
|
||
while True: | ||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | ||
sock.setblocking(False) | ||
|
||
# Bind to a random UDP port | ||
sock.bind(("", 0)) | ||
_, rtp_port = sock.getsockname() | ||
|
||
# Close socket to free port for re-use | ||
sock.close() | ||
|
||
# Check that the next port up is available for RTCP | ||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | ||
try: | ||
sock.bind(("", rtp_port + 1)) | ||
|
||
# Will be opened again below | ||
sock.close() | ||
|
||
# Found our ports | ||
break | ||
except OSError: | ||
# RTCP port is taken | ||
pass | ||
|
||
_, protocol = await loop.create_datagram_endpoint( | ||
lambda: VoipCallDatagramProtocol( | ||
None, | ||
source, | ||
destination, | ||
rtp_port, | ||
lambda call_info, rtcp_state: PreRecordMessageProtocol( | ||
"problem.pcm", | ||
call_info.opus_payload_type, | ||
rtcp_state=rtcp_state, | ||
), | ||
), | ||
local_addr=(CALL_SRC_IP, CALL_SRC_PORT), | ||
) | ||
|
||
await protocol.wait_closed() | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,3 +6,4 @@ isort==5.12.0 | |
mypy==1.1.1 | ||
pylint==3.2.5 | ||
pytest==7.2.2 | ||
python-dotenv==1.0.1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,35 +1,74 @@ | ||
"""Test voip_utils SIP functionality.""" | ||
|
||
from voip_utils.sip import SipDatagramProtocol | ||
from voip_utils.sip import SipEndpoint, get_sip_endpoint | ||
|
||
|
||
def test_parse_header_for_uri(): | ||
endpoint, name = SipDatagramProtocol._parse_uri_header( | ||
None, '"Test Name" <sip:[email protected]>' | ||
) | ||
assert name == "Test Name" | ||
assert endpoint == "sip:[email protected]" | ||
endpoint = SipEndpoint('"Test Name" <sip:[email protected]>') | ||
assert endpoint.description == "Test Name" | ||
assert endpoint.uri == "sip:[email protected]" | ||
assert endpoint.username == "12345" | ||
assert endpoint.host == "example.com" | ||
assert endpoint.port == 5060 | ||
|
||
|
||
def test_parse_header_for_uri_no_name(): | ||
endpoint, name = SipDatagramProtocol._parse_uri_header( | ||
None, "sip:[email protected]" | ||
) | ||
assert name is None | ||
assert endpoint == "sip:[email protected]" | ||
endpoint = SipEndpoint("sip:[email protected]") | ||
assert endpoint.description is None | ||
assert endpoint.uri == "sip:[email protected]" | ||
|
||
|
||
def test_parse_header_for_uri_sips(): | ||
endpoint, name = SipDatagramProtocol._parse_uri_header( | ||
None, '"Test Name" <sips:[email protected]>' | ||
) | ||
assert name == "Test Name" | ||
assert endpoint == "sips:[email protected]" | ||
endpoint = SipEndpoint('"Test Name" <sips:[email protected]>') | ||
assert endpoint.description == "Test Name" | ||
assert endpoint.uri == "sips:[email protected]" | ||
|
||
|
||
def test_parse_header_for_uri_no_space_name(): | ||
endpoint, name = SipDatagramProtocol._parse_uri_header( | ||
None, "Test <sip:[email protected]>" | ||
) | ||
assert name == "Test" | ||
assert endpoint == "sip:[email protected]" | ||
endpoint = SipEndpoint("Test <sip:[email protected]>") | ||
assert endpoint.description == "Test" | ||
assert endpoint.uri == "sip:[email protected]" | ||
|
||
|
||
def test_parse_header_for_uri_no_username(): | ||
endpoint = SipEndpoint("Test <sip:example.com>") | ||
assert endpoint.description == "Test" | ||
assert endpoint.username is None | ||
assert endpoint.uri == "sip:example.com" | ||
|
||
|
||
def test_get_sip_endpoint(): | ||
endpoint = get_sip_endpoint("example.com") | ||
assert endpoint.host == "example.com" | ||
assert endpoint.port == 5060 | ||
assert endpoint.description is None | ||
assert endpoint.username is None | ||
assert endpoint.uri == "sip:example.com" | ||
|
||
|
||
def test_get_sip_endpoint_with_username(): | ||
endpoint = get_sip_endpoint("example.com", username="test") | ||
assert endpoint.host == "example.com" | ||
assert endpoint.port == 5060 | ||
assert endpoint.description is None | ||
assert endpoint.username == "test" | ||
assert endpoint.uri == "sip:[email protected]" | ||
|
||
|
||
def test_get_sip_endpoint_with_description(): | ||
endpoint = get_sip_endpoint("example.com", description="Test Endpoint") | ||
assert endpoint.host == "example.com" | ||
assert endpoint.port == 5060 | ||
assert endpoint.description == "Test Endpoint" | ||
assert endpoint.username is None | ||
assert endpoint.uri == "sip:example.com" | ||
assert endpoint.sip_header == '"Test Endpoint" <sip:example.com>' | ||
|
||
|
||
def test_get_sip_endpoint_with_scheme(): | ||
endpoint = get_sip_endpoint("example.com", scheme="sips") | ||
assert endpoint.host == "example.com" | ||
assert endpoint.port == 5060 | ||
assert endpoint.description is None | ||
assert endpoint.username is None | ||
assert endpoint.uri == "sips:example.com" |
Oops, something went wrong.