diff --git a/.gitignore b/.gitignore index 4398532..dd2703b 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ tmp/ htmlcov .projectile +.env .venv/ venv/ .mypy_cache/ diff --git a/README.md b/README.md index e38263f..bf2d7a2 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,19 @@ # VoIP Utils Voice over IP utilities for the [voip integration](https://www.home-assistant.io/integrations/voip/). + +## Test outgoing call +Install dependencies from requirements_dev.txt + +Set environment variables for source and destination endpoints in .env file + CALL_SRC_USER = "homeassistant" + CALL_SRC_IP = "192.168.1.1" + CALL_SRC_PORT = 5060 + CALL_VIA_IP = "192.168.1.1" + CALL_DEST_IP = "192.168.1.2" + CALL_DEST_PORT = 5060 + CALL_DEST_USER = "phone" + +Run script +python call_example.py + diff --git a/call_example.py b/call_example.py new file mode 100644 index 0000000..4cf570a --- /dev/null +++ b/call_example.py @@ -0,0 +1,174 @@ +import asyncio +import logging +import os +import socket +from functools import partial +from pathlib import Path +from typing import Any, Callable, Optional, Set + +from dotenv import load_dotenv + +from voip_utils.call_phone import VoipCallDatagramProtocol +from voip_utils.sip import ( + CallInfo, + CallPhoneDatagramProtocol, + SdpInfo, + SipEndpoint, + get_sip_endpoint, +) +from voip_utils.voip import RtcpDatagramProtocol, RtcpState, RtpDatagramProtocol + +_LOGGER = logging.getLogger(__name__) + +load_dotenv() + + +def get_env_int(env_var: str, default_val: int) -> int: + value = os.getenv(env_var) + if value is None: + return default_val + try: + return int(value) + except ValueError: + return default_val + + +CALL_SRC_USER = os.getenv("CALL_SRC_USER") +CALL_SRC_IP = os.getenv("CALL_SRC_IP", "127.0.0.1") +CALL_SRC_PORT = get_env_int("CALL_SRC_PORT", 5060) +CALL_VIA_IP = os.getenv("CALL_VIA_IP") +CALL_DEST_IP = os.getenv("CALL_DEST_IP", "127.0.0.1") +CALL_DEST_PORT = get_env_int("CALL_DEST_PORT", 5060) +CALL_DEST_USER = os.getenv("CALL_DEST_USER") + + +RATE = 16000 +WIDTH = 2 +CHANNELS = 1 +RTP_AUDIO_SETTINGS = { + "rate": RATE, + "width": WIDTH, + "channels": CHANNELS, + "sleep_ratio": 0.99, +} + + +class PreRecordMessageProtocol(RtpDatagramProtocol): + """Plays a pre-recorded message on a loop.""" + + def __init__( + self, + file_name: str, + opus_payload_type: int, + message_delay: float = 1.0, + loop_delay: float = 2.0, + rtcp_state: RtcpState | None = None, + ) -> None: + """Set up RTP server.""" + super().__init__( + rate=RATE, + width=WIDTH, + channels=CHANNELS, + opus_payload_type=opus_payload_type, + rtcp_state=rtcp_state, + ) + self.loop = asyncio.get_running_loop() + self.file_name = file_name + self.message_delay = message_delay + self.loop_delay = loop_delay + self._audio_task: asyncio.Task | None = None + file_path = Path(__file__).parent / self.file_name + self._audio_bytes: bytes = file_path.read_bytes() + _LOGGER.debug("Created PreRecordMessageProtocol") + + def on_chunk(self, audio_bytes: bytes) -> None: + """Handle raw audio chunk.""" + _LOGGER.debug("on_chunk") + if self.transport is None: + return + + if self._audio_task is None: + self._audio_task = self.loop.create_task( + self._play_message(), + name="voip_not_connected", + ) + + async def _play_message(self) -> None: + _LOGGER.debug("_play_message") + self.send_audio( + self._audio_bytes, + self.rate, + self.width, + self.channels, + self.addr, + silence_before=self.message_delay, + ) + + await asyncio.sleep(self.loop_delay) + + # Allow message to play again - Only play once for testing + # self._audio_task = None + + +async def main() -> None: + logging.basicConfig(level=logging.DEBUG) + + loop = asyncio.get_event_loop() + source = get_sip_endpoint( + host=CALL_SRC_IP, port=CALL_SRC_PORT, username=CALL_SRC_USER, description=None + ) + destination = get_sip_endpoint( + host=CALL_DEST_IP, + port=CALL_DEST_PORT, + username=CALL_DEST_USER, + description=None, + ) + + # Find free RTP/RTCP ports + rtp_port = 0 + + while True: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setblocking(False) + + # Bind to a random UDP port + sock.bind(("", 0)) + _, rtp_port = sock.getsockname() + + # Close socket to free port for re-use + sock.close() + + # Check that the next port up is available for RTCP + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + sock.bind(("", rtp_port + 1)) + + # Will be opened again below + sock.close() + + # Found our ports + break + except OSError: + # RTCP port is taken + pass + + _, protocol = await loop.create_datagram_endpoint( + lambda: VoipCallDatagramProtocol( + None, + source, + destination, + rtp_port, + lambda call_info, rtcp_state: PreRecordMessageProtocol( + "problem.pcm", + call_info.opus_payload_type, + rtcp_state=rtcp_state, + ), + ), + local_addr=(CALL_SRC_IP, CALL_SRC_PORT), + ) + + await protocol.wait_closed() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/problem.pcm b/problem.pcm new file mode 100644 index 0000000..8873766 Binary files /dev/null and b/problem.pcm differ diff --git a/requirements_dev.txt b/requirements_dev.txt index d9522fb..c6eafa1 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -6,3 +6,4 @@ isort==5.12.0 mypy==1.1.1 pylint==3.2.5 pytest==7.2.2 +python-dotenv==1.0.1 diff --git a/tests/test_sip.py b/tests/test_sip.py index 6644065..a804746 100644 --- a/tests/test_sip.py +++ b/tests/test_sip.py @@ -1,35 +1,74 @@ """Test voip_utils SIP functionality.""" -from voip_utils.sip import SipDatagramProtocol +from voip_utils.sip import SipEndpoint, get_sip_endpoint def test_parse_header_for_uri(): - endpoint, name = SipDatagramProtocol._parse_uri_header( - None, '"Test Name" ' - ) - assert name == "Test Name" - assert endpoint == "sip:12345@example.com" + endpoint = SipEndpoint('"Test Name" ') + assert endpoint.description == "Test Name" + assert endpoint.uri == "sip:12345@example.com" + assert endpoint.username == "12345" + assert endpoint.host == "example.com" + assert endpoint.port == 5060 def test_parse_header_for_uri_no_name(): - endpoint, name = SipDatagramProtocol._parse_uri_header( - None, "sip:12345@example.com" - ) - assert name is None - assert endpoint == "sip:12345@example.com" + endpoint = SipEndpoint("sip:12345@example.com") + assert endpoint.description is None + assert endpoint.uri == "sip:12345@example.com" def test_parse_header_for_uri_sips(): - endpoint, name = SipDatagramProtocol._parse_uri_header( - None, '"Test Name" ' - ) - assert name == "Test Name" - assert endpoint == "sips:12345@example.com" + endpoint = SipEndpoint('"Test Name" ') + assert endpoint.description == "Test Name" + assert endpoint.uri == "sips:12345@example.com" def test_parse_header_for_uri_no_space_name(): - endpoint, name = SipDatagramProtocol._parse_uri_header( - None, "Test " - ) - assert name == "Test" - assert endpoint == "sip:12345@example.com" + endpoint = SipEndpoint("Test ") + assert endpoint.description == "Test" + assert endpoint.uri == "sip:12345@example.com" + + +def test_parse_header_for_uri_no_username(): + endpoint = SipEndpoint("Test ") + assert endpoint.description == "Test" + assert endpoint.username is None + assert endpoint.uri == "sip:example.com" + + +def test_get_sip_endpoint(): + endpoint = get_sip_endpoint("example.com") + assert endpoint.host == "example.com" + assert endpoint.port == 5060 + assert endpoint.description is None + assert endpoint.username is None + assert endpoint.uri == "sip:example.com" + + +def test_get_sip_endpoint_with_username(): + endpoint = get_sip_endpoint("example.com", username="test") + assert endpoint.host == "example.com" + assert endpoint.port == 5060 + assert endpoint.description is None + assert endpoint.username == "test" + assert endpoint.uri == "sip:test@example.com" + + +def test_get_sip_endpoint_with_description(): + endpoint = get_sip_endpoint("example.com", description="Test Endpoint") + assert endpoint.host == "example.com" + assert endpoint.port == 5060 + assert endpoint.description == "Test Endpoint" + assert endpoint.username is None + assert endpoint.uri == "sip:example.com" + assert endpoint.sip_header == '"Test Endpoint" ' + + +def test_get_sip_endpoint_with_scheme(): + endpoint = get_sip_endpoint("example.com", scheme="sips") + assert endpoint.host == "example.com" + assert endpoint.port == 5060 + assert endpoint.description is None + assert endpoint.username is None + assert endpoint.uri == "sips:example.com" diff --git a/voip_utils/call_phone.py b/voip_utils/call_phone.py new file mode 100644 index 0000000..88fbc8e --- /dev/null +++ b/voip_utils/call_phone.py @@ -0,0 +1,101 @@ +import asyncio +import logging +from asyncio.transports import DatagramTransport +from functools import partial +from typing import Any, Callable, Optional, Set + +from .sip import CallInfo, CallPhoneDatagramProtocol, SdpInfo, SipEndpoint +from .voip import RtcpDatagramProtocol, RtcpState + +_LOGGER = logging.getLogger(__name__) + +RATE = 16000 +WIDTH = 2 +CHANNELS = 1 +RTP_AUDIO_SETTINGS = { + "rate": RATE, + "width": WIDTH, + "channels": CHANNELS, + "sleep_ratio": 0.99, +} + +CallProtocolFactory = Callable[[CallInfo, RtcpState], asyncio.DatagramProtocol] + + +class VoipCallDatagramProtocol(CallPhoneDatagramProtocol): + """UDP server for Voice over IP (VoIP).""" + + def __init__( + self, + sdp_info: SdpInfo | None, + source_endpoint: SipEndpoint, + dest_endpoint: SipEndpoint, + rtp_port: int, + call_protocol_factory: CallProtocolFactory, + ) -> None: + """Set up VoIP call handler.""" + super().__init__(sdp_info, source_endpoint, dest_endpoint, rtp_port) + self.call_protocol_factory = call_protocol_factory + self._tasks: Set[asyncio.Future[Any]] = set() + self._rtp_transport: Optional[DatagramTransport] = None + self._rtpc_transport: Optional[DatagramTransport] = None + + def on_call(self, call_info: CallInfo): + """Answer incoming calls and start RTP server on specified port.""" + + rtp_ip = self._source_endpoint.host + + _LOGGER.debug( + "Starting RTP server on ip=%s, rtp_port=%s, rtcp_port=%s", + rtp_ip, + self._rtp_port, + self._rtp_port + 1, + ) + + # Handle RTP packets in RTP server + rtp_task = asyncio.create_task( + self._create_rtp_server( + self.call_protocol_factory, call_info, rtp_ip, self._rtp_port + ) + ) + self._tasks.add(rtp_task) + rtp_task.add_done_callback(self._tasks.remove) + + _LOGGER.debug("RTP server started") + + def call_cleanup(self): + _LOGGER.debug("Closing RTP/C servers for end of call") + if self._rtp_transport is not None: + self._rtp_transport.close() + self._rtp_transport = None + if self._rtpc_transport is not None: + self._rtpc_transport.close() + self._rtpc_transport = None + + def end_call(self, task): + """Callback for hanging up when call is ended.""" + self.hang_up() + + async def _create_rtp_server( + self, + protocol_factory: CallProtocolFactory, + call_info: CallInfo, + rtp_ip: str, + rtp_port: int, + ): + # Shared state between RTP/RTCP servers + rtcp_state = RtcpState() + + loop = asyncio.get_running_loop() + + # RTCP server + self._rtpc_transport, _ = await loop.create_datagram_endpoint( + lambda: RtcpDatagramProtocol(rtcp_state), + (rtp_ip, rtp_port + 1), + ) + + # RTP server + self._rtp_transport, _ = await loop.create_datagram_endpoint( + partial(protocol_factory, call_info, rtcp_state), + (rtp_ip, rtp_port), + ) diff --git a/voip_utils/problem.pcm b/voip_utils/problem.pcm new file mode 100644 index 0000000..8873766 Binary files /dev/null and b/voip_utils/problem.pcm differ diff --git a/voip_utils/sip.py b/voip_utils/sip.py index 7d63134..df32b34 100644 --- a/voip_utils/sip.py +++ b/voip_utils/sip.py @@ -5,8 +5,9 @@ import asyncio import logging import re +import time from abc import ABC, abstractmethod -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Dict, Optional, Tuple from .const import OPUS_PAYLOAD_TYPE @@ -29,15 +30,52 @@ class SdpInfo: version: str +@dataclass +class SipEndpoint: + """Information about a SIP endpoint.""" + + sip_header: str + uri: str = field(init=False) + scheme: str = field(init=False) + host: str = field(init=False) + port: int = field(init=False) + username: str | None = field(init=False) + description: str | None = field(init=False) + + def __post_init__(self): + header_pattern = re.compile( + r'\s*((?P\b\w+\b|"[^"]+")\s+)?sips?:[^>]+)>?.*' + ) + header_match = header_pattern.match(self.sip_header) + if header_match is not None: + description_token = header_match.group("description") + if description_token is not None: + self.description = description_token.strip('"') + else: + self.description = None + self.uri = header_match.group("uri") + uri_pattern = re.compile( + r"(?Psips?):(?:(?P[^@]+)@)?(?P[^:;?]+)(?::(?P\d+))?" + ) + uri_match = uri_pattern.match(self.uri) + if uri_match is None: + raise ValueError("Invalid SIP uri") + self.scheme = uri_match.group("scheme") + self.username = uri_match.group("user") + self.host = uri_match.group("host") + self.port = ( + int(uri_match.group("port")) if uri_match.group("port") else SIP_PORT + ) + else: + raise ValueError("Invalid SIP header") + + @dataclass class CallInfo: """Information gathered from an INVITE message.""" - caller_ip: str - caller_sip_port: int + caller_endpoint: SipEndpoint caller_rtp_port: int - caller_uri: str | None - caller_name: str | None server_ip: str headers: dict[str, str] opus_payload_type: int = OPUS_PAYLOAD_TYPE @@ -48,6 +86,71 @@ def caller_rtcp_port(self) -> int: return self.caller_rtp_port + 1 +@dataclass +class RtpInfo: + """Information about the RTP transport used for the call audio.""" + + rtp_ip: str | None + rtp_port: int | None + payload_type: int | None + + +def get_sip_endpoint( + host: str, + port: Optional[int] = None, + scheme: Optional[str] = "sip", + username: Optional[str] = None, + description: Optional[str] = None, +) -> SipEndpoint: + uri = f"{scheme}:" + if username: + uri += f"{username}@" + uri += host + if port: + uri += f":{port}" + if description: + uri = f'"{description}" <{uri}>' + return SipEndpoint(uri) + + +def get_rtp_info(body: str) -> RtpInfo: + body_lines = body.splitlines() + rtp_ip = None + rtp_port = None + opus_payload_type = None + opus_payload_types_detected = [] + for line in body_lines: + line = line.strip() + if not line: + continue + + key, _, value = line.partition("=") + if key == "m": + parts = value.split() + if parts[0] == "audio": + rtp_port = int(parts[1]) + elif key == "c": + parts = value.split() + if len(parts) > 2: + rtp_ip = parts[2] + elif key == "a" and value.startswith("rtpmap:"): + # a=rtpmap:123 opus/48000/2 + codec_str = value.split(":", maxsplit=1)[1] + codec_parts = codec_str.split() + if (len(codec_parts) > 1) and (codec_parts[1].lower().startswith("opus")): + opus_payload_types_detected.append(int(codec_parts[0])) + _LOGGER.debug("Detected OPUS payload type as %s", opus_payload_type) + + if len(opus_payload_types_detected) > 0: + opus_payload_type = opus_payload_types_detected[0] + _LOGGER.debug("Using first detected payload type: %s", opus_payload_type) + else: + opus_payload_type = OPUS_PAYLOAD_TYPE + _LOGGER.debug("Using default payload type: %s", opus_payload_type) + + return RtpInfo(rtp_ip=rtp_ip, rtp_port=rtp_port, payload_type=opus_payload_type) + + class SipDatagramProtocol(asyncio.DatagramProtocol, ABC): """UDP server for the Session Initiation Protocol (SIP).""" @@ -64,7 +167,7 @@ def datagram_received(self, data: bytes, addr): """Handle INVITE SIP messages.""" try: caller_ip, caller_sip_port = addr - message = data.decode() + message = data.decode("utf-8") method, ruri, headers, body = self._parse_sip(message) _LOGGER.debug( "Received datagram method=%s, ruri=%s, headers=%s, body=%s", @@ -84,26 +187,18 @@ def datagram_received(self, data: bytes, addr): if not ruri: raise ValueError("Empty receiver URI") - caller_uri = None - caller_name = None + caller_endpoint = None # The From header should give us the URI used for sending SIP messages to the device if headers.get("from") is not None: - caller_uri, caller_name = self._parse_uri_header(headers.get("from")) + caller_endpoint = SipEndpoint(headers.get("from", "")) # We can try using the Contact header as a fallback elif headers.get("contact") is not None: - caller_uri, caller_name = self._parse_uri_header(headers.get("contact")) + caller_endpoint = SipEndpoint(headers.get("contact", "")) # If all else fails try to generate a URI based on the IP and port from the address the message came from else: - caller_uri = "sip:" + caller_ip + ":" + caller_sip_port - caller_name = "Unknown" + caller_endpoint = get_sip_endpoint(caller_ip, port=caller_sip_port) - _LOGGER.debug( - "Incoming call from ip=%s, port=%s, uri=%s, name=%s", - caller_ip, - caller_sip_port, - caller_uri, - caller_name, - ) + _LOGGER.debug("Incoming call from endpoint=%s", caller_endpoint) # Extract caller's RTP port from SDP. # See: https://datatracker.ietf.org/doc/html/rfc2327 @@ -157,11 +252,8 @@ def datagram_received(self, data: bytes, addr): self.on_call( CallInfo( - caller_ip=caller_ip, - caller_sip_port=caller_sip_port, + caller_endpoint=caller_endpoint, caller_rtp_port=caller_rtp_port, - caller_uri=caller_uri, - caller_name=caller_name, server_ip=server_ip, headers=headers, opus_payload_type=opus_payload_type, @@ -232,28 +324,6 @@ def answer( server_rtp_port, ) - def _parse_uri_header( - self, header: str | None - ) -> Tuple[Optional[str], Optional[str]]: - """Parse SIP Contact/From Header and return endpoint URI and name.""" - uri: Optional[str] = None - name: Optional[str] = None - - if header is None: - return uri, name - - header_pattern = re.compile( - r'\s*((?P\b\w+\b|"[^"]+")\s+)?sips?:[^>]+)>?.*' - ) - header_match = header_pattern.match(header) - if header_match is not None: - name_token = header_match.group("name") - if name_token is not None: - name = name_token.strip('"') - uri = header_match.group("uri") - - return uri, name - def _parse_sip( self, message: str ) -> Tuple[Optional[str], Optional[str], Dict[str, str], str]: @@ -283,3 +353,286 @@ def _parse_sip( body = message[offset:] return method, ruri, headers, body + + +class CallPhoneDatagramProtocol(asyncio.DatagramProtocol, ABC): + def __init__( + self, + sdp_info: SdpInfo | None, + source: SipEndpoint, + dest: SipEndpoint, + rtp_port: int, + ) -> None: + self.sdp_info = sdp_info + self.transport = None + self._closed_event = asyncio.Event() + self._loop = asyncio.get_running_loop() + self._session_id = str(time.monotonic_ns()) + self._session_version = self._session_id + self._call_id = self._session_id + self._source_endpoint = source + self._dest_endpoint = dest + self._rtp_port = rtp_port + + def connection_made(self, transport): + self.transport = transport + + sdp_lines = [ + "v=0", + f"o={self._source_endpoint.username} {self._session_id} {self._session_version} IN IP4 {self._source_endpoint.host}", + "s=Talk", + f"c=IN IP4 {self._source_endpoint.host}", + "t=0 0", + f"m=audio {self._rtp_port} RTP/AVP 123 96 101 103 104", + "a=sendrecv", + "a=rtpmap:96 opus/48000/2", + "a=fmtp:96 useinbandfec=0", + "a=rtpmap:123 opus/48000/2", + "a=fmtp:123 maxplaybackrate=16000", + "a=rtpmap:101 telephone-event/48000", + "a=rtpmap:103 telephone-event/16000", + "a=rtpmap:104 telephone-event/8000", + "a=ptime:20", + "", + ] + sdp_text = _CRLF.join(sdp_lines) + sdp_bytes = sdp_text.encode("utf-8") + + invite_lines = [ + f"INVITE {self._dest_endpoint.uri} SIP/2.0", + f"Via: SIP/2.0/UDP {self._source_endpoint.host}:{self._source_endpoint.port}", + f"From: {self._source_endpoint.sip_header}", + f"Contact: {self._source_endpoint.sip_header}", + f"To: {self._dest_endpoint.sip_header}", + f"Call-ID: {self._call_id}", + "CSeq: 50 INVITE", + "User-Agent: test-agent 1.0", + "Allow: INVITE, ACK, OPTIONS, CANCEL, BYE, SUBSCRIBE, NOTIFY, INFO, REFER, UPDATE", + "Accept: application/sdp, application/dtmf-relay", + "Content-Type: application/sdp", + f"Content-Length: {len(sdp_bytes)}", + "", + ] + invite_text = _CRLF.join(invite_lines) + _CRLF + invite_bytes = invite_text.encode("utf-8") + + _LOGGER.debug(invite_bytes + sdp_bytes) + + self.transport.sendto( + invite_bytes + sdp_bytes, + (self._dest_endpoint.host, self._dest_endpoint.port), + ) + + def datagram_received(self, data: bytes, addr): + response_text = data.decode("utf-8") + response_lines = response_text.splitlines() + _LOGGER.debug(response_lines) + is_ok = False + + for i, line in enumerate(response_lines): + line = line.strip() + if not line: + break + if i > 0: + continue + _version, code, response_type = line.split(maxsplit=2) + _LOGGER.debug( + "Version=%s, Code=%s, response_type=%s", + _version, + code, + response_type, + ) + if (code == "200") and (response_type == "OK"): + is_ok = True + elif code == "401": + _LOGGER.debug( + "Got 401 Unauthorized response, should attempt authentication here..." + ) + # register_lines = [ + # f"REGISTER {self._dest_endpoint.uri} SIP/2.0", + # f"Via: SIP/2.0/UDP {self._source_endpoint.host}:{self._source_endpoint.port}", + # f"From: {self._source_endpoint.sip_header}", + # f"Contact: {self._source_endpoint.sip_header}", + # f"To: {self._dest_endpoint.sip_header}", + # f"Call-ID: {self._call_id}", + # "CSeq: 51 REGISTER", + # "Authorization: ", + # "User-Agent: test-agent 1.0", + # "Allow: INVITE, ACK, OPTIONS, CANCEL, BYE, SUBSCRIBE, NOTIFY, INFO, REFER, UPDATE", + # "", + # ] + elif _version == "BYE": + _LOGGER.debug("Received BYE message: %s", line) + if self.transport is None: + _LOGGER.debug("Skipping message: %s", line) + continue + + # Acknowledge the BYE message, otherwise the phone will keep sending it + ( + protocol, + code, + reason, + headers, + body, + ) = self._parse_sip_reply(response_text) + _LOGGER.debug( + "Parsed response protocol=%s code=%s reason=%s headers=[%s] body=[%s]", + protocol, + code, + reason, + headers, + body, + ) + rtp_info = get_rtp_info(body) + remote_rtp_port = rtp_info.rtp_port + opus_payload_type = rtp_info.payload_type + via_header = headers["via"] + from_header = headers["from"] + to_header = headers["to"] + callid_header = headers["call-id"] + cseq_header = headers["cseq"] + ok_lines = [ + "SIP/2.0 200 OK", + f"Via: {via_header}", + f"From: {from_header}", + f"To: {to_header}", + f"Call-ID: {callid_header}", + f"CSeq: {cseq_header}", + "User-Agent: test-agent 1.0", + "Content-Length: 0", + ] + ok_text = _CRLF.join(ok_lines) + _CRLF + ok_bytes = ok_text.encode("utf-8") + # We should probably tell the associated RTP server to shutdown at this point, assuming we aren't reusing it for other calls + _LOGGER.debug("Sending OK for BYE message: %s", ok_text) + self.transport.sendto( + ok_bytes, + (self._dest_endpoint.host, self._dest_endpoint.port), + ) + + self.transport.close() + self.transport = None + + if not is_ok: + _LOGGER.debug("Received non-OK response [%s]", response_text) + return + + _LOGGER.debug("Got OK message") + if self.transport is None: + _LOGGER.debug("No transport for exchanging SIP message") + return + + protocol, code, reason, headers, body = self._parse_sip_reply(response_text) + _LOGGER.debug( + "Parsed response protocol=%s code=%s reason=%s headers=[%s] body=[%s]", + protocol, + code, + reason, + headers, + body, + ) + rtp_info = get_rtp_info(body) + remote_rtp_port = rtp_info.rtp_port + opus_payload_type = rtp_info.payload_type + to_header = headers["to"] + ack_lines = [ + f"ACK {self._dest_endpoint.uri} SIP/2.0", + f"Via: SIP/2.0/UDP {self._source_endpoint.host}:{self._source_endpoint.port}", + f"From: {self._source_endpoint.sip_header}", + f"To: {to_header}", + f"Call-ID: {self._call_id}", + "CSeq: 50 ACK", + "User-Agent: test-agent 1.0", + "Content-Length: 0", + ] + ack_text = _CRLF.join(ack_lines) + _CRLF + ack_bytes = ack_text.encode("utf-8") + self.transport.sendto( + ack_bytes, (self._dest_endpoint.host, self._dest_endpoint.port) + ) + + # The call been answered, proceed with desired action here + self.on_call( + CallInfo( + caller_endpoint=self._dest_endpoint, + caller_rtp_port=remote_rtp_port, + server_ip=self._dest_endpoint.host, + headers=headers, + opus_payload_type=opus_payload_type, # Should probably update this to eventually support more codecs + ) + ) + + @abstractmethod + def on_call(self, call_info: CallInfo): + """Handle outgoing calls.""" + + @abstractmethod + def call_cleanup(self): + """Handle cleanup after ending call.""" + + def hang_up(self): + """Hang up the call when finished""" + bye_lines = [ + f"BYE {self._dest_endpoint.uri} SIP/2.0", + f"Via: SIP/2.0/UDP {self._source_endpoint.host}:{self._source_endpoint.port}", + f"From: {self._source_endpoint.sip_header}", + f"To: {self._dest_endpoint.sip_header}", + f"Call-ID: {self._call_id}", + "CSeq: 51 BYE", + "User-Agent: test-agent 1.0", + "Content-Length: 0", + "", + ] + _LOGGER.debug("Hanging up...") + bye_text = _CRLF.join(bye_lines) + _CRLF + bye_bytes = bye_text.encode("utf-8") + self.transport.sendto( + bye_bytes, (self._dest_endpoint.host, self._dest_endpoint.port) + ) + + self.call_cleanup() + + self.transport.close() + self.transport = None + + def connection_lost(self, exc): + """Signal wait_closed when transport is completely closed.""" + _LOGGER.debug("Connection lost") + self._closed_event.set() + self.call_cleanup() + + async def wait_closed(self) -> None: + """Wait for connection_lost to be called.""" + await self._closed_event.wait() + + def _parse_sip_reply( + self, message: str + ) -> Tuple[Optional[str], Optional[str], Optional[str], Dict[str, str], str]: + """Parse SIP message and return method, headers, and body.""" + lines = message.splitlines() + + protocol: Optional[str] = None + code: Optional[str] = None + reason: Optional[str] = None + headers: dict[str, str] = {} + offset: int = 0 + + # See: https://datatracker.ietf.org/doc/html/rfc3261 + for i, line in enumerate(lines): + if line: + offset += len(line) + len(_CRLF) + + if i == 0: + line_parts = line.split() + protocol = line_parts[0] + code = line_parts[1] + reason = line_parts[2] + elif not line: + break + else: + key, value = line.split(":", maxsplit=1) + headers[key.lower()] = value.strip() + + body = message[offset:] + + return protocol, code, reason, headers, body diff --git a/voip_utils/voip.py b/voip_utils/voip.py index 008d608..246909c 100644 --- a/voip_utils/voip.py +++ b/voip_utils/voip.py @@ -208,10 +208,12 @@ def send_audio( ) -> None: """Send audio from WAV file in chunks over RTP.""" if not self._is_connected: + _LOGGER.debug("Not connected, can't send audio") return addr = addr or self.addr if addr is None: + _LOGGER.debug("No destination address, can't send audio") raise ValueError("Destination address not set") bytes_per_sample = width * channels @@ -222,6 +224,7 @@ def send_audio( samples_left = len(audio_bytes) // bytes_per_sample rtp_packets: list[bytes] = [] while samples_left > 0: + _LOGGER.debug("Preparing audio chunk to send") bytes_offset = sample_offset * bytes_per_sample chunk = audio_bytes[bytes_offset : bytes_offset + bytes_per_frame] samples_in_chunk = len(chunk) // bytes_per_sample @@ -239,6 +242,7 @@ def send_audio( sample_offset += samples_in_chunk # Pause before sending to allow time for user to pick up phone. + _LOGGER.debug("Pause before sending") time.sleep(silence_before) # Send RTP in a steady stream, delaying between each packet to simulate real-time audio