#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

# File: examples/foundational/45-simple-jambonz-app.py

"""Simple Jambonz voice bot example.

Runs an aiohttp server that exposes a ``/stream`` WebSocket endpoint. Connect
this endpoint to the ``listen`` verb in Jambonz by setting the verb's listen
url to it.

Link to Jambonz's documentation:
https://jambonz.readthedocs.io/en/latest/voice/listen.html
"""

import json
import os

from aiohttp import WSMsgType, web
from loguru import logger

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMMessagesAppendFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMUserAggregatorParams
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.serializers.jambonz import JambonzFrameSerializer
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService
from pipecat.services.gladia.config import (
    GladiaInputParams,
    LanguageConfig,
    MessagesConfig,
)
from pipecat.services.gladia.stt import GladiaSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transcriptions.language import Language
from pipecat.transports.jambonz import JambonzTransport, JambonzTransportParams

# Sample rate of the audio streamed back to Jambonz. It must match the
# "sampleRate" configured under "bidirectionalAudio" in the listen verb, and it
# is shared by the TTS service and the serializer so the two cannot drift apart.
OUTPUT_SAMPLE_RATE = 8000

routes = web.RouteTableDef()


async def run_bot(transport):
    """Build and run the STT -> LLM -> TTS pipeline for one Jambonz call.

    Args:
        transport: A JambonzTransport whose call info has already been set and
            contains an integer "sampleRate".
    """
    call_info = transport.get_call_info()
    input_sample_rate = call_info["sampleRate"]

    stt = GladiaSTTService(
        api_key=os.getenv("GLADIA_API_KEY"),
        confidence=0.1,
        # You can change this but don't forget to update the stt_sample_rate
        # in the JambonzFrameSerializer.
        sample_rate=input_sample_rate,
        params=GladiaInputParams(
            language_config=LanguageConfig(
                languages=[Language.TL, Language.EN], code_switching=True
            ),
            messages_config=MessagesConfig(receive_partial_transcripts=True),
        ),
    )

    # This example uses Baseten's OpenAI-compatible API for the LLM, for speed
    # and lower cost than OpenAI.
    llm = OpenAILLMService(
        model="moonshotai/Kimi-K2-Instruct",
        api_key=os.getenv("BASETEN_API_KEY"),
        params=OpenAILLMService.InputParams(
            temperature=0,
        ),
        base_url="https://inference.baseten.co/v1",
    )

    tts = ElevenLabsTTSService(
        api_key=os.getenv("ELEVENLABS_API_KEY"),
        voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
        model="eleven_flash_v2_5",
        params=ElevenLabsTTSService.InputParams(
            stability=0.7,
            similarity_boost=0.8,
            style=0.5,
            use_speaker_boost=True,
            speed=1.1,
        ),
        sample_rate=OUTPUT_SAMPLE_RATE,
    )

    context = OpenAILLMContext(
        [{"role": "system", "content": "You are a helpful assistant."}]
    )

    context_aggregator = llm.create_context_aggregator(
        context,
        user_params=LLMUserAggregatorParams(aggregation_timeout=0.5),
    )

    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
            # The field is spelled "allow_interruptions"; the singular form is
            # not a PipelineParams field and was silently ignored.
            allow_interruptions=True,
        ),
        enable_turn_tracking=True,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Pipecat Client connected")
        # Kick off the conversation as soon as the call audio is attached.
        await task.queue_frame(
            LLMMessagesAppendFrame(
                messages=[
                    {
                        "role": "system",
                        "content": "Say hello.",
                    }
                ],
                run_llm=True,
            )
        )

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Pipecat Client disconnected")
        await task.cancel()

    # aiohttp owns the process signal handling, so the runner must not.
    runner = PipelineRunner(handle_sigint=False)

    await runner.run(task)


def _parse_call_info(message):
    """Extract the call metadata from the first WebSocket message.

    Args:
        message: The first aiohttp WebSocket message received on the socket.

    Returns:
        The decoded metadata dict with "sampleRate" coerced to int, or None if
        the message is not a JSON text object carrying a usable "sampleRate".
    """
    if message.type != WSMsgType.TEXT:
        return None
    try:
        call_info = json.loads(message.data)
    except json.JSONDecodeError:
        return None
    if not isinstance(call_info, dict):
        return None
    try:
        call_info["sampleRate"] = int(call_info["sampleRate"])
    except (KeyError, TypeError, ValueError):
        return None
    return call_info


