Feat/summary-async-celery #2561

Draft · wants to merge 7 commits into main
4 changes: 2 additions & 2 deletions backend/celery_config.py
@@ -8,6 +8,7 @@

celery = Celery(__name__)


if CELERY_BROKER_URL.startswith("sqs"):
broker_transport_options = {
CELERY_BROKER_QUEUE_NAME: {
@@ -37,5 +38,4 @@
else:
raise ValueError(f"Unsupported broker URL: {CELERY_BROKER_URL}")


celery.autodiscover_tasks(["modules.sync", "modules", "middlewares", "packages"])
celery.autodiscover_tasks(["modules.sync", "modules","modules.assistant.ito", "middlewares", "packages"])
89 changes: 59 additions & 30 deletions backend/modules/assistant/ito/ito.py
@@ -13,6 +13,9 @@
from modules.assistant.ito.utils.pdf_generator import PDFGenerator, PDFModel
from modules.chat.controller.chat.utils import update_user_usage
from modules.contact_support.controller.settings import ContactsSettings
from modules.notification.dto.inputs import NotificationUpdatableProperties
from modules.notification.entity.notification import NotificationsStatusEnum
from modules.notification.service.notification_service import NotificationService
from modules.upload.controller.upload_routes import upload_file
from modules.user.entity.user_identity import UserIdentity
from modules.user.service.user_usage import UserUsage
@@ -22,6 +25,8 @@

logger = get_logger(__name__)

notification_service = NotificationService()


class ITO(BaseModel):
input: InputAssistant
@@ -62,31 +67,36 @@ def increase_usage_user(self):
def calculate_pricing(self):
return 20

def generate_pdf(self, filename: str, title: str, content: str):
pdf_model = PDFModel(title=title, content=content)
pdf = PDFGenerator(pdf_model)
pdf.print_pdf()
pdf.output(filename, "F")

@abstractmethod
async def process_assistant(self):
pass


async def uploadfile_to_file(uploadFile: UploadFile):
# Transform the UploadFile object into a file object with the same name and content
tmp_file = NamedTemporaryFile(delete=False)
tmp_file.write(uploadFile.file.read())
tmp_file.flush() # Make sure all data is written to disk
return tmp_file
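
Note that `NamedTemporaryFile(delete=False)` leaves the file on disk, so callers of `uploadfile_to_file` are responsible for removing it (as `create_and_upload_processed_file` does with `os.remove`). A usage sketch, with the helper name below invented for illustration:

```python
import os

from fastapi import UploadFile


async def read_and_discard(upload: UploadFile) -> bytes:
    # delete=False means the temp file outlives the handle and must be
    # removed explicitly once the bytes have been consumed.
    tmp_file = await uploadfile_to_file(upload)
    try:
        with open(tmp_file.name, "rb") as f:
            return f.read()
    finally:
        os.remove(tmp_file.name)
```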


class OutputHandler(BaseModel):
async def send_output_by_email(
self,
file: UploadFile,
filename: str,
file: UploadFile,
task_name: str,
custom_message: str,
brain_id: str = None,
user_email: str = None,
):
settings = ContactsSettings()
file = await self.uploadfile_to_file(file)
file = await uploadfile_to_file(file)
domain_quivr = os.getenv("QUIVR_DOMAIN", "https://chat.quivr.app/")

with open(file.name, "rb") as f:
mail_from = settings.resend_contact_sales_from
mail_to = self.current_user.email
mail_to = user_email
body = f"""
<div style="text-align: center;">
<img src="https://quivr-cms.s3.eu-west-3.amazonaws.com/logo_quivr_white_7e3c72620f.png" alt="Quivr Logo" style="width: 100px; height: 100px; border-radius: 50%; margin: 0 auto; display: block;">
@@ -116,20 +126,35 @@ async def send_output_by_email
"subject": "Quivr Ingestion Processed",
"reply_to": "[email protected]",
"html": body,
"attachments": [{"filename": filename, "content": list(f.read())}],
"attachments": [
{
"filename": filename,
"content": list(f.read()),
"type": "application/pdf",
}
],
}
logger.info(f"Sending email to {mail_to} with file {filename}")
send_email(params)

async def uploadfile_to_file(self, uploadFile: UploadFile):
# Transform the UploadFile object into a file object with the same name and content
tmp_file = NamedTemporaryFile(delete=False)
tmp_file.write(uploadFile.file.read())
tmp_file.flush() # Make sure all data is written to disk
return tmp_file
def generate_pdf(self, filename: str, title: str, content: str):
pdf_model = PDFModel(title=title, content=content)
pdf = PDFGenerator(pdf_model)
pdf.print_pdf()
pdf.output(filename, "F")
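
`generate_pdf` moves from `ITO` onto `OutputHandler` so the Celery worker can render the summary without an assistant instance. A rough call-site sketch, with the filename, title, and content invented here:

```python
# Illustrative call only -- the arguments are made up. PDFGenerator
# wraps the PDF model, and output(filename, "F") writes the rendered
# document to disk (FPDF-style output semantics).
handler = OutputHandler()
handler.generate_pdf(
    filename="summary_report.pdf",
    title="Summary",
    content="**Summary**\n\nMain themes and key points go here.",
)
```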

async def create_and_upload_processed_file(
self, processed_content: str, original_filename: str, file_description: str
self,
processed_content: str,
original_filename: str,
file_description: str,
content: str,
task_name: str,
custom_message: str,
brain_id: str = None,
email_activated: bool = False,
current_user: UserIdentity = None,
notification_id: str = None,
) -> dict:
"""Handles creation and uploading of the processed file."""
# remove any special characters from the filename that aren't http safe
@@ -164,32 +189,36 @@ async def create_and_upload_processed_file
headers={"content-type": "application/pdf"},
)

if self.input.outputs.email.activated:
logger.info(f"current_user: {current_user}")
if email_activated:
await self.send_output_by_email(
file_to_upload,
new_filename,
file_to_upload,
"Summary",
f"{file_description} of {original_filename}",
brain_id=(
self.input.outputs.brain.value
if (
self.input.outputs.brain.activated
and self.input.outputs.brain.value
)
else None
),
brain_id=brain_id,
user_email=current_user["email"],
)

# Reset to start of file before upload
file_to_upload.file.seek(0)
if self.input.outputs.brain.activated:
UserIdentity(**current_user)
if brain_id:
await upload_file(
uploadFile=file_to_upload,
brain_id=self.input.outputs.brain.value,
current_user=self.current_user,
brain_id=brain_id,
current_user=current_user,
chat_id=None,
)

os.remove(new_filename)

notification_service.update_notification_by_id(
notification_id,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.SUCCESS,
description=f"Summary of {original_filename} generated successfully",
),
)

return {"message": f"{file_description} generated successfully"}
190 changes: 108 additions & 82 deletions backend/modules/assistant/ito/summary.py
@@ -1,6 +1,7 @@
import tempfile
from typing import List

from celery_config import celery
from fastapi import UploadFile
from langchain.chains import (
MapReduceDocumentsChain,
@@ -23,9 +24,12 @@
Outputs,
)
from modules.assistant.ito.ito import ITO
from modules.notification.dto.inputs import CreateNotification
from modules.notification.service.notification_service import NotificationService
from modules.user.entity.user_identity import UserIdentity

logger = get_logger(__name__)
notification_service = NotificationService()


class SummaryAssistant(ITO):
@@ -69,97 +73,119 @@ def check_input(self):
return True

async def process_assistant(self):

try:
self.increase_usage_user()
except Exception as e:
logger.error(f"Error increasing usage: {e}")
return {"error": str(e)}

# Copy the uploaded file into a temporary file and then pass its path to the loader
tmp_file = tempfile.NamedTemporaryFile(delete=False)

# Write the file to the temporary file
tmp_file.write(self.files[0].file.read())

# Now pass the path of the temporary file to the loader

loader = UnstructuredPDFLoader(tmp_file.name)

tmp_file.close()

data = loader.load()

llm = ChatLiteLLM(model="gpt-4o", max_tokens=2000)

map_template = """The following is a document that has been divided into multiple sections:
{docs}

Please carefully analyze each section and identify the following:

1. Main Themes: What are the overarching ideas or topics in this section?
2. Key Points: What are the most important facts, arguments, or ideas presented in this section?
3. Important Information: Are there any crucial details that stand out? This could include data, quotes, specific events, entities, or other relevant information.
4. People: Who are the key individuals mentioned in this section? What roles do they play?
5. Reasoning: What logic or arguments are used to support the key points?
6. Chapters: If the document is divided into chapters, what is the main focus of each chapter?

Remember to consider the language and context of the document. This will help in understanding the nuances and subtleties of the text."""
map_prompt = PromptTemplate.from_template(map_template)
map_chain = LLMChain(llm=llm, prompt=map_prompt)
notification = notification_service.add_notification(
CreateNotification(
user_id=self.current_user.id,
status="info",
title=f"Creating Summary for {self.files[0].filename}",
)
)
# Copy the uploaded file into a temporary file and then pass its path to the loader
tmp_file = tempfile.NamedTemporaryFile(delete=False)

# Reduce
reduce_template = """The following is a set of summaries for parts of the document:
{docs}
Take these and distill them into a final, consolidated summary of the document. Make sure to include the main themes, key points, and important information such as data, quotes, people, and specific events.
Use markdown such as bold, italics, underlined. For example, **bold**, *italics*, and _underlined_ to highlight key points.
Please provide the final summary with sections using bold headers.
Sections should always be Summary and Key Points, but feel free to add more sections as needed.
Always use bold text for the sections headers.
Keep the same language as the documents.
Answer:"""
reduce_prompt = PromptTemplate.from_template(reduce_template)
# Write the file to the temporary file
tmp_file.write(self.files[0].file.read())

# Run chain
reduce_chain = LLMChain(llm=llm, prompt=reduce_prompt)
# Now pass the path of the temporary file to the loader

# Takes a list of documents, combines them into a single string, and passes this to an LLMChain
combine_documents_chain = StuffDocumentsChain(
llm_chain=reduce_chain, document_variable_name="docs"
)
loader = UnstructuredPDFLoader(tmp_file.name)

# Combines and iteratively reduces the mapped documents
reduce_documents_chain = ReduceDocumentsChain(
# This is the final chain that is called.
combine_documents_chain=combine_documents_chain,
# If documents exceed context for `StuffDocumentsChain`
collapse_documents_chain=combine_documents_chain,
# The maximum number of tokens to group documents into.
token_max=4000,
)
tmp_file.close()

# Combining documents by mapping a chain over them, then combining results
map_reduce_chain = MapReduceDocumentsChain(
# Map chain
llm_chain=map_chain,
# Reduce chain
reduce_documents_chain=reduce_documents_chain,
# The variable name in the llm_chain to put the documents in
document_variable_name="docs",
# Return the results of the map steps in the output
return_intermediate_steps=False,
)
data = loader.load()

text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
chunk_size=1000, chunk_overlap=100
)
split_docs = text_splitter.split_documents(data)
text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
chunk_size=1000, chunk_overlap=100
)
split_docs = text_splitter.split_documents(data)
logger.info(f"Split {len(split_docs)} documents")
# Jsonify the split docs
split_docs = [doc.to_json() for doc in split_docs]
## Turn this into a task
brain_id = (
self.input.outputs.brain.value
if self.input.outputs.brain.activated
else None
)
email_activated = self.input.outputs.email.activated
celery.send_task(
name="task_summary",
args=(
split_docs,
self.files[0].filename,
brain_id,
email_activated,
self.current_user.model_dump(mode="json"),
notification.id,
),
)
except Exception as e:
logger.error(f"Error processing summary: {e}")


def map_reduce_chain():
llm = ChatLiteLLM(model="gpt-4o", max_tokens=2000)

map_template = """The following is a document that has been divided into multiple sections:
{docs}

Please carefully analyze each section and identify the following:

1. Main Themes: What are the overarching ideas or topics in this section?
2. Key Points: What are the most important facts, arguments, or ideas presented in this section?
3. Important Information: Are there any crucial details that stand out? This could include data, quotes, specific events, entities, or other relevant information.
4. People: Who are the key individuals mentioned in this section? What roles do they play?
5. Reasoning: What logic or arguments are used to support the key points?
6. Chapters: If the document is divided into chapters, what is the main focus of each chapter?

Remember to consider the language and context of the document. This will help in understanding the nuances and subtleties of the text."""
map_prompt = PromptTemplate.from_template(map_template)
map_chain = LLMChain(llm=llm, prompt=map_prompt)

# Reduce
reduce_template = """The following is a set of summaries for parts of the document :
{docs}
Take these and distill it into a final, consolidated summary of the document. Make sure to include the main themes, key points, and important information such as data, quotes,people and specific events.
Use markdown such as bold, italics, underlined. For example, **bold**, *italics*, and _underlined_ to highlight key points.
Please provide the final summary with sections using bold headers.
Sections should always be Summary and Key Points, but feel free to add more sections as needed.
Always use bold text for the sections headers.
Keep the same language as the documents.
Answer:"""
reduce_prompt = PromptTemplate.from_template(reduce_template)

# Run chain (reuse the llm instantiated above)
reduce_chain = LLMChain(llm=llm, prompt=reduce_prompt)

# Takes a list of documents, combines them into a single string, and passes this to an LLMChain
combine_documents_chain = StuffDocumentsChain(
llm_chain=reduce_chain, document_variable_name="docs"
)

content = map_reduce_chain.run(split_docs)
# Combines and iteratively reduces the mapped documents
reduce_documents_chain = ReduceDocumentsChain(
# This is the final chain that is called.
combine_documents_chain=combine_documents_chain,
# If documents exceed context for `StuffDocumentsChain`
collapse_documents_chain=combine_documents_chain,
# The maximum number of tokens to group documents into.
token_max=4000,
)

return await self.create_and_upload_processed_file(
content, self.files[0].filename, "Summary"
)
# Combining documents by mapping a chain over them, then combining results
map_reduce_chain = MapReduceDocumentsChain(
# Map chain
llm_chain=map_chain,
# Reduce chain
reduce_documents_chain=reduce_documents_chain,
# The variable name in the llm_chain to put the documents in
document_variable_name="docs",
# Return the results of the map steps in the output
return_intermediate_steps=False,
)
return map_reduce_chain
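
Extracting the chain construction into a plain function keeps `process_assistant` free of LLM setup and lets the worker build the chain on demand. Usage is a one-liner, matching how the old inline code ran it (pre-LCEL LangChain API):

```python
# Build once, then run over the rehydrated Document chunks.
chain = map_reduce_chain()
summary_markdown = chain.run(split_docs)  # split_docs: List[Document]
```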


def summary_inputs():