Skip to content

Commit

Permalink
fix: PR comments & summary
Browse files Browse the repository at this point in the history
  • Loading branch information
chloedia committed Jun 21, 2024
1 parent eceaee3 commit eee9332
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 99 deletions.
43 changes: 16 additions & 27 deletions backend/celery_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,6 @@ def process_assistant_task(
logger.debug(f"Input: {input}")
logger.debug(type(input))
_input = InputAssistant.model_validate(input_in)
# _input = InputAssistant(**json.loads(input)) # type: ignore
# _input = InputAssistant(json.dumps(_input))
_current_user = UserIdentity(**current_user) # type: ignore
try:
files = []
Expand All @@ -333,25 +331,14 @@ def process_assistant_task(
base_file_name = os.path.basename(file_name)
_, file_extension = os.path.splitext(base_file_name)

with NamedTemporaryFile(suffix="_" + tmp_name, delete=False) as tmp_file:
res = supabase_client.storage.from_("quivr").download(file_name)
tmp_file.write(res)
tmp_file.flush()

file_instance = File(
file_name=base_file_name,
tmp_file_path=tmp_file.name,
bytes_content=res,
file_size=len(res),
file_extension=file_extension,
)
upload_file = UploadFile(
filename=file_instance.file_name,
size=file_instance.file_size,
file=BytesIO(file_instance.bytes_content),
headers='{"content-type": "application/pdf"}', # type : ignore
)
files.append(upload_file)
res = supabase_client.storage.from_("quivr").download(file_name)
upload_file = UploadFile(
filename=base_file_name,
size=len(res),
file=BytesIO(res),
headers='{"content-type": "application/pdf"}', # type : ignore
)
files.append(upload_file)

except Exception as e:
logger.exception(e)
Expand All @@ -363,12 +350,14 @@ def process_assistant_task(
description=f"An error occurred while processing the file: {e}",
),
)
return
raise e
loop = asyncio.get_event_loop()

asyncio.set_event_loop(asyncio.new_event_loop())
# asyncio.set_event_loop(asyncio.new_event_loop())

if _input.name.lower() == "summary":
logger.debug(f"N Files given : {len(files)}")
logger.debug(f"Input given : {input_in}")
summary_assistant = SummaryAssistant(
input=_input, files=files, current_user=_current_user
)
Expand Down Expand Up @@ -431,8 +420,8 @@ def process_assistant_task(
"task": "check_if_is_premium_user",
"schedule": crontab(minute="*/1", hour="*"),
},
"process_assistant": {
"task": "process_assistant_task",
"schedule": crontab(minute="*/1", hour="*"),
},
# "process_assistant": {
# "task": "process_assistant_task",
# "schedule": crontab(minute="*/1", hour="*"),
# },
}
3 changes: 1 addition & 2 deletions backend/modules/assistant/dto/inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,4 @@ class InputAssistant(BaseModel):
@model_validator(mode="before")
@classmethod
def to_py_dict(cls, data):
    """Normalize the incoming payload before field validation.

    Celery hands the assistant input over as a JSON string, while direct
    API callers pass an already-parsed mapping. Decode only in the string
    case: calling ``json.loads`` on a dict raises ``TypeError``, so the
    guard keeps both entry points working.

    Args:
        data: Raw validator input — a JSON string or a dict-like payload.

    Returns:
        A Python mapping suitable for pydantic field validation.
    """
    if isinstance(data, str):
        return json.loads(data)
    return data
81 changes: 35 additions & 46 deletions backend/modules/assistant/ito/difference/difference_agent.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import time

import pandas as pd
from pydantic import BaseModel, Field
Expand Down Expand Up @@ -55,7 +56,34 @@ def generate_questions(
target_content=target_content, language_verification=language_verification
)

