diff --git a/backend/celery_worker.py b/backend/celery_worker.py
index c200c24e1fee..eb81571904f0 100644
--- a/backend/celery_worker.py
+++ b/backend/celery_worker.py
@@ -321,8 +321,6 @@ def process_assistant_task(
     logger.debug(f"Input: {input}")
     logger.debug(type(input))
     _input = InputAssistant.model_validate(input_in)
-    # _input = InputAssistant(**json.loads(input))  # type: ignore
-    # _input = InputAssistant(json.dumps(_input))
     _current_user = UserIdentity(**current_user)  # type: ignore
     try:
         files = []
@@ -333,25 +331,14 @@ def process_assistant_task(
             base_file_name = os.path.basename(file_name)
             _, file_extension = os.path.splitext(base_file_name)

-            with NamedTemporaryFile(suffix="_" + tmp_name, delete=False) as tmp_file:
-                res = supabase_client.storage.from_("quivr").download(file_name)
-                tmp_file.write(res)
-                tmp_file.flush()
-
-                file_instance = File(
-                    file_name=base_file_name,
-                    tmp_file_path=tmp_file.name,
-                    bytes_content=res,
-                    file_size=len(res),
-                    file_extension=file_extension,
-                )
-                upload_file = UploadFile(
-                    filename=file_instance.file_name,
-                    size=file_instance.file_size,
-                    file=BytesIO(file_instance.bytes_content),
-                    headers='{"content-type": "application/pdf"}',  # type : ignore
-                )
-                files.append(upload_file)
+            res = supabase_client.storage.from_("quivr").download(file_name)
+            upload_file = UploadFile(
+                filename=base_file_name,
+                size=len(res),
+                file=BytesIO(res),
+                headers='{"content-type": "application/pdf"}',  # type : ignore
+            )
+            files.append(upload_file)

     except Exception as e:
         logger.exception(e)
@@ -363,12 +350,14 @@ def process_assistant_task(
                 description=f"An error occurred while processing the file: {e}",
             ),
         )
-        return
+        raise e

     loop = asyncio.get_event_loop()
-    asyncio.set_event_loop(asyncio.new_event_loop())
+    # asyncio.set_event_loop(asyncio.new_event_loop())

     if _input.name.lower() == "summary":
+        logger.debug(f"N Files given : {len(files)}")
+        logger.debug(f"Input given : {input_in}")
         summary_assistant = SummaryAssistant(
             input=_input, files=files, current_user=_current_user
         )
@@ -431,8 +420,8 @@
         "task": "check_if_is_premium_user",
         "schedule": crontab(minute="*/1", hour="*"),
     },
-    "process_assistant": {
-        "task": "process_assistant_task",
-        "schedule": crontab(minute="*/1", hour="*"),
-    },
+    # "process_assistant": {
+    #     "task": "process_assistant_task",
+    #     "schedule": crontab(minute="*/1", hour="*"),
+    # },
 }
diff --git a/backend/modules/assistant/dto/inputs.py b/backend/modules/assistant/dto/inputs.py
index f75c86c5528c..631f3e4fee74 100644
--- a/backend/modules/assistant/dto/inputs.py
+++ b/backend/modules/assistant/dto/inputs.py
@@ -79,5 +79,4 @@ class InputAssistant(BaseModel):
     @model_validator(mode="before")
     @classmethod
     def to_py_dict(cls, data):
-        if isinstance(data, str):
-            return json.loads(data)
+        return json.loads(data)
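Side note on the `inputs.py` hunk: a `model_validator(mode="before")` runs on the raw payload before field validation, which is how a JSON string coming out of Celery gets decoded into a dict. A minimal standalone sketch of the pattern, assuming (as the new code does) that the input is always a JSON-encoded string; `TaskInput` is a hypothetical stand-in for `InputAssistant`:

```python
import json

from pydantic import BaseModel, model_validator


class TaskInput(BaseModel):  # hypothetical stand-in for InputAssistant
    name: str

    @model_validator(mode="before")
    @classmethod
    def to_py_dict(cls, data):
        # Celery hands the task argument over as a JSON string, so decode it
        # before Pydantic validates the individual fields.
        return json.loads(data)


print(TaskInput.model_validate('{"name": "summary"}').name)  # -> summary
```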
diff --git a/backend/modules/assistant/ito/difference/difference_agent.py b/backend/modules/assistant/ito/difference/difference_agent.py
index 12f413595970..3fde7a7301f3 100644
--- a/backend/modules/assistant/ito/difference/difference_agent.py
+++ b/backend/modules/assistant/ito/difference/difference_agent.py
@@ -1,4 +1,5 @@
 import asyncio
+import time

 import pandas as pd
 from pydantic import BaseModel, Field
@@ -55,7 +56,34 @@ def generate_questions(
             target_content=target_content, language_verification=language_verification
         )

-    def run(
+    async def query_one(self, question: str, n_retry: int = 3, verbose: bool = False):
+        response = ResponseType(name=None, detailed_answer=None, decision=None)
+
+        retry_count = 0
+        while retry_count < n_retry and response.name is None:
+            try:
+                response: ResponseType = await self.diff_query_engine.query_engine.aquery(question[:-1])  # type: ignore
+                nodes_to_update = [response.source_nodes[int(i)] for i in response.response.used_chunks]  # type: ignore
+                self.diff_query_engine.update_query_engine(nodes_to_update)
+
+            except Exception as e:
+                # Exponential backoff
+                time.sleep(2**retry_count)
+                retry_count += 1
+                if verbose:
+                    print(f"Error with question: {question}")
+                    print("Retry ...")
+                if retry_count == n_retry:
+                    print("n_retry reached, skipping question ...")
+
+        return {
+            "decision": (
+                response.decision.value if response.decision else response.decision
+            ),
+            "name": response.name,
+            "detailed_answer": response.detailed_answer,
+        }
+
+    async def run(
         self,
         target_content: str | None = None,
         language_verification: bool = False,
@@ -76,51 +104,12 @@ def run(
         print("Querying generated questions to the reference document...")
         analysis = []

-        async def query_all(questions):
-            return await asyncio.gather(*[query_one(question) for question in questions])  # type: ignore
-
-        async def query_one(question):
-            response = ResponseType(name=None, detailed_answer=None, decision=None)
-
-            for i in range(n_retry):
-                try:
-                    response: ResponseType = await self.diff_query_engine.query_engine.aquery(question[:-1])  # type: ignore
-                    nodes_to_update = [response.source_nodes[int(i)] for i in response.response.used_chunks]  # type: ignore
-                    self.diff_query_engine.update_query_engine(nodes_to_update)
-
-                except Exception as e:
-                    print(e)
-                    if verbose:
-                        print(f"Error with question: {question}")
-                        print("Retry ...")
-                    if i == 2:
-                        print(
-                            f"{n_retry} repeted errors with the same question, deleting the question."
-                        )
-                        break
-                    continue
-                break
-            return {
-                "decision": (
-                    response.decision.value if response.decision else response.decision
-                ),
-                "name": response.name,
-                "detailed_answer": response.detailed_answer,
-            }
-
-        try:
-            loop = asyncio.get_event_loop()
-        except RuntimeError:
-            loop = asyncio.new_event_loop()
-            asyncio.set_event_loop(loop)
-
-        try:
-            analysis = loop.run_until_complete(query_all(self.questions))
-            self.generated_df = pd.DataFrame(analysis)
-            return self.generated_df
-        finally:
-            if loop.is_running():
-                loop.close()
+        async def query_all(questions: list[str] | None, n_retry: int = 3):
+            return await asyncio.gather(*[self.query_one(question, n_retry=n_retry) for question in questions])  # type: ignore
+
+        analysis = await query_all(self.questions, n_retry=n_retry)
+        self.generated_df = pd.DataFrame(analysis)
+        return self.generated_df

     def get_detail(self, category, iteration, name):
         return self.generated_df.loc[
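The new `query_one` above is a retry loop with exponential backoff around the LlamaIndex `aquery` call, and `run` then fans the questions out with `asyncio.gather`. A self-contained sketch of the backoff idea; `flaky_call` is a hypothetical stand-in for `aquery`, and the sketch uses `asyncio.sleep` instead of `time.sleep` so the wait does not block the event loop:

```python
import asyncio
import random


async def flaky_call() -> str:
    # Hypothetical stand-in for the real aquery call.
    if random.random() < 0.5:
        raise RuntimeError("transient failure")
    return "ok"


async def query_with_backoff(n_retry: int = 3) -> str | None:
    for attempt in range(n_retry):
        try:
            return await flaky_call()
        except RuntimeError:
            # Back off 1s, 2s, 4s, ... before the next attempt.
            await asyncio.sleep(2**attempt)
    return None  # n_retry reached, give up on this question


print(asyncio.run(query_with_backoff()))
```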
diff --git a/backend/modules/assistant/ito/difference/query_engine.py b/backend/modules/assistant/ito/difference/query_engine.py
index f9a2d342b600..148f4dfc327d 100644
--- a/backend/modules/assistant/ito/difference/query_engine.py
+++ b/backend/modules/assistant/ito/difference/query_engine.py
@@ -5,7 +5,6 @@
 from enum import Enum
 from typing import List, Optional

-import nest_asyncio
 import uvloop
 from langchain_core.pydantic_v1 import BaseModel, Field
 from llama_index.core import Document, QueryBundle, VectorStoreIndex
@@ -33,7 +32,6 @@ class Ingredient(BaseModel):
     used_chunks: list[int] = Field(
         description="List of chunks id used to generate the answer, None if the response was not found in the chunks. - focus on those with high information density that directly answer the query. Never select a table of content or any text similar."
     )
-    # more_info: str = Field(description="More information on how the chunks (with ID) were used to generate the answer, start with look into chunk id to ..., then look into chunk id ..., keep it short")


 class AddChunkId(BaseNodePostprocessor):
@@ -79,8 +77,6 @@ def __init__(self, source_md: List[str], top_k: int = 15):

     def get_query_engine(self, source_md: List[str]) -> None:
         """Get a query engine for markdown files."""
-        if not isinstance(asyncio.get_event_loop(), uvloop.Loop):
-            nest_asyncio.apply()
         md_document = [Document(text=sub_source_md) for sub_source_md in source_md]

         node_parser = MarkdownNodeParser()
diff --git a/backend/modules/assistant/ito/difference_assistant.py b/backend/modules/assistant/ito/difference_assistant.py
index ab34b8f72db9..b22d389e34f5 100644
--- a/backend/modules/assistant/ito/difference_assistant.py
+++ b/backend/modules/assistant/ito/difference_assistant.py
@@ -2,11 +2,11 @@
 import os
 import tempfile
 import traceback
+from concurrent.futures import ThreadPoolExecutor
 from typing import List

-import nest_asyncio
+import httpx
 import pandas as pd
-import uvloop
 from fastapi import UploadFile
 from logger import get_logger
 from megaparse.Converter import MegaParse
@@ -21,16 +21,31 @@
 )
 from modules.assistant.ito.difference.difference_agent import ContextType
 from modules.assistant.ito.ito import ITO
-from modules.upload.service import upload_file
+
+# from modules.upload.controller.upload_routes import upload_file  # FIXME circular import
 from modules.user.entity.user_identity import UserIdentity

 from .difference import DifferenceAgent, DiffQueryEngine

-if not isinstance(asyncio.get_event_loop(), uvloop.Loop):
-    nest_asyncio.apply()
+logger = get_logger(__name__)

-logger = get_logger(__name__)

+# FIXME: PATCHER -> find another solution
+async def upload_file_to_api(upload_file, brain_id, current_user):
+    url = "http://localhost:5050/upload"
+    headers = {
+        "Authorization": f"Bearer {current_user.token}",
+        "Content-Type": "application/json",
+    }
+    data = {
+        "uploadFile": upload_file,
+        "brain_id": brain_id,
+        "chat_id": None,
+    }
+    async with httpx.AsyncClient() as client:
+        response = await client.post(url, headers=headers, json=data)
+        response.raise_for_status()  # Raise an error for 4xx/5xx responses
+        return response.json()


 class DifferenceAssistant(ITO):
@@ -117,13 +132,23 @@ async def process_assistant(self):
         print(target_doc_name, "\n")
         print(ref_doc_name, "\n")

+        def run_megaparse():
+            megaparse = MegaParse(
+                file_path=document_1_tmp.name,
+                llama_parse_api_key=os.getenv("LLAMA_CLOUD_API_KEY"),
+            )
+            return megaparse.convert(gpt4o_cleaner=True)
+
         # breakpoint()
         try:
             megaparse = MegaParse(
                 file_path=document_1_tmp.name,
                 llama_parse_api_key=os.getenv("LLAMA_CLOUD_API_KEY"),
             )
-            ref_doc_md = megaparse.convert(gpt4o_cleaner=True)
+            loop = asyncio.get_event_loop()
+            executor = ThreadPoolExecutor(max_workers=1)
+            ref_doc_md = await loop.run_in_executor(executor, run_megaparse)  # type: ignore
+            # ref_doc_md = megaparse.convert(gpt4o_cleaner=True)
         except Exception as e:
             print(e)
             print(traceback.format_exc())
@@ -164,7 +189,7 @@

         diff_df = pd.DataFrame()

-        diff_df = agent.run(additional_context=additional_context)
+        diff_df = await agent.run(additional_context=additional_context)
         diff_df = diff_df.dropna()

         print("\nNice, the process has succeeded, giving back the json ;)\n")
@@ -186,7 +211,7 @@
             await self.send_output_by_email(
                 file_to_upload,
                 new_filename,
-                "Summary",
+                "Difference",
                 f"{file_description} of {original_filename}",
                 brain_id=(
                     self.input.outputs.brain.value
@@ -201,11 +226,10 @@

         # Upload the file if required
         if self.input.outputs.brain.activated:
-            await upload_file(
-                uploadFile=file_to_upload,
+            response = await upload_file_to_api(
+                upload_file=file_to_upload,
                 brain_id=self.input.outputs.brain.value,
                 current_user=self.current_user,
-                chat_id=None,
             )

         return {"message": f"{file_description} generated successfully"}
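The executor change above is the standard way to await a blocking, synchronous call (here the MegaParse conversion) from a coroutine without freezing the event loop. A generic sketch under that assumption; `blocking_convert` is a placeholder for `megaparse.convert`, and the sketch closes its executor with a `with` block, which the hunk above leaves to garbage collection:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_convert() -> str:
    # Placeholder for a slow synchronous call such as MegaParse.convert.
    time.sleep(1)
    return "converted markdown"


async def main() -> None:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as executor:
        # The coroutine suspends here; other tasks keep running while the
        # worker thread grinds through the blocking call.
        result = await loop.run_in_executor(executor, blocking_convert)
    print(result)


asyncio.run(main())
```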
f"{file_description} of {original_filename}", brain_id=( self.input.outputs.brain.value @@ -201,11 +226,10 @@ async def process_assistant(self): # Upload the file if required if self.input.outputs.brain.activated: - await upload_file( - uploadFile=file_to_upload, + response = await upload_file_to_api( + upload_file=file_to_upload, brain_id=self.input.outputs.brain.value, current_user=self.current_user, - chat_id=None, ) return {"message": f"{file_description} generated successfully"} diff --git a/backend/modules/assistant/ito/summary.py b/backend/modules/assistant/ito/summary.py index e83358eff12f..0e67a9f5b6b0 100644 --- a/backend/modules/assistant/ito/summary.py +++ b/backend/modules/assistant/ito/summary.py @@ -1,6 +1,7 @@ import tempfile from typing import List +import httpx from fastapi import UploadFile from langchain.chains import ( MapReduceDocumentsChain, @@ -23,12 +24,29 @@ Outputs, ) from modules.assistant.ito.ito import ITO -from modules.upload.service import upload_file from modules.user.entity.user_identity import UserIdentity logger = get_logger(__name__) +# FIXME: PATCHER -> find another solution +async def upload_file_to_api(upload_file, brain_id, current_user): + url = "http://localhost:5050/upload" + headers = { + "Authorization": f"Bearer {current_user.token}", + "Content-Type": "application/json", + } + data = { + "uploadFile": upload_file, + "brain_id": brain_id, + "chat_id": None, + } + async with httpx.AsyncClient() as client: + response = await client.post(url, headers=headers, json=data) + response.raise_for_status() # Raise an error for 4xx/5xx responses + return response.json() + + class SummaryAssistant(ITO): def __init__( @@ -188,11 +206,10 @@ async def process_assistant(self): # Upload the file if required if self.input.outputs.brain.activated: - await upload_file( - uploadFile=file_to_upload, + response = await upload_file_to_api( + upload_file=file_to_upload, brain_id=self.input.outputs.brain.value, current_user=self.current_user, - chat_id=None, ) return {"message": f"{file_description} generated successfully"} diff --git a/backend/packages/files/parsers/common.py b/backend/packages/files/parsers/common.py index 6b56aeb30a65..2a642bb96ddb 100644 --- a/backend/packages/files/parsers/common.py +++ b/backend/packages/files/parsers/common.py @@ -3,7 +3,6 @@ import tempfile import time -import nest_asyncio import tiktoken import uvloop from langchain.schema import Document @@ -15,9 +14,6 @@ from modules.upload.service.upload_file import DocumentSerializable from packages.embeddings.vectors import Neurons -if not isinstance(asyncio.get_event_loop(), uvloop.Loop): - nest_asyncio.apply() - logger = get_logger(__name__)