Pk/prototype llm failover attempt 4 #174

Draft · wants to merge 13 commits into main
8 changes: 5 additions & 3 deletions examples/dynamic/insurance_gemini.py
@@ -47,6 +47,8 @@
 from pipecat_flows import FlowArgs, FlowManager, FlowResult, FlowsFunctionSchema, NodeConfig
 
 sys.path.append(str(Path(__file__).parent.parent))
+from pipecat.processors.aggregators.llm_context import LLMContext
+from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
 from runner import configure
 
 load_dotenv(override=True)
@@ -343,8 +345,8 @@ async def main():
         tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
         llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-2.0-flash-exp")
 
-        context = OpenAILLMContext()
-        context_aggregator = llm.create_context_aggregator(context)
+        context = LLMContext()
+        context_aggregator = LLMContextAggregatorPair(context)
 
         # Create pipeline
         pipeline = Pipeline(
@@ -364,7 +366,7 @@
         # Initialize flow manager with transition callback
         flow_manager = FlowManager(
             task=task,
-            llm=llm,
+            llms=[llm],
             context_aggregator=context_aggregator,
         )
 
119 changes: 46 additions & 73 deletions examples/dynamic/insurance_openai.py
@@ -47,8 +47,12 @@
 from pipecat_flows import FlowArgs, FlowManager, FlowResult, NodeConfig
 
 sys.path.append(str(Path(__file__).parent.parent))
+from pipecat.processors.aggregators.llm_context import LLMContext
+from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
 from runner import configure
 
+from pipecat_flows import FlowsFunctionSchema
+
 load_dotenv(override=True)
 
 logger.remove(0)
@@ -192,19 +196,13 @@ def create_initial_node() -> NodeConfig:
             }
         ],
         "functions": [
-            {
-                "type": "function",
-                "function": {
-                    "name": "collect_age",
-                    "handler": collect_age,
-                    "description": "Record customer's age",
-                    "parameters": {
-                        "type": "object",
-                        "properties": {"age": {"type": "integer"}},
-                        "required": ["age"],
-                    },
-                },
-            }
+            FlowsFunctionSchema(
+                name="collect_age",
+                description="Record customer's age",
+                properties={"age": {"type": "integer"}},
+                required=["age"],
+                handler=collect_age,
+            )
         ],
     }
 
@@ -220,21 +218,13 @@ def create_marital_status_node() -> NodeConfig:
             }
         ],
         "functions": [
-            {
-                "type": "function",
-                "function": {
-                    "name": "collect_marital_status",
-                    "handler": collect_marital_status,
-                    "description": "Record marital status",
-                    "parameters": {
-                        "type": "object",
-                        "properties": {
-                            "marital_status": {"type": "string", "enum": ["single", "married"]}
-                        },
-                        "required": ["marital_status"],
-                    },
-                },
-            }
+            FlowsFunctionSchema(
+                name="collect_marital_status",
+                description="Record marital status after customer provides it",
+                properties={"marital_status": {"type": "string", "enum": ["single", "married"]}},
+                required=["marital_status"],
+                handler=collect_marital_status,
+            )
         ],
     }
 
@@ -254,25 +244,16 @@ def create_quote_calculation_node(age: int, marital_status: str) -> NodeConfig:
             }
         ],
         "functions": [
-            {
-                "type": "function",
-                "function": {
-                    "name": "calculate_quote",
-                    "handler": calculate_quote,
-                    "description": "Calculate initial insurance quote",
-                    "parameters": {
-                        "type": "object",
-                        "properties": {
-                            "age": {"type": "integer"},
-                            "marital_status": {
-                                "type": "string",
-                                "enum": ["single", "married"],
-                            },
-                        },
-                        "required": ["age", "marital_status"],
-                    },
-                },
-            }
+            FlowsFunctionSchema(
+                name="calculate_quote",
+                description="Calculate initial insurance quote",
+                properties={
+                    "age": {"type": "integer"},
+                    "marital_status": {"type": "string", "enum": ["single", "married"]},
+                },
+                required=["age", "marital_status"],
+                handler=calculate_quote,
+            )
         ],
     }
 
@@ -300,31 +281,23 @@ def create_quote_results_node(
             }
         ],
         "functions": [
-            {
-                "type": "function",
-                "function": {
-                    "name": "update_coverage",
-                    "handler": update_coverage,
-                    "description": "Recalculate quote with new coverage options",
-                    "parameters": {
-                        "type": "object",
-                        "properties": {
-                            "coverage_amount": {"type": "integer"},
-                            "deductible": {"type": "integer"},
-                        },
-                        "required": ["coverage_amount", "deductible"],
-                    },
-                },
-            },
-            {
-                "type": "function",
-                "function": {
-                    "name": "end_quote",
-                    "handler": end_quote,
-                    "description": "Complete the quote process",
-                    "parameters": {"type": "object", "properties": {}},
-                },
-            },
+            FlowsFunctionSchema(
+                name="update_coverage",
+                description="Recalculate quote with new coverage options",
+                properties={
+                    "coverage_amount": {"type": "integer"},
+                    "deductible": {"type": "integer"},
+                },
+                required=["coverage_amount", "deductible"],
+                handler=update_coverage,
+            ),
+            FlowsFunctionSchema(
+                name="end_quote",
+                description="Complete the quote process when customer is satisfied",
+                properties={},
+                required=[],
+                handler=end_quote,
+            ),
         ],
     }
 
@@ -367,8 +340,8 @@ async def main():
         tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
         llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
 
-        context = OpenAILLMContext()
-        context_aggregator = llm.create_context_aggregator(context)
+        context = LLMContext()
+        context_aggregator = LLMContextAggregatorPair(context)
 
         # Create pipeline
         pipeline = Pipeline(
@@ -388,7 +361,7 @@
         # Initialize flow manager with transition callback
         flow_manager = FlowManager(
             task=task,
-            llm=llm,
+            llms=[llm],
             context_aggregator=context_aggregator,
         )
 
167 changes: 167 additions & 0 deletions examples/prototype-llm-failover.py
@@ -0,0 +1,167 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import os
import sys
from pathlib import Path

import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport

from pipecat_flows import FlowManager, FlowResult, NodeConfig

sys.path.append(str(Path(__file__).parent.parent))

from runner import configure

load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")

current_llm = "OpenAI"
# current_llm = "Google"


async def switch_llm(flow_manager: FlowManager, llm: str) -> tuple[FlowResult, None]:
    """Switch the current LLM service.

    Args:
        llm: The name of the LLM service to switch to (must be "OpenAI" or "Google").
    """
    global current_llm
    current_llm = llm
    return FlowResult(status="success"), None
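

# These predicates gate the two ParallelPipeline branches built in main(): each
# branch's FunctionFilters only pass frames while its predicate returns True, so
# exactly one LLM participates in the conversation at any moment.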


async def openai_filter(frame) -> bool:
    return current_llm == "OpenAI"


async def google_filter(frame) -> bool:
    return current_llm == "Google"


async def get_current_weather(flow_manager: FlowManager) -> tuple[FlowResult, None]:
    """Get the current weather information."""
    # Placeholder implementation; a real version would call a weather API here.
    weather_info = "The current weather is sunny with a temperature of 75 degrees Fahrenheit."
    return FlowResult(status="success", response=weather_info), None


def create_initial_node() -> NodeConfig:
    return {
        "name": "initial",
        "role_messages": [
            {
                "role": "system",
                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
            }
        ],
        "task_messages": [
            {
                # TODO: should be able to specify "system" for OpenAI and "user" for Google
                "role": "system",
                "content": "Say a brief hello.",
            }
        ],
        "functions": [switch_llm, get_current_weather],
    }
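

# Note: the functions above are registered as plain callables. pipecat-flows can
# derive a function schema from a callable's signature and docstring, which is
# why switch_llm documents its `llm` argument (see the docstring_parser
# dependency in pyproject.toml).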


# Main setup
async def main():
    async with aiohttp.ClientSession() as session:
        (room_url, _) = await configure(session)

        transport = DailyTransport(
            room_url,
            None,
            "LLM Failover Bot",
            DailyParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
            ),
        )

        stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
        )

        # Shared context and aggregators for both LLMs
        context = LLMContext()
        context_aggregator = LLMContextAggregatorPair(context)

        # Primary LLM service
        llm_openai = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

        # Secondary LLM service for failover
        llm_google = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"))

        pipeline = Pipeline(
            [
                transport.input(),
                stt,
                context_aggregator.user(),
                # Both LLMs run in parallel branches; each FunctionFilter pair
                # gates a branch so only the one matching `current_llm` is active.
                ParallelPipeline(
                    [
                        FunctionFilter(openai_filter, direction=FrameDirection.DOWNSTREAM),
                        llm_openai,
                        FunctionFilter(openai_filter, direction=FrameDirection.UPSTREAM),
                    ],
                    [
                        FunctionFilter(google_filter, direction=FrameDirection.DOWNSTREAM),
                        llm_google,
                        FunctionFilter(google_filter, direction=FrameDirection.UPSTREAM),
                    ],
                ),
                tts,
                transport.output(),
                context_aggregator.assistant(),
            ]
        )

        task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))

        # Initialize flow manager
        flow_manager = FlowManager(
            task=task,
            llms=[llm_openai, llm_google],
            context_aggregator=context_aggregator,
        )

        @transport.event_handler("on_client_connected")
        async def on_client_connected(transport, participant):
            logger.debug("Initializing flow manager")
            await flow_manager.initialize(create_initial_node())

        runner = PipelineRunner()
        await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())
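
A possible follow-up, sketched below and not part of this diff: flip `current_llm` automatically when the active service reports an error. This is a minimal sketch assuming pipecat's ErrorFrame and FrameProcessor APIs; FailoverSwitcher is a hypothetical helper and would need to sit where it can observe the failing service's ErrorFrames (pipecat pushes errors upstream).

# Hypothetical sketch (not in this PR): automatic failover on ErrorFrame.
from pipecat.frames.frames import ErrorFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

current_llm = "OpenAI"  # module-global branch selector, as in the example above


class FailoverSwitcher(FrameProcessor):
    """Flip `current_llm` to the other service when an ErrorFrame passes through."""

    async def process_frame(self, frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, ErrorFrame):
            global current_llm
            current_llm = "Google" if current_llm == "OpenAI" else "OpenAI"
        await self.push_frame(frame, direction)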
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -19,7 +19,7 @@ classifiers = [
     "Topic :: Multimedia :: Sound/Audio",
 ]
 dependencies = [
-    "pipecat-ai>=0.0.74",
+    "pipecat-ai>=0.0.81",
     "loguru~=0.7.2",
     "docstring_parser~=0.16"
 ]