async def query_one(self, question: str, n_retry: int = 3, verbose: bool = False):
    """Ask one generated question against the reference query engine.

    Retries failed queries up to ``n_retry`` times with exponential
    backoff. On success the source nodes the model said it used are fed
    back into the query engine before returning.

    Args:
        question: Question text; the trailing character (delimiter) is
            stripped before querying.
        n_retry: Maximum number of attempts on exceptions.
        verbose: When True, print per-failure diagnostics.

    Returns:
        Dict with ``decision`` (enum value or None), ``name`` and
        ``detailed_answer`` — all None when every attempt failed.
    """
    response = ResponseType(name=None, detailed_answer=None, decision=None)
    retry_count = 0
    while retry_count < n_retry and response.name is None:
        try:
            response: ResponseType = await self.diff_query_engine.query_engine.aquery(question[:-1])  # type: ignore
            nodes_to_update = [response.source_nodes[int(i)] for i in response.response.used_chunks]  # type: ignore
            self.diff_query_engine.update_query_engine(nodes_to_update)
        except Exception:
            # Exponential backoff. Must be awaited, not time.sleep():
            # blocking here stalls the event loop while the other
            # questions of the asyncio.gather batch are in flight.
            await asyncio.sleep(2**retry_count)
            retry_count += 1
            if verbose:
                print(f"Error with question: {question}")
                print("Retry ...")
            if retry_count == n_retry:
                print("n_retry reached, skipping question ...")
        else:
            # A successful round trip ends the loop even if the model
            # produced no name: retry_count only advances on exceptions,
            # so looping again here would never terminate.
            break

    return {
        "decision": (
            response.decision.value if response.decision else response.decision
        ),
        "name": response.name,
        "detailed_answer": response.detailed_answer,
    }

