-
Notifications
You must be signed in to change notification settings - Fork 18
/
main.py
150 lines (120 loc) · 4.69 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import os
from urllib.parse import quote_plus

from dotenv import find_dotenv, load_dotenv
from fastapi import FastAPI, HTTPException
from langchain.schema import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

from models import DocumentModel, DocumentResponse
from store import AsnyPgVector
from store_factory import get_vector_store
# Populate the process environment from the dotenv file located by
# find_dotenv(), before the configuration below is read.
load_dotenv(find_dotenv())
# Application object on which the route handlers in this module are registered.
app = FastAPI()
def get_env_variable(var_name: str) -> str:
    """Look up a required environment variable.

    Args:
        var_name: Name of the variable to read from the process environment.

    Returns:
        The variable's value; an empty string counts as set and is returned.

    Raises:
        ValueError: If no variable named *var_name* is set.
    """
    found = os.environ.get(var_name)
    if found is not None:
        return found
    raise ValueError(f"Environment variable '{var_name}' not found.")
# One-time module initialisation: read the configuration from the environment,
# then build the vector store, the retriever and the question-answering chain
# that the route handlers in this module use.
try:
    # Only the case-insensitive value "true" selects the async store.
    USE_ASYNC = os.getenv("USE_ASYNC", "False").lower() == "true"
    if USE_ASYNC:
        print("Async project used")

    POSTGRES_DB = get_env_variable("POSTGRES_DB")
    POSTGRES_USER = get_env_variable("POSTGRES_USER")
    POSTGRES_PASSWORD = get_env_variable("POSTGRES_PASSWORD")
    DB_HOST = get_env_variable("DB_HOST")
    DB_PORT = get_env_variable("DB_PORT")

    # Percent-encode the credentials so that characters such as "@", ":" or
    # "/" in a user name or password cannot be mistaken for URL delimiters.
    # NOTE(review): values that were already percent-encoded by hand in the
    # environment would now be encoded twice - confirm none are deployed.
    CONNECTION_STRING = (
        f"postgresql+psycopg2://{quote_plus(POSTGRES_USER)}:"
        f"{quote_plus(POSTGRES_PASSWORD)}@{DB_HOST}:{DB_PORT}/{POSTGRES_DB}"
    )

    # Read only so that a missing key fails here; the value itself is not
    # passed to anything in this block.
    OPENAI_API_KEY = get_env_variable("OPENAI_API_KEY")

    embeddings = OpenAIEmbeddings()

    mode = "async" if USE_ASYNC else "sync"
    pgvector_store = get_vector_store(
        connection_string=CONNECTION_STRING,
        embeddings=embeddings,
        collection_name="testcollection",
        mode=mode,
    )
    retriever = pgvector_store.as_retriever()

    template = """Answer the question based only on the following context:
{context}
Question: {question}
"""
    prompt = ChatPromptTemplate.from_template(template)
    model = ChatOpenAI(model_name="gpt-3.5-turbo")

    # The incoming question is passed to the retriever (to fill "context")
    # and, unchanged, into the prompt's "question" slot.
    chain = (
        {"context": retriever, "question": RunnablePassthrough()}
        | prompt
        | model
        | StrOutputParser()
    )
except Exception as e:
    # Covers the ValueError raised by get_env_variable as well as any other
    # set-up failure; the original error is chained as the cause.
    # NOTE(review): this runs at import time, outside any request, so the
    # HTTPException aborts the import rather than producing an HTTP response.
    raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/add-documents/")
async def add_documents(documents: list[DocumentModel]):
    """Add the posted documents to the vector store.

    Each stored document keeps the posted page content and metadata, plus a
    "digest" metadata entry taken from the model's generate_digest().

    Args:
        documents: Documents to store.

    Returns:
        A dict with a confirmation "message" and the "ids" returned by the
        store.

    Raises:
        HTTPException: 500 carrying the error text when anything fails.
    """
    try:
        docs = []
        for item in documents:
            if item.metadata:
                # "digest" is written last, so it wins over a posted key of
                # the same name.
                meta = {**item.metadata, "digest": item.generate_digest()}
            else:
                meta = {"digest": item.generate_digest()}
            docs.append(Document(page_content=item.page_content, metadata=meta))

        # Only the async store is awaited; the sync store is called directly.
        if isinstance(pgvector_store, AsnyPgVector):
            ids = await pgvector_store.aadd_documents(docs)
        else:
            ids = pgvector_store.add_documents(docs)
        return {"message": "Documents added successfully", "ids": ids}
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
@app.get("/get-all-ids/")
async def get_all_ids():
    """Return the ids reported by the vector store's get_all_ids().

    Raises:
        HTTPException: 500 carrying the error text when the lookup fails.
    """
    try:
        # The sync store answers directly; only the async store is awaited.
        if not isinstance(pgvector_store, AsnyPgVector):
            return pgvector_store.get_all_ids()
        return await pgvector_store.get_all_ids()
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
@app.post("/get-documents-by-ids/", response_model=list[DocumentResponse])
async def get_documents_by_ids(ids: list[str]):
    """Return the stored documents that have the given ids.

    Args:
        ids: Ids of the documents to fetch.

    Returns:
        The documents returned by the store for *ids*.

    Raises:
        HTTPException: 404 when at least one id is not among the stored ids;
            500 carrying the error text for any other failure.
    """
    try:
        is_async = isinstance(pgvector_store, AsnyPgVector)
        existing_ids = (
            await pgvector_store.get_all_ids()
            if is_async
            else pgvector_store.get_all_ids()
        )

        # Reject unknown ids before fetching, so a request that ends in 404
        # does not pay for a document lookup first.
        if any(doc_id not in existing_ids for doc_id in ids):
            raise HTTPException(status_code=404, detail="One or more IDs not found")

        if is_async:
            return await pgvector_store.get_documents_by_ids(ids)
        return pgvector_store.get_documents_by_ids(ids)
    except HTTPException:
        # Let the 404 above through instead of rewrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.delete("/delete-documents/")
async def delete_documents(ids: list[str]):
    """Delete the documents that have the given ids from the vector store.

    Args:
        ids: Ids of the documents to delete.

    Returns:
        A dict whose "message" states how many ids were deleted.

    Raises:
        HTTPException: 404 when at least one id is not among the stored ids
            (nothing is deleted in that case); 500 carrying the error text
            for any other failure.
    """
    try:
        is_async = isinstance(pgvector_store, AsnyPgVector)
        existing_ids = (
            await pgvector_store.get_all_ids()
            if is_async
            else pgvector_store.get_all_ids()
        )

        # Validate before mutating: a request naming an unknown id is
        # rejected without deleting the ids that do exist.
        if any(doc_id not in existing_ids for doc_id in ids):
            raise HTTPException(status_code=404, detail="One or more IDs not found")

        if is_async:
            await pgvector_store.delete(ids=ids)
        else:
            pgvector_store.delete(ids=ids)
        return {"message": f"{len(ids)} documents deleted successfully"}
    except HTTPException:
        # Let the 404 above through instead of rewrapping it as a 500,
        # matching get_documents_by_ids.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/chat/")
async def quick_response(msg: str):
    """Answer *msg* using the module-level retrieval chain.

    Args:
        msg: The question passed to the chain.

    Returns:
        The chain's output for *msg*.

    Raises:
        HTTPException: 500 carrying the error text when the chain fails,
            matching the other endpoints in this module.
    """
    try:
        # Use the chain's async entry point so this handler does not block
        # the event loop while waiting for the result.
        # NOTE(review): assumes the configured store works through the
        # retriever's async path in both sync and async mode - confirm.
        return await chain.ainvoke(msg)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e