-
Notifications
You must be signed in to change notification settings - Fork 2.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Firestore memory #1975
base: main
Are you sure you want to change the base?
Firestore memory #1975
Changes from all commits
a07f620
2a5a015
fe6754e
d27542d
4e43314
8fc53f6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
""" | ||
This recipe shows how to store agent sessions in a MongoDB database. | ||
Steps: | ||
1. Run: `pip install openai google-cloud-firestore agno` to install dependencies | ||
2. Make sure your gcloud project is set up and you have the necessary permissions to access Firestore | ||
3. Run: `python cookbook/memory/09_persistent_memory_firestore.py` to run the agent | ||
""" | ||
|
||
import json | ||
|
||
from agno.agent import Agent | ||
from agno.memory.agent import AgentMemory | ||
from agno.memory.db.firestore import FirestoreMemoryDb | ||
from agno.models.openai import OpenAIChat | ||
from agno.storage.agent.firestore import FirestoreAgentStorage | ||
|
||
from rich.console import Console | ||
from rich.json import JSON | ||
from rich.panel import Panel | ||
|
||
# The only required argument is the collection name. | ||
# Firestore will connect automatically using your google cloud credentials. | ||
# If you don't specificy a db_name, the class uses the (default) database by default to allow free tier access to firestore. | ||
# You can specify a project_id if you'd like to connect to firestore in a different GCP project | ||
|
||
agent = Agent( | ||
model=OpenAIChat(id="gpt-4o"), | ||
# Store agent sessions in Firestore | ||
storage=FirestoreAgentStorage( | ||
collection_name="agent_sessions", | ||
), | ||
# Store memories in Firestore | ||
memory=AgentMemory( | ||
db=FirestoreMemoryDb(), | ||
create_user_memories=True, | ||
create_session_summary=True, | ||
), | ||
# Set add_history_to_messages=true to add the previous chat history to the messages sent to the Model. | ||
add_history_to_messages=True, | ||
# Number of historical responses to add to the messages. | ||
num_history_responses=3, | ||
# The session_id is used to identify the session in the database | ||
# You can resume any session by providing a session_id | ||
# session_id="xxxx-xxxx-xxxx-xxxx", | ||
# Description creates a system prompt for the agent | ||
description="You are a helpful assistant that always responds in a polite, upbeat and positive manner.", | ||
) | ||
|
||
console = Console() | ||
|
||
|
||
def print_chat_history(agent): | ||
# -*- Print history | ||
console.print( | ||
Panel( | ||
JSON( | ||
json.dumps( | ||
[ | ||
m.model_dump(include={"role", "content"}) | ||
for m in agent.memory.messages | ||
] | ||
), | ||
indent=4, | ||
), | ||
title=f"Chat History for session_id: {agent.session_id}", | ||
expand=True, | ||
) | ||
) | ||
|
||
|
||
# -*- Create a run | ||
agent.print_response("Share a 2 sentence horror story", stream=True) | ||
# -*- Print the chat history | ||
print_chat_history(agent) | ||
|
||
# -*- Ask a follow up question that continues the conversation | ||
agent.print_response("What was my first message?", stream=True) | ||
# -*- Print the chat history | ||
print_chat_history(agent) |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,239 @@ | ||||||
from typing import Optional, List | ||||||
from datetime import datetime, timezone | ||||||
|
||||||
from agno.memory.db import MemoryDb | ||||||
from agno.memory.row import MemoryRow | ||||||
from agno.utils.log import logger | ||||||
|
||||||
try: | ||||||
from google.cloud import firestore | ||||||
from google.cloud.firestore import Client | ||||||
from google.cloud.firestore import CollectionReference, DocumentReference | ||||||
from google.cloud.firestore_v1.base_query import FieldFilter, BaseQuery | ||||||
import google.auth | ||||||
except ImportError: | ||||||
raise ImportError( | ||||||
"`firestore` not installed. Please install it with `pip install google-cloud-firestore`" | ||||||
) | ||||||
|
||||||
|
||||||
class FirestoreMemoryDb(MemoryDb): | ||||||
def __init__( | ||||||
self, | ||||||
collection_name: str = "memory", | ||||||
db_name: Optional[str] = "(default)", | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The [default] 'database' gets you free-tier use which in google cloud is pretty generous. |
||||||
client: Optional[Client] = None, | ||||||
project: Optional[str] = None, | ||||||
): | ||||||
""" | ||||||
This class provides a memory store backed by a firestore collection. | ||||||
Memories are stored by user_id {self.collection_name}/{user_id}/memories to avoid having a firestore index | ||||||
since they are difficult to create on the fly. | ||||||
(index is required for a filtered order_by, not required using this model) | ||||||
|
||||||
Args: | ||||||
collection_name: The name of the collection to store memories | ||||||
db_name: Name of the firestore database (Default is to use (default) for the free tier/default database) | ||||||
client: Optional existing firestore client | ||||||
project: Optional name of the GCP project to use | ||||||
""" | ||||||
self._client: Optional[Client] = client | ||||||
|
||||||
if self._client is None: | ||||||
self._client = firestore.Client(database=db_name, project=project) | ||||||
|
||||||
self.collection_name: str = collection_name | ||||||
self.db_name: str = db_name | ||||||
self.collection: CollectionReference = self._client.collection( | ||||||
self.collection_name | ||||||
) | ||||||
|
||||||
# store a user id for the collection when we get one | ||||||
# for use in the delete method due to the data structure | ||||||
self._user_id = None | ||||||
|
||||||
# utilities to recursively delete all documents in a collection and the collection itself | ||||||
def _delete_document(self, document: DocumentReference): | ||||||
logger.debug(f"Deleting document: {document.path}") | ||||||
for collection in document.collections(): | ||||||
self._delete_collection(collection) | ||||||
document.delete() | ||||||
|
||||||
def _delete_collection(self, collection: CollectionReference): | ||||||
for document in collection.list_documents(): | ||||||
self._delete_document(document) | ||||||
|
||||||
def create(self) -> None: | ||||||
"""Create the collection index | ||||||
Avoiding index creation by using a user/memory model | ||||||
|
||||||
Returns: | ||||||
None | ||||||
""" | ||||||
try: | ||||||
logger.info(f"Mocked call to create index for '{self.collection_name}'") | ||||||
except Exception as e: | ||||||
logger.error(f"Error creating collection: {e}") | ||||||
raise | ||||||
Comment on lines
+66
to
+77
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I noticed this doesn’t seem to be doing anything. Was that intentional? Just wanted to check. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, indexes are odd in firestore. Fields are indexed automagically so best to use a model that leverages that. To create an index is an async job where they mention the need for an index in a log file that points you back to the google cloud console to create it. You can do it in code, but it doesn't complete in time to use for a session. This approach seemed to solve for being able to use firestore immediately with as little setup as possible. |
||||||
|
||||||
def memory_exists(self, memory: MemoryRow) -> bool: | ||||||
"""Check if a memory exists | ||||||
Args: | ||||||
memory: MemoryRow to check | ||||||
Returns: | ||||||
bool: True if the memory exists, False otherwise | ||||||
""" | ||||||
try: | ||||||
logger.info(f"Checking if memory exists: {memory.id}") | ||||||
# save our user_id | ||||||
self._user_id = memory.user_id | ||||||
result = self.collection.document(memory.id).get().exists | ||||||
return result | ||||||
except Exception as e: | ||||||
logger.error(f"Error checking memory existence: {e}") | ||||||
return False | ||||||
|
||||||
def get_user_collection(self, user_id: str) -> CollectionReference: | ||||||
return self._client.collection(f"{self.collection_name}/{user_id}/memories") | ||||||
|
||||||
def read_memories(self, user_id: str, limit=None, sort=None) -> List[MemoryRow]: | ||||||
"""Read memories from the collection | ||||||
Avoids using an index since they are hard to create on the fly with firestore | ||||||
Args: | ||||||
user_id: ID of the user to read | ||||||
limit: Maximum number of memories to read | ||||||
sort: Sort order ("asc" or "desc") | ||||||
Returns: | ||||||
List[MemoryRow]: List of memories | ||||||
""" | ||||||
memories: List[MemoryRow] = [] | ||||||
try: | ||||||
user_collection = self.get_user_collection(user_id) | ||||||
self._user_id = user_id | ||||||
query = user_collection.order_by( | ||||||
"created_at", | ||||||
direction=( | ||||||
firestore.Query.ASCENDING | ||||||
if sort == "asc" | ||||||
else firestore.Query.DESCENDING | ||||||
), | ||||||
) | ||||||
if limit is not None: | ||||||
query = query.limit(limit) | ||||||
|
||||||
# Execute query | ||||||
docs = query.stream() | ||||||
for doc in docs: | ||||||
data = doc.to_dict() | ||||||
memories.append( | ||||||
MemoryRow(id=data["id"], user_id=user_id, memory=data["memory"]) | ||||||
) | ||||||
except Exception as e: | ||||||
logger.error(f"Error reading memories: {e}") | ||||||
return memories | ||||||
|
||||||
def upsert_memory(self, memory: MemoryRow, create_and_retry: bool = True) -> None: | ||||||
"""Upsert a memory into the user-specific collection | ||||||
Args: | ||||||
memory: MemoryRow to upsert | ||||||
create_and_retry: Whether to create a new memory if the id already exists | ||||||
Returns: | ||||||
None | ||||||
""" | ||||||
try: | ||||||
logger.info(f"Upserting memory: {memory.id} for user: {memory.user_id}") | ||||||
# save our user_id | ||||||
self._user_id = memory.user_id | ||||||
now = datetime.now(timezone.utc) | ||||||
timestamp = int(now.timestamp()) | ||||||
|
||||||
# Get user-specific collection | ||||||
user_collection = self.get_user_collection(memory.user_id) | ||||||
doc_ref = user_collection.document(memory.id) | ||||||
|
||||||
# Add version field for optimistic locking | ||||||
memory_dict = memory.model_dump() | ||||||
if "_version" not in memory_dict: | ||||||
memory_dict["_version"] = 1 | ||||||
else: | ||||||
memory_dict["_version"] += 1 | ||||||
|
||||||
update_data = { | ||||||
"id": memory.id, | ||||||
"memory": memory.memory, | ||||||
"updated_at": timestamp, | ||||||
"_version": memory_dict["_version"], | ||||||
} | ||||||
|
||||||
# For new documents, set created_at | ||||||
doc = doc_ref.get() | ||||||
if not doc.exists: | ||||||
update_data["created_at"] = timestamp | ||||||
|
||||||
# Use transactions for atomic updates | ||||||
@firestore.transactional | ||||||
def update_in_transaction(transaction, doc_ref, data): | ||||||
transaction.set(doc_ref, data, merge=True) | ||||||
|
||||||
transaction = self._client.transaction() | ||||||
update_in_transaction(transaction, doc_ref, update_data) | ||||||
|
||||||
except Exception as e: | ||||||
logger.error(f"Error upserting memory: {e}") | ||||||
raise | ||||||
|
||||||
def delete_memory(self, id: str) -> None: | ||||||
"""Delete a memory from the collection | ||||||
Args: | ||||||
id: ID of the memory to delete | ||||||
Returns: | ||||||
None | ||||||
""" | ||||||
try: | ||||||
logger.debug(f"Call to delete memory with id: {id}") | ||||||
# since our memories are stored by user | ||||||
# retrieve our copy of the user_id | ||||||
if self._user_id: | ||||||
user_collection = self.get_user_collection(self._user_id) | ||||||
user_collection.document(id).delete() | ||||||
else: | ||||||
logger.info("No user id provided, skipping delete") | ||||||
|
||||||
except Exception as e: | ||||||
logger.error(f"Error deleting memory: {e}") | ||||||
raise | ||||||
|
||||||
def drop_table(self) -> None: | ||||||
"""Drop the collection | ||||||
|
||||||
Returns: | ||||||
None | ||||||
""" | ||||||
# Caveats https://firebase.google.com/docs/firestore/solutions/delete-collections | ||||||
# We can delete all docs | ||||||
|
||||||
try: | ||||||
self._delete_collection(self.collection) | ||||||
|
||||||
except Exception as e: | ||||||
logger.error(f"Error dropping collection: {e}") | ||||||
|
||||||
def table_exists(self) -> bool: | ||||||
"""Check if the collection exists | ||||||
Returns: | ||||||
bool: True if the collection exists, False otherwise | ||||||
""" | ||||||
logger.debug(f"Call to check if collection exists: {self.collection_name}") | ||||||
return self.collection_name in [i._path[0] for i in self._client.collections()] | ||||||
|
||||||
def clear(self) -> bool: | ||||||
"""Clear the collection | ||||||
Returns: | ||||||
bool: True if the collection was cleared, False otherwise | ||||||
""" | ||||||
try: | ||||||
self._delete_collection(self.collection) | ||||||
|
||||||
return True | ||||||
except Exception as e: | ||||||
logger.error(f"Error dropping collection: {e}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you please also mention steps on how to set up gcloud locally and set up the project?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be ok to reference official google docs like https://cloud.google.com/firestore/docs/create-database-server-client-library ?