async def run(
self,
target_content: str | None = None,
language_verification: bool = False,
Expand All @@ -76,51 +104,12 @@ def run(
print("Querying generated questions to the reference document...")
analysis = []

async def query_all(questions):
return await asyncio.gather(*[query_one(question) for question in questions]) # type: ignore

async def query_one(question):
response = ResponseType(name=None, detailed_answer=None, decision=None)

for i in range(n_retry):
try:
response: ResponseType = await self.diff_query_engine.query_engine.aquery(question[:-1]) # type: ignore
nodes_to_update = [response.source_nodes[int(i)] for i in response.response.used_chunks] # type: ignore
self.diff_query_engine.update_query_engine(nodes_to_update)

except Exception as e:
print(e)
if verbose:
print(f"Error with question: {question}")
print("Retry ...")
if i == 2:
print(
f"{n_retry} repeted errors with the same question, deleting the question."
)
break
continue
break
return {
"decision": (
response.decision.value if response.decision else response.decision
),
"name": response.name,
"detailed_answer": response.detailed_answer,
}

try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
analysis = loop.run_until_complete(query_all(self.questions))
self.generated_df = pd.DataFrame(analysis)
return self.generated_df
finally:
if loop.is_running():
loop.close()
async def query_all(questions: list[str] | None, n_retry: int = 3):
return await asyncio.gather(*[self.query_one(question, n_retry=n_retry) for question in questions]) # type: ignore

analysis = await query_all(self.questions, n_retry=n_retry)
self.generated_df = pd.DataFrame(analysis)
return self.generated_df

def get_detail(self, category, iteration, name):
return self.generated_df.loc[
Expand Down
4 changes: 0 additions & 4 deletions backend/modules/assistant/ito/difference/query_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from enum import Enum
from typing import List, Optional

import nest_asyncio
import uvloop
from langchain_core.pydantic_v1 import BaseModel, Field
from llama_index.core import Document, QueryBundle, VectorStoreIndex
Expand Down Expand Up @@ -33,7 +32,6 @@ class Ingredient(BaseModel):
used_chunks: list[int] = Field(
description="List of chunks id used to generate the answer, None if the response was not found in the chunks. - focus on those with high information density that directly answer the query. Never select a table of content or any text similar."
)
# more_info: str = Field(description="More information on how the chunks (with ID) were used to generate the answer, start with look into chunk id to ..., then look into chunk id ..., keep it short")


class AddChunkId(BaseNodePostprocessor):
Expand Down Expand Up @@ -79,8 +77,6 @@ def __init__(self, source_md: List[str], top_k: int = 15):

def get_query_engine(self, source_md: List[str]) -> None:
"""Get a query engine for markdown files."""
if not isinstance(asyncio.get_event_loop(), uvloop.Loop):
nest_asyncio.apply()

md_document = [Document(text=sub_source_md) for sub_source_md in source_md]
node_parser = MarkdownNodeParser()
Expand Down
48 changes: 36 additions & 12 deletions backend/modules/assistant/ito/difference_assistant.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
import os
import tempfile
import traceback
from concurrent.futures import ThreadPoolExecutor
from typing import List

import nest_asyncio
import httpx
import pandas as pd
import uvloop
from fastapi import UploadFile
from logger import get_logger
from megaparse.Converter import MegaParse
Expand All @@ -21,16 +21,31 @@
)
from modules.assistant.ito.difference.difference_agent import ContextType
from modules.assistant.ito.ito import ITO
from modules.upload.service import upload_file

# from modules.upload.controller.upload_routes import upload_file #FIXME circular import
from modules.user.entity.user_identity import UserIdentity

from .difference import DifferenceAgent, DiffQueryEngine

if not isinstance(asyncio.get_event_loop(), uvloop.Loop):
nest_asyncio.apply()
logger = get_logger(__name__)


logger = get_logger(__name__)
# FIXME: PATCHER -> find another solution
# FIXME: PATCHER -> find another solution
async def upload_file_to_api(upload_file, brain_id, current_user):
    """Upload a generated file to the local Quivr upload endpoint.

    Sends the file as multipart/form-data: an ``UploadFile`` object is not
    JSON-serializable, so it cannot travel inside a ``json=`` body, and
    httpx must generate the multipart boundary Content-Type itself rather
    than a hand-set ``application/json`` header.

    Args:
        upload_file: fastapi ``UploadFile``-like object (``filename``,
            ``file``, ``content_type`` attributes).
        brain_id: Identifier of the brain the file is attached to.
        current_user: Identity carrying the bearer ``token`` used for auth.

    Returns:
        The decoded JSON body of the upload endpoint's response.

    Raises:
        httpx.HTTPStatusError: On a 4xx/5xx response.
    """
    url = "http://localhost:5050/upload"
    # Only auth here - httpx fills in the multipart Content-Type + boundary.
    headers = {"Authorization": f"Bearer {current_user.token}"}
    files = {
        "uploadFile": (
            upload_file.filename,
            upload_file.file,
            upload_file.content_type or "application/octet-stream",
        )
    }
    # chat_id is omitted: it was None before, and form fields cannot carry
    # a JSON null - the endpoint treats a missing chat_id as "no chat".
    data = {"brain_id": str(brain_id)}
    async with httpx.AsyncClient() as client:
        response = await client.post(url, headers=headers, data=data, files=files)
        response.raise_for_status()  # Raise an error for 4xx/5xx responses
        return response.json()


class DifferenceAssistant(ITO):
Expand Down Expand Up @@ -117,13 +132,23 @@ async def process_assistant(self):
print(target_doc_name, "\n")
print(ref_doc_name, "\n")

def run_megaparse():
    """Convert the reference document to markdown with MegaParse.

    Zero-argument closure over ``document_1_tmp`` so the caller can hand
    it to ``loop.run_in_executor`` and keep the event loop responsive
    while the (presumably blocking) conversion runs — TODO confirm
    MegaParse.convert is indeed synchronous/blocking.
    """
    megaparse = MegaParse(
        file_path=document_1_tmp.name,  # temp copy of the reference doc from the enclosing scope
        llama_parse_api_key=os.getenv("LLAMA_CLOUD_API_KEY"),
    )
    # gpt4o_cleaner=True: post-process the parse with the cleaner option;
    # NOTE(review): semantics/cost of this flag come from MegaParse — verify.
    return megaparse.convert(gpt4o_cleaner=True)

# breakpoint()
try:
megaparse = MegaParse(
file_path=document_1_tmp.name,
llama_parse_api_key=os.getenv("LLAMA_CLOUD_API_KEY"),
)
ref_doc_md = megaparse.convert(gpt4o_cleaner=True)
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
ref_doc_md = await loop.run_in_executor(executor, run_megaparse) # type: ignore
# ref_doc_md = megaparse.convert(gpt4o_cleaner=True)
except Exception as e:
print(e)
print(traceback.format_exc())
Expand Down Expand Up @@ -164,7 +189,7 @@ async def process_assistant(self):

diff_df = pd.DataFrame()

diff_df = agent.run(additional_context=additional_context)
diff_df = await agent.run(additional_context=additional_context)

diff_df = diff_df.dropna()
print("\nNice, the process has succeeded, giving back the json ;)\n")
Expand All @@ -186,7 +211,7 @@ async def process_assistant(self):
await self.send_output_by_email(
file_to_upload,
new_filename,
"Summary",
"Difference",
f"{file_description} of {original_filename}",
brain_id=(
self.input.outputs.brain.value
Expand All @@ -201,11 +226,10 @@ async def process_assistant(self):

# Upload the file if required
if self.input.outputs.brain.activated:
await upload_file(
uploadFile=file_to_upload,
response = await upload_file_to_api(
upload_file=file_to_upload,
brain_id=self.input.outputs.brain.value,
current_user=self.current_user,
chat_id=None,
)

return {"message": f"{file_description} generated successfully"}
Expand Down
25 changes: 21 additions & 4 deletions backend/modules/assistant/ito/summary.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import tempfile
from typing import List

import httpx
from fastapi import UploadFile
from langchain.chains import (
MapReduceDocumentsChain,
Expand All @@ -23,12 +24,29 @@
Outputs,
)
from modules.assistant.ito.ito import ITO
from modules.upload.service import upload_file
from modules.user.entity.user_identity import UserIdentity

logger = get_logger(__name__)


# FIXME: PATCHER -> find another solution
# FIXME: PATCHER -> find another solution
async def upload_file_to_api(upload_file, brain_id, current_user):
    """Upload the generated summary file to the local Quivr upload endpoint.

    Sends the file as multipart/form-data: an ``UploadFile`` object is not
    JSON-serializable, so it cannot travel inside a ``json=`` body, and
    httpx must generate the multipart boundary Content-Type itself rather
    than a hand-set ``application/json`` header.

    Args:
        upload_file: fastapi ``UploadFile``-like object (``filename``,
            ``file``, ``content_type`` attributes).
        brain_id: Identifier of the brain the file is attached to.
        current_user: Identity carrying the bearer ``token`` used for auth.

    Returns:
        The decoded JSON body of the upload endpoint's response.

    Raises:
        httpx.HTTPStatusError: On a 4xx/5xx response.
    """
    url = "http://localhost:5050/upload"
    # Only auth here - httpx fills in the multipart Content-Type + boundary.
    headers = {"Authorization": f"Bearer {current_user.token}"}
    files = {
        "uploadFile": (
            upload_file.filename,
            upload_file.file,
            upload_file.content_type or "application/octet-stream",
        )
    }
    # chat_id is omitted: it was None before, and form fields cannot carry
    # a JSON null - the endpoint treats a missing chat_id as "no chat".
    data = {"brain_id": str(brain_id)}
    async with httpx.AsyncClient() as client:
        response = await client.post(url, headers=headers, data=data, files=files)
        response.raise_for_status()  # Raise an error for 4xx/5xx responses
        return response.json()


class SummaryAssistant(ITO):

def __init__(
Expand Down Expand Up @@ -188,11 +206,10 @@ async def process_assistant(self):

# Upload the file if required
if self.input.outputs.brain.activated:
await upload_file(
uploadFile=file_to_upload,
response = await upload_file_to_api(
upload_file=file_to_upload,
brain_id=self.input.outputs.brain.value,
current_user=self.current_user,
chat_id=None,
)

return {"message": f"{file_description} generated successfully"}
Expand Down
4 changes: 0 additions & 4 deletions backend/packages/files/parsers/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import tempfile
import time

import nest_asyncio
import tiktoken
import uvloop
from langchain.schema import Document
Expand All @@ -15,9 +14,6 @@
from modules.upload.service.upload_file import DocumentSerializable
from packages.embeddings.vectors import Neurons

if not isinstance(asyncio.get_event_loop(), uvloop.Loop):
nest_asyncio.apply()

logger = get_logger(__name__)


Expand Down

0 comments on commit eee9332

Please sign in to comment.