Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 198 additions & 0 deletions examples/foundational/45-simple-jambonz-app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import os
import json

from aiohttp import web
from loguru import logger
from pipecat.audio.vad.vad_analyzer import VADParams

from pipecat.audio.vad.silero import SileroVADAnalyzer

from pipecat.transports.jambonz import JambonzTransport, JambonzTransportParams
from pipecat.serializers.jambonz import JambonzFrameSerializer
from pipecat.frames.frames import LLMMessagesAppendFrame
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.processors.aggregators.llm_response import LLMUserAggregatorParams
from pipecat.services.gladia.stt import GladiaSTTService
from pipecat.services.gladia.config import (
GladiaInputParams,
LanguageConfig,
MessagesConfig,
)
from pipecat.transcriptions.language import Language

routes = web.RouteTableDef()


async def run_bot(transport):
call_info = transport.get_call_info()

stt = GladiaSTTService(
api_key=os.getenv("GLADIA_API_KEY"),
confidence=0.1,
sample_rate=call_info[
"sampleRate"
], # You can change this but don't forget to update the stt_sample_rate in the JambonzFrameSerializer
params=GladiaInputParams(
language_config=LanguageConfig(
languages=[Language.TL, Language.EN], code_switching=True
),
messages_config=MessagesConfig(receive_partial_transcripts=True),
),
)

# In this scenario, I'm using BaseTen's API for the LLM. For speed and much cheaper than OpenAI.
llm = OpenAILLMService(
model="moonshotai/Kimi-K2-Instruct",
api_key=os.getenv("BASETEN_API_KEY"),
params=OpenAILLMService.InputParams(
temperature=0,
),
base_url="https://inference.baseten.co/v1",
)

tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
model="eleven_flash_v2_5",
params=ElevenLabsTTSService.InputParams(
stability=0.7,
similarity_boost=0.8,
style=0.5,
use_speaker_boost=True,
speed=1.1,
),
sample_rate=8000,
)

context = OpenAILLMContext(
[{"role": "system", "content": "You are a helpful assistant."}]
)

context_aggregator = llm.create_context_aggregator(
context,
user_params=LLMUserAggregatorParams(aggregation_timeout=0.5),
)

pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)

task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
allow_interruption=True,
),
enable_turn_tracking=True,
)

@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Pipecat Client connected")
await task.queue_frame(
LLMMessagesAppendFrame(
messages=[
{
"role": "system",
"content": f"Say hello.",
}
],
run_llm=True,
)
)

@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Pipecat Client disconnected")
await task.cancel()

runner = PipelineRunner(handle_sigint=False)

await runner.run(task)


"""
Connect this endpoint to the LISTEN verb in Jambonz and set the listen url to this endpoint.

Link to Jambonz's documentation: https://jambonz.readthedocs.io/en/latest/voice/listen.html
"""


@routes.get("/stream")
async def stream(request):
ws = web.WebSocketResponse(
protocols=("audio.jambonz.org",),
heartbeat=15, # More frequent heartbeats for deployed environments
compress=False, # do not compress audio
max_msg_size=0, # no limit for audio frames
timeout=30, # Add explicit timeout
)
await ws.prepare(request)
logger.info(f"WebSocket prepared, waiting for initial message...")
# This is the initial metadata from Jambonz
initial_message = await ws.receive()
call_info = json.loads(initial_message.data)
logger.info(f"Received call info from Jambonz Server: {call_info}")
logger.info(f"Sample rate: {call_info.get('sampleRate')}")

transport = JambonzTransport(
websocket=ws,
params=JambonzTransportParams(
serializer=JambonzFrameSerializer(
# reference: https://docs.jambonz.org/verbs/verbs/listen#streaming
JambonzFrameSerializer.InputParams(
audio_in_sample_rate=call_info[
"sampleRate"
], # This is the sample rate Jambonz sends us
stt_sample_rate=call_info[
"sampleRate"
], # This is the sample_rate you set in your "stt"
audio_out_sample_rate="8000", # This is the sample rate you set in "sampleRate" under "bidirectionalAudio"
)
),
audio_in_enabled=True,
audio_out_enabled=True,
sample_rate=call_info["sampleRate"],
audio_in_sample_rate=call_info["sampleRate"],
audio_out_sample_rate=call_info["sampleRate"],
audio_in_channels=1,
session_timeout=300,
vad_analyzer=SileroVADAnalyzer(params=VADParams()),
),
)

transport.set_call_info(call_info)
logger.info(f"Starting bot with transport...")

await run_bot(transport)

return ws


app = web.Application()
app.add_routes(routes)

