-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
297 lines (237 loc) · 9.25 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
from typing import Optional, List
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from fastapi.middleware.cors import CORSMiddleware
from enum import Enum
import json
import os
from openai import OpenAI
import secrets
import base64
import redis
from dotenv import load_dotenv
import re
# Load environment variables (REDIS_PASSWORD, OPENAI_API_KEY) from a local .env file.
load_dotenv()
# OpenAI client; reads OPENAI_API_KEY from the environment.
client = OpenAI()
# Shared Redis connection used as the conversation-tree store; values are
# JSON-serialized DataValue nodes keyed by "<username>/<message id>".
# NOTE(review): host/port are hard-coded to a Redis Cloud instance — consider
# moving them to environment variables alongside the password.
r = redis.Redis(
    host='redis-13375.c292.ap-southeast-1-1.ec2.redns.redis-cloud.com',
    port=13375,
    password=os.getenv('REDIS_PASSWORD'),
    decode_responses=True
)
def generate_model_random_key(length=10):
    """Return a random key of exactly *length* URL-safe characters.

    Keys are embedded in Redis key paths of the form '<username>/<id>', so they
    must never contain '/' (or '\\'). The original implementation base64-encoded
    random bytes and then stripped '/' and '\\' with a regex, which produced
    variable-length keys (shorter than *length* whenever a '/' appeared) and
    silently reduced entropy. token_urlsafe uses the URL-safe base64 alphabet
    (A-Z, a-z, 0-9, '-', '_'), so no stripping is needed.
    """
    # token_urlsafe(n) yields ~4n/3 characters, always >= length for length >= 1;
    # truncate to the requested size.
    return secrets.token_urlsafe(length)[:length]
class Message(BaseModel):
    """A single chat turn as persisted inside a DataValue node in Redis."""
    # Random key from generate_model_random_key(); "0" marks the root node.
    id: str
    # id of the turn being replied to; None only for the root system prompt.
    parent_id: Optional[str] = None
    # "system" | "user" | "assistant" — mirrors the Choices enum values.
    role: str
    # Raw message text.
    content: str
class messageToFrontend(BaseModel):
    """Wire format returned by /get-history: same fields as Message but the
    body field is named `text` instead of `content`.

    NOTE(review): name breaks PascalCase convention; kept as-is since renaming
    would touch every call site.
    """
    id: str
    parent_id: Optional[str] = None
    role: str
    # Carries what Message stores in `content`.
    text: str
class openAIMessages(BaseModel):
    """Minimal role/content pair in the shape expected by the OpenAI
    chat.completions API's `messages` parameter."""
    role: str
    content: str
class Choices(Enum):
    """Allowed author roles for a DataValue node."""
    user = 'user'
    system = 'system'
    assistant = 'assistant'
class DataValue(BaseModel):
    """One node of a user's conversation tree, stored as JSON at the Redis key
    '<username>/<id of the node's newest message>'."""
    # Who authored the newest message in this node.
    type: Choices
    # Full message history from the root down to and including this node's turn.
    messages: List[Message]
    # Redis key of the parent node; None for the root.
    parent: Optional[str] = None
    # Redis keys of child nodes. (Pydantic copies this default per instance,
    # so the shared-mutable-default pitfall does not apply here.)
    children: List[str] = []

    class Config:
        # Serialize `type` as its plain string value in model_dump()/JSON.
        use_enum_values = True
class MessageRequest(BaseModel):
    """Request body for /send-message."""
    username: str
    # id of the node the new query is appended under; "0" starts a new tree.
    old_id: str
    # The user's question text.
    query: str
class ReturnValue(BaseModel):
    """Response body for /send-message and /make-sibling: the assistant's
    newly generated turn."""
    id: str
    parent_id: str
    role: str
    text: str
class GetHistory(BaseModel):
    """Request body for /get-history and /check-children: identifies one node."""
    username: str
    id: str
class SiblingRequest(BaseModel):
    """Request body for /make-sibling: the user-query node for which an
    alternative assistant answer should be generated."""
    username: str
    user_query_id: str
class RemoveUser(BaseModel):
    """Request body for /remove-user."""
    username: str
# Model name used for all chat completions.
GPT_3 = "gpt-3.5-turbo"
app = FastAPI()
# Wide-open CORS: any origin, method, and header is accepted.
# NOTE(review): allow_origins=["*"] with allow_credentials=True is rejected by
# browsers for credentialed requests — tighten origins before production use.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
def convert_to_openai_messages(messages: List[Message]) -> List[openAIMessages]:
    """Project stored Message objects onto the role/content shape the OpenAI
    chat API expects, preserving order."""
    converted = []
    for stored in messages:
        converted.append(openAIMessages(role=stored.role, content=stored.content))
    return converted