@routes.get("/stream")
async def stream(request):
    """Handle one Jambonz ``listen`` WebSocket connection.

    Connect this endpoint to the LISTEN verb in Jambonz and set the listen url
    to this endpoint.

    Args:
        request: The incoming aiohttp request to upgrade to a WebSocket.

    Returns:
        The WebSocketResponse used for the call.
    """
    ws = web.WebSocketResponse(
        protocols=("audio.jambonz.org",),
        heartbeat=15,  # More frequent heartbeats for deployed environments
        compress=False,  # do not compress audio
        max_msg_size=0,  # no limit for audio frames
        timeout=30,  # Add explicit timeout
    )
    await ws.prepare(request)
    logger.info("WebSocket prepared, waiting for initial message...")

    # This is the initial metadata from Jambonz.
    initial_message = await ws.receive()
    call_info = _parse_call_info(initial_message)
    if call_info is None:
        # Without the metadata we do not know the audio format; refuse the
        # stream instead of failing later with a KeyError / JSON error.
        logger.error(
            f"Expected JSON call metadata with a sampleRate, got: {initial_message}"
        )
        await ws.close()
        return ws

    sample_rate = call_info["sampleRate"]
    logger.info(f"Received call info from Jambonz Server: {call_info}")
    logger.info(f"Sample rate: {sample_rate}")

    transport = JambonzTransport(
        websocket=ws,
        params=JambonzTransportParams(
            serializer=JambonzFrameSerializer(
                # reference: https://docs.jambonz.org/verbs/verbs/listen#streaming
                JambonzFrameSerializer.InputParams(
                    # This is the sample rate Jambonz sends us.
                    audio_in_sample_rate=sample_rate,
                    # This is the sample_rate you set in your "stt".
                    stt_sample_rate=sample_rate,
                    # This is the sample rate you set in "sampleRate" under
                    # "bidirectionalAudio".
                    audio_out_sample_rate=OUTPUT_SAMPLE_RATE,
                )
            ),
            audio_in_enabled=True,
            audio_out_enabled=True,
            sample_rate=sample_rate,
            audio_in_sample_rate=sample_rate,
            audio_out_sample_rate=sample_rate,
            audio_in_channels=1,
            session_timeout=300,
            vad_analyzer=SileroVADAnalyzer(params=VADParams()),
        ),
    )

    transport.set_call_info(call_info)
    logger.info("Starting bot with transport...")

    try:
        await run_bot(transport)
    finally:
        # Make sure the socket is released even if the pipeline raised.
        if not ws.closed:
            await ws.close()

    return ws


app = web.Application()
app.add_routes(routes)

if __name__ == "__main__":
    web.run_app(app, port=3002)
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

# File: src/pipecat/serializers/jambonz.py

"""Jambonz WebSocket protocol serializer for Pipecat."""

import json
from typing import Optional

from aiohttp import WSMessage, WSMsgType, web
from loguru import logger
from pydantic import BaseModel