if __name__ == "__main__":
web.run_app(app, port=3002)
163 changes: 163 additions & 0 deletions src/pipecat/serializers/jambonz.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Jambonz WebSocket protocol serializer for Pipecat."""

import json
from typing import Optional
from aiohttp import web, WSMsgType
from pydantic import BaseModel
from pipecat.audio.utils import create_stream_resampler
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
Frame,
InputAudioRawFrame,
InputDTMFFrame,
KeypadEntry,
StartFrame,
StartInterruptionFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType


class JambonzFrameSerializer(FrameSerializer):
"""Serializer for Jambonz WebSocket protocol.

This serializer handles converting between Pipecat frames and Jambonz's WebSocket
protocol. It supports audio conversion, DTMF events, and automatic
call termination. It only supports OpenAI and Elevenlabs TTS at the moment.
"""

class InputParams(BaseModel):
"""Configuration parameters for JambonzFrameSerializer.

Parameters:
audio_in_sample_rate: Optional override for the sample rate of the audio we get from Jambonz.
audio_out_sample_rate: Optional override for the sample rate of the audio we send to Jambonz.
stt_sample_rate: Optional override for the sample rate of the audio we send to the STT service.
auto_hang_up: Whether to automatically terminate call on EndFrame.
"""

audio_in_sample_rate: Optional[int] = None
audio_out_sample_rate: Optional[int] = None
stt_sample_rate: Optional[int] = None
auto_hang_up: bool = True

def __init__(
self,
params: InputParams,
):
"""Initialize the JambonzFrameSerializer.

Args:
params: Configuration parameters.
"""
self._params = params
self._input_resampler = create_stream_resampler()
self._output_resampler = create_stream_resampler()
self._hangup_attempted = False

@property
def type(self) -> FrameSerializerType:
"""Gets the serializer type.

Returns:
The serializer type, either TEXT or BINARY.
"""
return FrameSerializerType.BINARY

async def setup(self, frame: StartFrame):
"""Sets up the serializer with pipeline configuration.

Args:
frame: The StartFrame containing pipeline configuration.
"""
pass

async def serialize(
self, frame: Frame, sample_rate: int = 16000
) -> str | bytes | None:
"""Serializes a Pipecat frame to Jambonz WebSocket format.

Handles conversion of various frame types to Jambonz WebSocket messages.
For EndFrames, initiates call termination if auto_hang_up is enabled.

Args:
frame: The Pipecat frame to serialize.

Returns:
Serialized data as string or bytes, or None if the frame isn't handled.
"""
if (
self._params.auto_hang_up
and not self._hangup_attempted
and isinstance(frame, (EndFrame, CancelFrame))
):
return {"type": "disconnect"}
# stop the previous playAudio from running
elif isinstance(frame, StartInterruptionFrame):
return {"type": "killAudio"}
elif isinstance(frame, AudioRawFrame):
data = frame.audio
serialized_data = await self._output_resampler.resample(
data, frame.sample_rate, self._params.audio_out_sample_rate
)
if serialized_data is None or len(serialized_data) == 0:
return None
return serialized_data
elif isinstance(frame, (TransportMessageFrame, TransportMessageUrgentFrame)):
return json.dumps(frame.message)

# Return None for unhandled frames
return None

async def deserialize(self, msg: web.WebSocketResponse) -> Frame | None:
"""Deserializes Jambonz WebSocket data to Pipecat frames.

Handles conversion of Jambonz media events to appropriate Pipecat frames.
Audio comes as binary frames, DTMF and other events come as JSON text frames.

Args:
data: The raw WebSocket data from Jambonz.

Returns:
A Pipecat frame corresponding to the Jambonz event, or None if unhandled.
"""
# Handle binary audio frames
if msg.type == WSMsgType.BINARY:
# Audio comes as raw binary PCM data
# Input: Resample from Jambonz sample rate to pipeline input rate
deserialized_data = await self._input_resampler.resample(
msg.data,
self._params.audio_in_sample_rate,
self._params.stt_sample_rate or self._params.audio_in_sample_rate,
)
if deserialized_data is None or len(deserialized_data) == 0:
# Ignoring in case we don't have audio
return None

audio_frame = InputAudioRawFrame(
audio=deserialized_data,
num_channels=1,
sample_rate=self._params.stt_sample_rate
or self._params.audio_in_sample_rate,
)
return audio_frame

# Handle JSON text frames (DTMF, commands, etc.)
elif msg.type == WSMsgType.TEXT:
message = json.loads(msg.data)
if message["event"] == "dtmf":
digit = message["dtmf"]
return InputDTMFFrame(KeypadEntry(digit))
# Handle other JSON events if needed (initial metadata, commands, etc.)
else:
return None
return None
3 changes: 3 additions & 0 deletions src/pipecat/transports/jambonz/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .transport import JambonzTransport, JambonzTransportParams

__all__ = ["JambonzTransport", "JambonzTransportParams"]
Loading