# Root system prompt shared by every new conversation tree. Its id "0" is the
# sentinel /send-message uses to detect "start a new tree for this user".
default_message: Message = Message(id="0", parent_id=None, role="system", content="You are a helpful AI assistant whose job it is to give the user new ideas. Always give one idea and one idea only. Never give more than one idea")
# Root DataValue node seeded into Redis at '<username>/0' on first contact.
default_json_value = DataValue(type=Choices.system, messages=[default_message], parent=None, children=[])
@app.get("/")
async def root():
return {"message": "Hello World"}
@app.post("/send-message")
async def message(req: MessageRequest):
print("got data!")
username = req.username
old_id = req.old_id
query = req.query
parent_key = f'{username}/{old_id}'
print(username, old_id, query, parent_key)
parent_data_raw = r.get(parent_key)
if old_id != '0' and not parent_data_raw:
raise HTTPException(500, detail="Man something went wrong!")
elif old_id == '0' and not parent_data_raw:
r.set(parent_key, json.dumps(default_json_value.model_dump()))
parent_data_raw = r.get(parent_key)
parent_data = DataValue(**json.loads(parent_data_raw))
print("parent_data is", parent_data)
parent_messages = parent_data.messages
userRandomValue = generate_model_random_key()
new_message = Message(id=userRandomValue, parent_id=old_id, role="user", content=query)
print("the parent id is", new_message.parent_id)
# Make user block
user_messages = parent_messages + [new_message]
user_parent = parent_key
userBlock = DataValue(type=Choices.user, messages=user_messages, parent=user_parent, children=[])
userKey = f'{username}/{userRandomValue}'
# set user_key as child of parent
parent_data.children.append(userKey)
print("user block key is", userKey)
# make GPTresponse block
new_messages = user_messages
print("the type is", type(new_messages[0]))
send_to_openAI_format = list(convert_to_openai_messages(new_messages))
chat = client.chat.completions.create(model=GPT_3, messages=send_to_openAI_format)
model_reply = chat.choices[0].message.content
model_random_key = generate_model_random_key()
new_messages.append(Message(id=model_random_key, parent_id=userRandomValue, role="assistant", content=model_reply))
model_response_block = DataValue(type=Choices.assistant, messages=new_messages, parent=userKey, children=[])
model_response_key = f'{username}/{model_random_key}'
print("model block key is", model_response_key)
userBlock.children.append(model_response_key)
print(model_response_block)
r.set(model_response_key, json.dumps(model_response_block.model_dump()))
r.set(userKey, json.dumps(userBlock.model_dump()))
r.set(parent_key, json.dumps(parent_data.model_dump()))
print("\n\n userBlock is", userBlock)
send_to_user = ReturnValue(id=model_random_key, parent_id=userRandomValue, role="assistant", text=model_reply)
return send_to_user.model_dump()
def convert_to_message_to_frontend(messages: List[Message]) -> List[messageToFrontend]:
    """Re-shape stored Message objects into the frontend wire format, mapping
    the `content` field onto `text` and preserving order."""
    result: List[messageToFrontend] = []
    for item in messages:
        frontend_msg = messageToFrontend(
            id=item.id,
            parent_id=item.parent_id,
            role=item.role,
            text=item.content,
        )
        result.append(frontend_msg)
    return result
@app.post("/get-history")
async def history(input: GetHistory):
print("running get history")
username = input.username
id = input.id
key = f'{username}/{id}'
key_exists = bool(r.exists(key))
print(key_exists)
print("username is", username)
print("id is", id)
if key_exists:
print("found key!")
value_raw = r.get(key)
formatted_value = DataValue(**json.loads(value_raw))
messages_list = formatted_value.messages
converted_messages = convert_to_message_to_frontend(messages_list)
output = {
'Data': converted_messages
}
return output
else:
raise HTTPException(status_code=404, detail="Key not found")
async def get_content(key: str) -> str:
    """Fetch the DataValue stored at *key* and return the text of its newest
    (last) message."""
    stored = DataValue(**json.loads(r.get(key)))
    last_message = stored.messages[-1]
    return last_message.content
def ordinal(n: int) -> str:
    """Render *n* as an English ordinal string ('1st', '2nd', '11th', ...)."""
    # 11th/12th/13th are irregular: the teens always take 'th'.
    if n % 100 in (11, 12, 13):
        return f"{n}th"
    suffix_by_last_digit = {1: 'st', 2: 'nd', 3: 'rd'}
    return f"{n}{suffix_by_last_digit.get(n % 10, 'th')}"
