-
Notifications
You must be signed in to change notification settings - Fork 2.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
h 'main' of https://github.com/agno-agi/agno into release-1.1.5
- Loading branch information
Showing
14 changed files
with
648 additions
and
18 deletions.
There are no files selected for viewing
30 changes: 30 additions & 0 deletions
30
cookbook/agent_concepts/knowledge/vector_dbs/upstash_db.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
# install upstash-vector - `uv pip install upstash-vector` | ||
# Add OPENAI_API_KEY to your environment variables for the agent response | ||
|
||
from agno.agent import Agent | ||
from agno.knowledge.pdf_url import PDFUrlKnowledgeBase | ||
from agno.vectordb.upstashdb.upstashdb import UpstashVectorDb | ||
|
||
# How to connect to an Upstash Vector index | ||
# - Create a new index in Upstash Console with the correct dimension | ||
# - Fetch the URL and token from Upstash Console | ||
# - Replace the values below or use environment variables | ||
|
||
# Initialize Upstash DB | ||
vector_db = UpstashVectorDb( | ||
url="UPSTASH_VECTOR_REST_URL", | ||
token="UPSTASH_VECTOR_REST_TOKEN", | ||
) | ||
|
||
# Create a new PDFUrlKnowledgeBase | ||
knowledge_base = PDFUrlKnowledgeBase( | ||
urls=["https://phi-public.s3.amazonaws.com/recipes/ThaiRecipes.pdf"], | ||
vector_db=vector_db, | ||
) | ||
|
||
# Load the knowledge base - after first run, comment out | ||
knowledge_base.load(recreate=False, upsert=True) | ||
|
||
# Create and use the agent | ||
agent = Agent(knowledge=knowledge_base, show_tool_calls=True) | ||
agent.print_response("What are some tips for cooking glass noodles?", markdown=True) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
""" | ||
Run `pip install openai webexpythonsdk` to install dependencies. | ||
To get the Webex Teams Access token refer to - https://developer.webex.com/docs/bots | ||
Steps: | ||
1. Sign up for Webex Teams and go to the Webex [Developer Portal](https://developer.webex.com/) | ||
2. Create the Bot | ||
2.1 Click in the top-right on your profile → My Webex Apps → Create a Bot. | ||
2.2 Enter Bot Name, Username, Icon, and Description, then click Add Bot. | ||
3. Get the Access Token | ||
3.1 Copy the Access Token shown on the confirmation page (displayed once). | ||
3.2 If lost, regenerate it via My Webex Apps → Edit Bot → Regenerate Access Token. | ||
4. Set the WEBEX_ACCESS_TOKEN environment variable | ||
5. Launch Webex itself and add your bot to a space like the Welcome space. Use the bot's email address (e.g. [email protected]) | ||
""" | ||
|
||
import os | ||
|
||
from agno.agent import Agent | ||
from agno.tools.webex import WebexTools | ||
|
||
agent = Agent(tools=[WebexTools()], show_tool_calls=True) | ||
|
||
# List all space in Webex | ||
agent.print_response("List all space on our Webex", markdown=True) | ||
|
||
# Send a message to a Space in Webex | ||
agent.print_response( | ||
"Send a funny ice-breaking message to the webex Welcome space", markdown=True | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
import json | ||
import os | ||
from typing import Optional | ||
|
||
from agno.tools.toolkit import Toolkit | ||
from agno.utils.log import logger | ||
|
||
try: | ||
from webexpythonsdk import WebexAPI | ||
from webexpythonsdk.exceptions import ApiError | ||
except ImportError: | ||
logger.error("Webex tools require the `webexpythonsdk` package. Run `pip install webexpythonsdk` to install it.") | ||
|
||
|
||
class WebexTools(Toolkit): | ||
def __init__(self, send_message: bool = True, list_rooms: bool = True, access_token: Optional[str] = None): | ||
super().__init__(name="webex") | ||
if access_token is None: | ||
access_token = os.getenv("WEBEX_ACCESS_TOKEN") | ||
if access_token is None: | ||
raise ValueError("Webex access token is not set. Please set the WEBEX_ACCESS_TOKEN environment variable.") | ||
|
||
self.client = WebexAPI(access_token=access_token) | ||
if send_message: | ||
self.register(self.send_message) | ||
if list_rooms: | ||
self.register(self.list_rooms) | ||
|
||
def send_message(self, room_id: str, text: str) -> str: | ||
""" | ||
Send a message to a Webex Room. | ||
Args: | ||
room_id (str): The Room ID to send the message to. | ||
text (str): The text of the message to send. | ||
Returns: | ||
str: A JSON string containing the response from the Webex. | ||
""" | ||
try: | ||
response = self.client.messages.create(roomId=room_id, text=text) | ||
return json.dumps(response.json_data) | ||
except ApiError as e: | ||
logger.error(f"Error sending message: {e} in room: {room_id}") | ||
return json.dumps({"error": str(e)}) | ||
|
||
def list_rooms(self) -> str: | ||
""" | ||
List all rooms in the Webex. | ||
Returns: | ||
str: A JSON string containing the list of rooms. | ||
""" | ||
try: | ||
response = self.client.rooms.list() | ||
rooms_list = [ | ||
{ | ||
"id": room.id, | ||
"title": room.title, | ||
"type": room.type, | ||
"isPublic": room.isPublic, | ||
"isReadOnly": room.isReadOnly, | ||
} | ||
for room in response | ||
] | ||
|
||
return json.dumps({"rooms": rooms_list}, indent=4) | ||
except ApiError as e: | ||
logger.error(f"Error listing rooms: {e}") | ||
return json.dumps({"error": str(e)}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from agno.vectordb.upstashdb.upstashdb import UpstashVectorDb |
Oops, something went wrong.