from pipecat.audio.utils import create_stream_resampler
from pipecat.frames.frames import (
    AudioRawFrame,
    CancelFrame,
    EndFrame,
    Frame,
    InputAudioRawFrame,
    InputDTMFFrame,
    KeypadEntry,
    StartFrame,
    StartInterruptionFrame,
    TransportMessageFrame,
    TransportMessageUrgentFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType


class JambonzFrameSerializer(FrameSerializer):
    """Serializer for Jambonz WebSocket protocol.

    This serializer handles converting between Pipecat frames and Jambonz's WebSocket
    protocol. It supports audio conversion, DTMF events, and automatic
    call termination. It only supports OpenAI and Elevenlabs TTS at the moment.
    """

    class InputParams(BaseModel):
        """Configuration parameters for JambonzFrameSerializer.

        Parameters:
            audio_in_sample_rate: Optional override for the sample rate of the audio we get from Jambonz.
            audio_out_sample_rate: Optional override for the sample rate of the audio we send to Jambonz.
            stt_sample_rate: Optional override for the sample rate of the audio we send to the STT service.
            auto_hang_up: Whether to automatically terminate call on EndFrame.
        """

        audio_in_sample_rate: Optional[int] = None
        audio_out_sample_rate: Optional[int] = None
        stt_sample_rate: Optional[int] = None
        auto_hang_up: bool = True

    def __init__(
        self,
        params: Optional[InputParams] = None,
    ):
        """Initialize the JambonzFrameSerializer.

        Args:
            params: Configuration parameters. Defaults to ``InputParams()``.
        """
        self._params = params or JambonzFrameSerializer.InputParams()
        self._input_resampler = create_stream_resampler()
        self._output_resampler = create_stream_resampler()
        # Set once the disconnect command has been produced, so it is emitted
        # at most one time per call.
        self._hangup_attempted = False

    @property
    def type(self) -> FrameSerializerType:
        """Gets the serializer type.

        Returns:
            The serializer type, either TEXT or BINARY.
        """
        return FrameSerializerType.BINARY

    async def setup(self, frame: StartFrame):
        """Sets up the serializer with pipeline configuration.

        Args:
            frame: The StartFrame containing pipeline configuration.
        """
        pass

    async def serialize(
        self, frame: Frame, sample_rate: int = 16000
    ) -> str | bytes | dict | None:
        """Serializes a Pipecat frame to Jambonz WebSocket format.

        Handles conversion of various frame types to Jambonz WebSocket messages.
        For EndFrames and CancelFrames, initiates call termination if
        auto_hang_up is enabled.

        Args:
            frame: The Pipecat frame to serialize.
            sample_rate: Unused; kept so existing callers keep working.

        Returns:
            A dict for Jambonz commands (``disconnect`` / ``killAudio``), bytes
            for audio, a JSON string for transport messages, or None if the
            frame isn't handled or produced no data.
        """
        if isinstance(frame, (EndFrame, CancelFrame)):
            if self._params.auto_hang_up and not self._hangup_attempted:
                self._hangup_attempted = True
                return {"type": "disconnect"}
            return None

        # Stop the previous playAudio from running.
        if isinstance(frame, StartInterruptionFrame):
            return {"type": "killAudio"}

        if isinstance(frame, AudioRawFrame):
            # Without a configured output rate, send the audio unchanged
            # instead of handing None to the resampler.
            target_rate = self._params.audio_out_sample_rate or frame.sample_rate
            audio = await self._output_resampler.resample(
                frame.audio, frame.sample_rate, target_rate
            )
            if audio is None or len(audio) == 0:
                return None
            return audio

        if isinstance(frame, (TransportMessageFrame, TransportMessageUrgentFrame)):
            return json.dumps(frame.message)

        # Return None for unhandled frames
        return None

    async def deserialize(self, msg: WSMessage) -> Frame | None:
        """Deserializes Jambonz WebSocket data to Pipecat frames.

        Handles conversion of Jambonz media events to appropriate Pipecat frames.
        Audio comes as binary frames, DTMF and other events come as JSON text frames.
        Malformed or unknown messages are ignored (None is returned) rather
        than raising.

        Args:
            msg: The aiohttp WebSocket message received from Jambonz.

        Returns:
            A Pipecat frame corresponding to the Jambonz event, or None if unhandled.
        """
        if msg.type == WSMsgType.BINARY:
            return await self._deserialize_audio(msg.data)
        if msg.type == WSMsgType.TEXT:
            return self._deserialize_event(msg.data)
        return None

    async def _deserialize_audio(self, data: bytes) -> Frame | None:
        """Convert raw binary PCM audio from Jambonz into an input audio frame."""
        source_rate = self._params.audio_in_sample_rate
        target_rate = self._params.stt_sample_rate or source_rate
        if not source_rate:
            # Without the source rate the audio cannot be resampled or labelled.
            logger.warning(
                "JambonzFrameSerializer: audio_in_sample_rate is not set, dropping audio"
            )
            return None

        # Input: resample from the Jambonz sample rate to the pipeline input rate.
        audio = await self._input_resampler.resample(data, source_rate, target_rate)
        if audio is None or len(audio) == 0:
            # Ignoring in case we don't have audio
            return None

        return InputAudioRawFrame(
            audio=audio,
            num_channels=1,
            sample_rate=target_rate,
        )

    def _deserialize_event(self, data: str) -> Frame | None:
        """Convert a JSON text message (DTMF, commands, etc.) into a frame."""
        try:
            message = json.loads(data)
        except (TypeError, ValueError) as e:
            logger.warning(f"JambonzFrameSerializer: ignoring invalid JSON message: {e}")
            return None

        if not isinstance(message, dict) or message.get("event") != "dtmf":
            # Other JSON events (initial metadata, commands, etc.) are not
            # mapped to frames.
            return None

        digit = message.get("dtmf")
        try:
            return InputDTMFFrame(KeypadEntry(digit))
        except ValueError:
            logger.warning(f"JambonzFrameSerializer: ignoring unknown DTMF digit: {digit!r}")
            return None


# File: src/pipecat/transports/jambonz/__init__.py

from .transport import JambonzTransport, JambonzTransportParams

__all__ = ["JambonzTransport", "JambonzTransportParams"]
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

# File: src/pipecat/transports/jambonz/transport.py

"""Jambonz transport implementation for Pipecat.

This module provides a WebSocket transport for audio streamed by Jambonz.
"""

import asyncio
import time
from typing import AsyncIterator, Awaitable, Callable, Optional

from aiohttp import WSMessage, web
from loguru import logger
from pydantic import BaseModel

from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    Frame,
    InputAudioRawFrame,
    OutputAudioRawFrame,
    StartFrame,
    StartInterruptionFrame,
    TransportMessageFrame,
    TransportMessageUrgentFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.serializers.base_serializer import FrameSerializer
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams


class JambonzTransportParams(TransportParams):
    """Configuration parameters for Jambonz transport.

    Parameters:
        serializer: Frame serializer for encoding/decoding messages.
        session_timeout: Session timeout in seconds, None for no timeout.
    """

    serializer: Optional[FrameSerializer] = None
    session_timeout: Optional[int] = None


class JambonzTransportCallbacks(BaseModel):
    """Callback functions for Jambonz events.

    Parameters:
        on_client_connected: Called when a client connects to the WebSocket.
        on_client_disconnected: Called when a client disconnects from the WebSocket.
        on_session_timeout: Called when a session timeout occurs.
    """

    on_client_connected: Callable[[web.WebSocketResponse], Awaitable[None]]
    on_client_disconnected: Callable[[web.WebSocketResponse], Awaitable[None]]
    on_session_timeout: Callable[[web.WebSocketResponse], Awaitable[None]]


class JambonzTransportClient:
    """Jambonz client wrapper for handling connections and message passing.

    Manages Jambonz state, message sending/receiving, and connection lifecycle
    with support for both binary and text message types.
    """

    def __init__(
        self,
        websocket: web.WebSocketResponse,
        callbacks: JambonzTransportCallbacks,
        params: JambonzTransportParams,
    ):
        """Initialize the client wrapper.

        Args:
            websocket: The aiohttp WebSocket connected to Jambonz.
            callbacks: Callbacks invoked on connection events.
            params: Transport configuration parameters.
        """
        self._websocket = websocket
        self._callbacks = callbacks
        self._params = params
        self._closing = False
        self._disconnected_emitted = False
        # Number of transports (input/output) currently using this client. The
        # connection is only torn down when the last one disconnects, so the
        # output transport can still send its final message after the input
        # transport has stopped.
        self._leave_counter = 0

    async def setup(self, frame: StartFrame):
        """Register one more user of this client.

        Args:
            frame: The StartFrame of the transport that starts using the client.
        """
        self._leave_counter += 1

    async def send(self, data: str | bytes | dict):
        """Send data through the WebSocket connection.

        Dicts are sent as JSON text, strings as text and bytes as binary.
        Nothing is sent once the client is closing or the socket is closed.
        Errors are logged, not raised.

        Args:
            data: The payload to send.
        """
        if self._closing or self._websocket.closed:
            return
        try:
            if isinstance(data, dict):
                await self._websocket.send_json(data)
            elif isinstance(data, (bytes, bytearray)):
                await self._websocket.send_bytes(data)
            elif isinstance(data, str):
                await self._websocket.send_str(data)
            else:
                logger.error(f"Exception sending data: Invalid data type: {type(data)}")
        except Exception as e:
            logger.error(f"Exception sending data: {e}")

    async def receive(self) -> AsyncIterator[WSMessage]:
        """Receive messages from the WebSocket connection.

        Yields:
            Each aiohttp WebSocket message until the socket is closed.
        """
        async for msg in self._websocket:
            yield msg

    async def disconnect(self):
        """Disconnect the websocket client.

        Each transport calls this once when it stops. Only the last call marks
        the client as closing, closes the socket and emits the disconnected
        callback.
        """
        self._leave_counter -= 1
        if self._leave_counter > 0 or self._closing:
            return

        self._closing = True
        try:
            if not self._websocket.closed:
                await self._websocket.close()
        except Exception as e:
            logger.error(f"Exception closing websocket: {e}")

        await self.trigger_client_disconnected()

    async def trigger_client_disconnected(self):
        """Trigger the client disconnected callback (at most once)."""
        if self._disconnected_emitted:
            return
        self._disconnected_emitted = True
        logger.info("Connection with Jambonz disconnected.")
        await self._callbacks.on_client_disconnected(self._websocket)

    async def trigger_client_connected(self):
        """Trigger the client connected callback."""
        logger.info("Connection with Jambonz established.")
        await self._callbacks.on_client_connected(self._websocket)

    async def trigger_client_timeout(self):
        """Trigger the client timeout callback."""
        logger.warning("Connection with Jambonz timed out.")
        await self._callbacks.on_session_timeout(self._websocket)

    @property
    def is_connected(self) -> bool:
        """Check if the WebSocket is still open.

        Returns:
            True if the WebSocket has not been closed.
        """
        return not self._websocket.closed

    @property
    def is_closing(self) -> bool:
        """Check if the WebSocket is in the process of closing.

        Returns:
            True if the WebSocket is in closing state.
        """
        return self._closing


class JambonzInputTransport(BaseInputTransport):
    """Input transport for Jambonz WebSocket connections.

    Handles incoming WebSocket messages, deserializes frames, and manages
    connection monitoring with optional session timeouts.
    """

    def __init__(
        self,
        transport: BaseTransport,
        client: JambonzTransportClient,
        params: JambonzTransportParams,
        **kwargs,
    ):
        """Initialize the WebSocket input transport.

        Args:
            transport: The parent transport instance.
            client: The client wrapper shared with the output transport.
            params: Transport configuration parameters.
            **kwargs: Additional arguments passed to the base input transport.
        """
        super().__init__(params, **kwargs)
        self._transport = transport
        self._client = client
        self._params = params
        self._receive_task = None
        self._monitor_websocket_task = None

        # Whether we have seen a StartFrame already.
        self._initialized = False

    async def start(self, frame: StartFrame):
        """Start the input transport and begin message processing.

        Args:
            frame: The start frame containing initialization parameters.
        """
        await super().start(frame)

        # Propagate resolved input sample rate back to StartFrame. The output
        # rate is left alone here; it belongs to the output transport.
        frame.audio_in_sample_rate = self.sample_rate

        if self._initialized:
            return

        self._initialized = True

        await self._client.setup(frame)
        if self._params.serializer:
            await self._params.serializer.setup(frame)
        if not self._monitor_websocket_task and self._params.session_timeout:
            self._monitor_websocket_task = self.create_task(self._monitor_websocket())
        await self._client.trigger_client_connected()
        if not self._receive_task:
            self._receive_task = self.create_task(self._receive_messages())
        await self.set_transport_ready(frame)

    async def _stop_tasks(self):
        """Stop all running tasks."""
        if self._monitor_websocket_task:
            await self.cancel_task(self._monitor_websocket_task)
            self._monitor_websocket_task = None
        if self._receive_task:
            await self.cancel_task(self._receive_task)
            self._receive_task = None

    async def stop(self, frame: EndFrame):
        """Stop the input transport and cleanup resources.

        Args:
            frame: The end frame signaling transport shutdown.
        """
        await super().stop(frame)
        await self._stop_tasks()
        await self._client.disconnect()

    async def cancel(self, frame: CancelFrame):
        """Cancel the input transport and stop all processing.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        """
        await super().cancel(frame)
        await self._stop_tasks()
        await self._client.disconnect()

    async def cleanup(self):
        """Clean up transport resources."""
        await super().cleanup()
        await self._transport.cleanup()

    async def _receive_messages(self):
        """Main message receiving loop for WebSocket messages.

        Runs until the socket is closed, then emits the disconnected callback.
        """
        try:
            async for message in self._client.receive():
                if not self._params.serializer:
                    continue

                try:
                    frame = await self._params.serializer.deserialize(message)
                except Exception as e:
                    # A single malformed message must not end the call.
                    logger.warning(
                        f"{self} ignoring undecodable message: {e.__class__.__name__} ({e})"
                    )
                    continue

                if not frame:
                    continue
                if isinstance(frame, InputAudioRawFrame):
                    await self.push_audio_frame(frame)
                else:
                    logger.info(f"[AioHTTP WS] received frame={frame}")
                    await self.push_frame(frame)
        except Exception as e:
            logger.error(
                f"{self} exception receiving data: {e.__class__.__name__} ({e})"
            )

        await self._client.trigger_client_disconnected()

    async def _monitor_websocket(self):
        """Wait for self._params.session_timeout seconds, if the websocket is still open, trigger timeout event."""
        await asyncio.sleep(self._params.session_timeout)
        if self._client.is_connected and not self._client.is_closing:
            await self._client.trigger_client_timeout()


class JambonzOutputTransport(BaseOutputTransport):
    """Output transport for Jambonz WebSocket connections.

    Handles outgoing frame serialization, audio streaming with timing
    simulation, and WebSocket message transmission.
    """

    def __init__(
        self,
        transport: BaseTransport,
        client: JambonzTransportClient,
        params: JambonzTransportParams,
        **kwargs,
    ):
        """Initialize the WebSocket output transport.

        Args:
            transport: The parent transport instance.
            client: The client wrapper shared with the input transport.
            params: Transport configuration parameters.
            **kwargs: Additional arguments passed to the base output transport.
        """
        super().__init__(params, **kwargs)
        self._transport = transport
        self._client = client
        self._params = params

        # Pacing state used by _write_audio_sleep to simulate playback time.
        self._send_interval = 0
        self._next_send_time = 0

        self._initialized = False

    async def start(self, frame: StartFrame):
        """Start the output transport and initialize timing.

        Args:
            frame: The start frame containing initialization parameters.
        """
        await super().start(frame)

        if self._initialized:
            return

        self._initialized = True

        # Propagate resolved output sample rate back to StartFrame so serializers see it
        frame.audio_out_sample_rate = self.sample_rate

        await self._client.setup(frame)
        if self._params.serializer:
            await self._params.serializer.setup(frame)
        self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
        await self.set_transport_ready(frame)

    async def stop(self, frame: EndFrame):
        """Stop the output transport and cleanup resources.

        Args:
            frame: The end frame signaling transport shutdown.
        """
        await super().stop(frame)
        # Give the serializer a chance to emit its final message (e.g. the
        # disconnect command) before the connection is torn down.
        await self._write_frame(frame)
        await self._client.disconnect()

    async def cancel(self, frame: CancelFrame):
        """Cancel the output transport and stop all processing.

        Args:
            frame: The cancel frame signaling immediate cancellation.
        """
        await super().cancel(frame)
        await self._write_frame(frame)
        await self._client.disconnect()

    async def cleanup(self):
        """Clean up transport resources."""
        await super().cleanup()
        await self._transport.cleanup()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process outgoing frames with special handling for interruptions.

        Args:
            frame: The frame to process.
            direction: The direction the frame is travelling in the pipeline.
        """
        await super().process_frame(frame, direction)
        if isinstance(frame, StartInterruptionFrame):
            await self._write_frame(frame)
            # Restart the playback clock: queued audio was just discarded.
            self._next_send_time = 0

    async def send_message(
        self, frame: TransportMessageFrame | TransportMessageUrgentFrame
    ):
        """Send a transport message frame.

        Args:
            frame: The transport message frame to send.
        """
        await self._write_frame(frame)

    async def write_audio_frame(self, frame: OutputAudioRawFrame):
        """Write an audio frame to the WebSocket with timing simulation.

        Args:
            frame: The output audio frame to write.
        """
        if self._client.is_closing or not self._client.is_connected:
            return

        frame = OutputAudioRawFrame(
            audio=frame.audio,
            sample_rate=self.sample_rate,
            num_channels=self._params.audio_out_channels,
        )

        await self._write_frame(frame)

        # Simulate audio playback with a sleep.
        await self._write_audio_sleep()

    async def _write_frame(self, frame: Frame):
        """Serialize and send a frame through the WebSocket."""
        if not self._params.serializer:
            return

        try:
            payload = await self._params.serializer.serialize(frame)
            if payload:
                await self._client.send(payload)
        except Exception as e:
            logger.error(f"{self} exception sending data: {e.__class__.__name__} ({e})")

    async def _write_audio_sleep(self):
        """Simulate audio playback timing with appropriate delays."""
        # Simulate a clock.
        current_time = time.monotonic()
        sleep_duration = max(0, self._next_send_time - current_time)
        await asyncio.sleep(sleep_duration)
        if sleep_duration == 0:
            # We are behind (or just started): restart the clock from now.
            self._next_send_time = time.monotonic() + self._send_interval
        else:
            self._next_send_time += self._send_interval


class JambonzTransport(BaseTransport):
    """Jambonz WebSocket transport for real-time audio streaming.

    Provides bidirectional WebSocket communication with frame serialization,
    session management, and event handling for client connections and timeouts.
    """

    def __init__(
        self,
        websocket: web.WebSocketResponse,
        params: JambonzTransportParams,
        input_name: Optional[str] = None,
        output_name: Optional[str] = None,
    ):
        """Initialize the Jambonz WebSocket transport.

        Args:
            websocket: The Jambonz WebSocket connection.
            params: Transport configuration parameters.
            input_name: Optional name for the input processor.
            output_name: Optional name for the output processor.
        """
        super().__init__(input_name=input_name, output_name=output_name)

        self._params = params

        self._callbacks = JambonzTransportCallbacks(
            on_client_connected=self._on_client_connected,
            on_client_disconnected=self._on_client_disconnected,
            on_session_timeout=self._on_session_timeout,
        )

        self._client = JambonzTransportClient(websocket, self._callbacks, self._params)

        self._input = JambonzInputTransport(
            self, self._client, self._params, name=self._input_name
        )
        self._output = JambonzOutputTransport(
            self, self._client, self._params, name=self._output_name
        )

        # Call metadata supplied by the application via set_call_info().
        self.call_info = None

        # Register supported handlers. The user will only be able to register
        # these handlers.
        self._register_event_handler("on_client_connected")
        self._register_event_handler("on_client_disconnected")
        self._register_event_handler("on_session_timeout")

    def input(self) -> JambonzInputTransport:
        """Get the input transport processor.

        Returns:
            The WebSocket input transport instance.
        """
        return self._input

    def output(self) -> JambonzOutputTransport:
        """Get the output transport processor.

        Returns:
            The WebSocket output transport instance.
        """
        return self._output

    def set_call_info(self, call_info: dict):
        """Set the call info for the transport.

        Args:
            call_info: The call metadata to store on the transport.
        """
        self.call_info = call_info

    def get_call_info(self) -> Optional[dict]:
        """Get the call info for the transport.

        Returns:
            The metadata passed to set_call_info(), or None if it was never set.
        """
        return self.call_info

    async def _on_client_connected(self, websocket):
        """Handle client connected event."""
        await self._call_event_handler("on_client_connected", websocket)

    async def _on_client_disconnected(self, websocket):
        """Handle client disconnected event."""
        await self._call_event_handler("on_client_disconnected", websocket)

    async def _on_session_timeout(self, websocket):
        """Handle session timeout event."""
        await self._call_event_handler("on_session_timeout", websocket)