async def get_messages(key: str) -> List[Message]:
    """Fetch the DataValue stored at *key* and return its full message list."""
    stored = DataValue(**json.loads(r.get(key)))
    return stored.messages
@app.post('/make-sibling')
async def makeSibling(input: SiblingRequest):
    """Generate an alternative assistant answer for an existing user-query node.

    Collects the texts of the node's existing child answers, prompts the model
    for a different idea, stores the new answer as another child node in Redis,
    and returns it as a ReturnValue.
    Raises HTTPException 404 when the user-query node does not exist.
    """
    print("HI")
    username = input.username
    user_query_id = input.user_query_id
    key = f'{username}/{user_query_id}'
    key_exists = bool(r.exists(key))
    if not key_exists:
        raise HTTPException(status_code=404, detail="Key not found")
    value_raw = r.get(key)
    formatted_value = DataValue(**json.loads(value_raw))
    children = formatted_value.children
    # Summarize every prior answer so the model can be told not to repeat them.
    previous_answers = ""
    for i in range(len(children)):
        ans = await get_content(children[i])
        to_add = "The " + str(ordinal(i+1)) + " response is " + ans + "\n"
        previous_answers += to_add
    print("PREVIOUS ANSWERS: \n \n", previous_answers)
    # The newest message stored at this node is the user's original question.
    question = await get_content(key)
    prompt = "The user is asking " + question + ". " + "The previous responses are " + previous_answers + "Give different ideas than the ones above"
    print("prompt to openai is", prompt)
    old_messages = await get_messages(key)
    prev_messages = convert_to_openai_messages(old_messages)
    messages_to_send_to_OpenAI = prev_messages + [openAIMessages(role="user", content=prompt)]
    chat = client.chat.completions.create(model=GPT_3, messages=messages_to_send_to_OpenAI)
    model_reply = chat.choices[0].message.content
    # making model response data
    model_response_id = generate_model_random_key()
    GPT_response_key = f'{username}/{model_response_id}'
    # Link the new answer node as another child of the user-query node.
    formatted_value.children.append(GPT_response_key)
    model_response_message = Message(id=model_response_id, parent_id=user_query_id, role="assistant", content=model_reply)
    final_messages = old_messages + [model_response_message]
    # make new GPT response block
    model_response_block = DataValue(type=Choices.assistant, messages=final_messages, parent=key, children=[])
    # Persist the updated parent (new child link) and the new answer node.
    r.set(key, json.dumps(formatted_value.model_dump()))
    r.set(GPT_response_key, json.dumps(model_response_block.model_dump()))
    print("\n\n\n")
    print(model_response_block)
    return ReturnValue(id=model_response_id, parent_id=user_query_id, role="assistant", text=model_reply)
@app.post('/check-children')
async def checkChildren(input: GetHistory):
    """Report whether the node at '<username>/<id>' has any child nodes.

    Response shape: {'exists': bool}.
    Raises HTTPException 404 when the key does not exist.
    """
    username = input.username
    node_id = input.id
    key = f'{username}/{node_id}'
    # Single round-trip: GET and treat None as "not found". The original did
    # EXISTS followed by GET, which is an extra round-trip and racy against a
    # concurrent delete between the two calls.
    value_raw = r.get(key)
    print(value_raw is not None)
    print("username is", username)
    print("id is", node_id)
    if value_raw is None:
        raise HTTPException(404, "No such key!")
    print("found key!")
    formatted_value = DataValue(**json.loads(value_raw))
    return {"exists": len(formatted_value.children) > 0}
@app.post('/remove-user')
async def removeUser(input: RemoveUser):
    """Delete every Redis key belonging to *username* (pattern '<username>/*').

    Iterates with SCAN so large keyspaces are not blocked, deleting matches in
    batches. The original seeded the loop cursor with the string '0' but
    compared it against the int 0 — it terminated only because redis-py
    returns an int cursor; this rewrite keeps the cursor an int throughout and
    makes the do-while shape explicit.
    """
    username = input.username
    print("wanting to remove", username)
    pattern = f"{username}/*"
    cursor = 0
    while True:
        # SCAN returns (next_cursor, keys); a next_cursor of 0 means the full
        # iteration is complete.
        cursor, keys = r.scan(cursor=cursor, match=pattern, count=1000)
        if keys:
            r.delete(*keys)
        if cursor == 0:
            break
    print("deleted all keys with ", username)