
[Bug]: Not able to create asynchronous chromadb. #14515

Open
rvssridatta opened this issue Jul 2, 2024 · 3 comments
Labels: bug (Something isn't working), triage (Issue needs to be triaged/prioritized)

Comments

@rvssridatta

Bug Description

I would like to create a status_checker API endpoint in FastAPI to track the creation of ChromaDB embeddings, and I would also like to create these embeddings in async mode. The code below gives an error; please advise.

Version

llama-index 0.10.12

Steps to Reproduce

import os
import asyncio

import chromadb
from fastapi import File, UploadFile
from fastapi.responses import JSONResponse
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core import VectorStoreIndex, StorageContext, SimpleDirectoryReader
from llama_index.core.retrievers import RecursiveRetriever
from llama_index.core.query_engine import RetrieverQueryEngine

# app, global_variable, all_nodes and all_nodes_dict are defined elsewhere in the application.

@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    try:
        # Ensure 'docs' directory exists
        if not os.path.exists("docs"):
            os.makedirs("docs")

        # Write the file to the server with its original filename
        file_path = os.path.join("docs", file.filename)
        with open(file_path, "wb") as f:
            f.write(await file.read())

        from rag_define import define_rag
        asyncio.create_task(define_rag())

        return JSONResponse(content={"message": "File uploaded successfully"})

    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)

@app.post("/status")
async def status_checker():
    return global_variable.upload_in_progress

async def define_rag():
    documents = SimpleDirectoryReader(input_dir="./docs", required_exts=[".docx", ".doc", ".pdf", ".txt"]).load_data()
    if os.path.exists('./chroma_db'):
        print("*************************utilizing pre generated embeddings from chromadb folder")
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("quickstart")
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        vector_index_chunk = VectorStoreIndex.from_vector_store(vector_store, embed_model=global_variable.embed_model)
    else:
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("quickstart")
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        # index = VectorStoreIndex.from_documents(documents, storage_context=storage_context)
        vector_index_chunk = await VectorStoreIndex(all_nodes, embed_model=global_variable.embed_model,
                                                    storage_context=storage_context)  # , use_async=True, show_progress=True

    vector_retriever_chunk = vector_index_chunk.as_retriever(similarity_top_k=5)
    global_variable.retriever_chunk = RecursiveRetriever(
        "vector",
        retriever_dict={"vector": vector_retriever_chunk},
        node_dict=all_nodes_dict,
        verbose=True,
    )
    print("Vector store creation done")
    global_variable.upload_in_progress = 1
    global_variable.query_engine_chunk = RetrieverQueryEngine.from_args(
        global_variable.retriever_chunk,
        llm=global_variable.llm,
        text_qa_template=global_variable.text_qa_template,
    )

Relevant Logs/Tracebacks

vector_index_chunk = await VectorStoreIndex(all_nodes, embed_model=global_variable.embed_model, storage_context=storage_context, use_async=True, show_progress=True)
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lib/python3.11/site-packages/llama_index/core/indices/vector_store/base.py", line 74, in __init__
    super().__init__(
  File "/home/lib/python3.11/site-packages/llama_index/core/indices/base.py", line 91, in __init__
    index_struct = self.build_index_from_nodes(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lib/python3.11/site-packages/llama_index/core/indices/vector_store/base.py", line 307, in build_index_from_nodes
    return self._build_index_from_nodes(nodes, **insert_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lib/python3.11/site-packages/llama_index/core/indices/vector_store/base.py", line 277, in _build_index_from_nodes
    run_async_tasks(tasks)
  File "/homelib/python3.11/site-packages/llama_index/core/async_utils.py", line 49, in run_async_tasks
    outputs: List[Any] = asyncio.run(_gather())
                         ^^^^^^^^^^^^^^^^^^^^^^
  File "/homelib/python3.11/asyncio/runners.py", line 186, in run
    raise RuntimeError(
RuntimeError: asyncio.run() cannot be called from a running event loop
/home/miniconda3/envs/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py:-1: RuntimeWarning: coroutine 'run_async_tasks.<locals>._gather' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
/home/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py:-1: RuntimeWarning: coroutine 'VectorStoreIndex._async_add_nodes_to_index' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
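
For context, asyncio.run() always raises this RuntimeError when it is called from a thread whose event loop is already running, which is what happens when llama-index's run_async_tasks() is triggered from inside a uvicorn/FastAPI handler. The following minimal sketch (hypothetical, not from this issue) reproduces the same failure:

import asyncio

async def inner():
    return 42

async def outer():
    # Equivalent to what run_async_tasks() does when use_async=True is passed
    # while uvicorn's event loop is already running in this thread.
    return asyncio.run(inner())  # RuntimeError: asyncio.run() cannot be called from a running event loop

asyncio.run(outer())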
rvssridatta added the bug and triage labels on Jul 2, 2024

dosubot bot commented Jul 2, 2024

To resolve the issue of calling asyncio.run() from a running event loop and properly create asynchronous ChromaDB embeddings in your FastAPI application, you should avoid using asyncio.run() within an already running event loop. Instead, you can use await directly or use asyncio.create_task() to schedule the coroutine.

Here is an example of how you can modify your code to handle asynchronous embeddings properly:

import os
import asyncio
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
import chromadb
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core import VectorStoreIndex, StorageContext, SimpleDirectoryReader
from llama_index.core.retrievers import RecursiveRetriever
from llama_index.core.query_engine import RetrieverQueryEngine

app = FastAPI()

# Global variable to track upload status
class GlobalVariable:
    upload_in_progress = 0
    retriever_chunk = None
    query_engine_chunk = None
    embed_model = None  # Initialize with your embedding model
    llm = None  # Initialize with your LLM
    text_qa_template = None  # Initialize with your text QA template

global_variable = GlobalVariable()

@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    try:
        # Ensure 'docs' directory exists
        if not os.path.exists("docs"):
            os.makedirs("docs")
        
        # Write the file to server with its original filename
        file_path = os.path.join("docs", file.filename)
        with open(file_path, "wb") as f:
            f.write(await file.read())
        
        # Start the asynchronous task to define RAG
        asyncio.create_task(define_rag())

        return JSONResponse(content={"message": "File uploaded successfully"})

    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)

@app.post("/status")
async def status_checker():
    return {"upload_in_progress": global_variable.upload_in_progress}

async def define_rag():
    documents = SimpleDirectoryReader(input_dir="./docs", required_exts=[".docx", ".doc", ".pdf", ".txt"]).load_data()
    if os.path.exists('./chroma_db'):
        print("*************************utilizing pre generated embeddings from chromadb folder")
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("quickstart")
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        vector_index_chunk = VectorStoreIndex.from_vector_store(vector_store, embed_model=global_variable.embed_model)
    else:
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("quickstart")
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        vector_index_chunk = await VectorStoreIndex.from_documents(documents, embed_model=global_variable.embed_model, storage_context=storage_context, use_async=True, show_progress=True)

    vector_retriever_chunk = vector_index_chunk.as_retriever(similarity_top_k=5)
    global_variable.retriever_chunk = RecursiveRetriever(
        "vector",
        retriever_dict={"vector": vector_retriever_chunk},
        node_dict={node.node_id: node for node in documents},
        verbose=True,
    )
    print("Vector store creation done")
    global_variable.upload_in_progress = 1
    global_variable.query_engine_chunk = RetrieverQueryEngine.from_args(global_variable.retriever_chunk, llm=global_variable.llm, text_qa_template=global_variable.text_qa_template)

In this example, the define_rag function is properly defined as an asynchronous function, and the VectorStoreIndex is created using the await keyword to handle asynchronous operations. This approach ensures that the embeddings are created asynchronously and efficiently without causing runtime errors related to the event loop [1][2].
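
Note: VectorStoreIndex.from_documents() is a synchronous classmethod, and with use_async=True it ends up calling asyncio.run() internally (as the traceback above shows), so awaiting it from inside a running FastAPI event loop can still raise the same RuntimeError. One commonly used workaround, shown as a minimal sketch that assumes the nest_asyncio package is installed, is to allow nested event loops once at application startup:

import nest_asyncio

# Patch the running event loop so that llama-index's internal asyncio.run()
# call (triggered by use_async=True) is allowed to nest inside uvicorn's loop.
nest_asyncio.apply()

With the loop patched, the index can be built with the plain synchronous call, for example VectorStoreIndex.from_documents(documents, storage_context=storage_context, embed_model=global_variable.embed_model, use_async=True), without the await.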

To continue talking to Dosu, mention @dosu.

@rvssridatta
Author

I followed the same code suggested by @dosu and executed it, but I am still getting the same error.
Please advise.


dosubot bot commented Jul 4, 2024

The error persists because asyncio.run() is being called within an already running event loop. To fix this, ensure that all asynchronous operations are properly awaited within the running event loop.

Here's the corrected define_rag function:

async def define_rag():
    documents = SimpleDirectoryReader(input_dir="./docs", required_exts=[".docx", ".doc", ".pdf", ".txt"]).load_data()
    if os.path.exists('./chroma_db'):
        print("*************************utilizing pre generated embeddings from chromadb folder")
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("quickstart")
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        vector_index_chunk = VectorStoreIndex.from_vector_store(vector_store, embed_model=global_variable.embed_model)
    else:
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("quickstart")
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        vector_index_chunk = await VectorStoreIndex.from_documents(documents, embed_model=global_variable.embed_model, storage_context=storage_context, use_async=True, show_progress=True)

    vector_retriever_chunk = vector_index_chunk.as_retriever(similarity_top_k=5)
    global_variable.retriever_chunk = RecursiveRetriever(
        "vector",
        retriever_dict={"vector": vector_retriever_chunk},
        node_dict={node.node_id: node for node in documents},
        verbose=True,
    )
    print("Vector store creation done")
    global_variable.upload_in_progress = 1
    global_variable.query_engine_chunk = RetrieverQueryEngine.from_args(global_variable.retriever_chunk, llm=global_variable.llm, text_qa_template=global_variable.text_qa_template)

This modification ensures that the asynchronous tasks are awaited properly within the running event loop, avoiding the RuntimeError you encountered [1].
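
An alternative that avoids nested event loops entirely, sketched below on the assumption that it is acceptable for a worker thread to block while the index is built, is to drop use_async=True inside define_rag and run the synchronous build in a thread:

# Hypothetical replacement for the index construction in the else branch of define_rag:
# the blocking call runs in a worker thread, so uvicorn's event loop stays free and
# no nested asyncio.run() call is made.
vector_index_chunk = await asyncio.to_thread(
    VectorStoreIndex.from_documents,
    documents,
    embed_model=global_variable.embed_model,
    storage_context=storage_context,
    show_progress=True,
)

asyncio.to_thread() requires Python 3.9 or newer; the traceback above shows Python 3.11, so it is available here.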

To continue talking to Dosu, mention @dosu.
