From b4792f7d52f4d549d2e6ea779afda6db16e49fa1 Mon Sep 17 00:00:00 2001 From: star-nox Date: Thu, 21 Mar 2024 18:15:32 -0500 Subject: [PATCH 01/11] updated create_doc_map() --- ai_ta_backend/beam/nomic_logging.py | 150 ++++++++++++++-------------- 1 file changed, 75 insertions(+), 75 deletions(-) diff --git a/ai_ta_backend/beam/nomic_logging.py b/ai_ta_backend/beam/nomic_logging.py index ce8235a2..fab28c97 100644 --- a/ai_ta_backend/beam/nomic_logging.py +++ b/ai_ta_backend/beam/nomic_logging.py @@ -433,9 +433,9 @@ def create_document_map(course_name: str): try: # check if map exists - response = supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute() - if response.data: - return "Map already exists for this course." + # response = supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute() + # if response.data: + # return "Map already exists for this course." # fetch relevant document data from Supabase response = supabase_client.table("documents").select("id", @@ -449,85 +449,83 @@ def create_document_map(course_name: str): print("Total number of documents in Supabase: ", total_doc_count) # minimum 20 docs needed to create map + if total_doc_count < 20: + return "Cannot create a map because there are less than 20 documents in the course." - if total_doc_count > 19: - first_id = response.data[0]['id'] - combined_dfs = [] - curr_total_doc_count = 0 - doc_count = 0 - first_batch = True + first_id = response.data[0]['id'] + combined_dfs = [] + curr_total_doc_count = 0 + doc_count = 0 + first_batch = True - # iteratively query in batches of 25 - while curr_total_doc_count < total_doc_count: + # iteratively query in batches of 25 + while curr_total_doc_count < total_doc_count: - response = supabase_client.table("documents").select( - "id, created_at, s3_path, url, readable_filename, contexts").eq("course_name", course_name).gte( + response = supabase_client.table("documents").select( + "id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gte( 'id', first_id).order('id', desc=False).limit(25).execute() - df = pd.DataFrame(response.data) - combined_dfs.append(df) # list of dfs - - curr_total_doc_count += len(response.data) - doc_count += len(response.data) - + df = pd.DataFrame(response.data) + combined_dfs.append(df) # list of dfs - if doc_count >= 1000: # upload to Nomic every 1000 docs + curr_total_doc_count += len(response.data) + doc_count += len(response.data) - # concat all dfs from the combined_dfs list - final_df = pd.concat(combined_dfs, ignore_index=True) - # prep data for nomic upload - embeddings, metadata = data_prep_for_doc_map(final_df) - - if first_batch: - # create a new map - print("Creating new map...") - project_name = NOMIC_MAP_NAME_PREFIX + course_name - index_name = course_name + "_doc_index" - topic_label_field = "text" - colorable_fields = ["readable_filename", "text"] - result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) - # update flag - first_batch = False - - else: - # append to existing map - print("Appending data to existing map...") - project_name = NOMIC_MAP_NAME_PREFIX + course_name - # add project lock logic here - result = append_to_map(embeddings, metadata, project_name) + if doc_count >= 100: # upload to Nomic in batches of 1000 + # concat all dfs from the combined_dfs list + final_df = pd.concat(combined_dfs, ignore_index=True) - # reset variables - combined_dfs = [] - doc_count = 0 
+ # prep data for nomic upload + embeddings, metadata = data_prep_for_doc_map(final_df) - # set first_id for next iteration - first_id = response.data[-1]['id'] + 1 + if first_batch: + # create a new map + print("Creating new map...") + project_name = NOMIC_MAP_NAME_PREFIX + course_name + index_name = course_name + "_doc_index" + topic_label_field = "text" + colorable_fields = ["readable_filename", "text", "base_url", "created_at"] + result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) + # update flag + first_batch = False - - # upload last set of docs - final_df = pd.concat(combined_dfs, ignore_index=True) - embeddings, metadata = data_prep_for_doc_map(final_df) - project_name = NOMIC_MAP_NAME_PREFIX + course_name - if first_batch: - index_name = course_name + "_doc_index" - topic_label_field = "text" - colorable_fields = ["readable_filename", "text"] - result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) - else: - result = append_to_map(embeddings, metadata, project_name) - print("Atlas upload status: ", result) - - # log info to supabase - project = AtlasProject(name=project_name, add_datums_if_exists=True) - project_id = project.id - project.rebuild_maps() - project_info = {'course_name': course_name, 'doc_map_id': project_id} - response = supabase_client.table("projects").insert(project_info).execute() - print("Response from supabase: ", response) - return "success" + else: + # append to existing map + print("Appending data to existing map...") + project_name = NOMIC_MAP_NAME_PREFIX + course_name + # add project lock logic here + result = append_to_map(embeddings, metadata, project_name) + + # reset variables + combined_dfs = [] + doc_count = 0 + + # set first_id for next iteration + first_id = response.data[-1]['id'] + 1 + + # upload last set of docs + final_df = pd.concat(combined_dfs, ignore_index=True) + embeddings, metadata = data_prep_for_doc_map(final_df) + project_name = NOMIC_MAP_NAME_PREFIX + course_name + if first_batch: + index_name = course_name + "_doc_index" + topic_label_field = "text" + colorable_fields = ["readable_filename", "text", "base_url", "created_at"] + result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) else: - return "Cannot create a map because there are less than 20 documents in the course." + result = append_to_map(embeddings, metadata, project_name) + print("Atlas upload status: ", result) + + # log info to supabase + project = AtlasProject(name=project_name, add_datums_if_exists=True) + project_id = project.id + project.rebuild_maps() + project_info = {'course_name': course_name, 'doc_map_id': project_id} + response = supabase_client.table("projects").insert(project_info).execute() + print("Response from supabase: ", response) + return "success" + except Exception as e: print(e) sentry_sdk.capture_exception(e) @@ -535,6 +533,7 @@ def create_document_map(course_name: str): return "failed" + def delete_from_document_map(course_name: str, ids: list): """ This function is used to delete datapoints from a document map. 
@@ -644,11 +643,11 @@ def create_map(embeddings, metadata, map_name, index_name, topic_label_field, co data=metadata, id_field="id", build_topic_model=True, - name=map_name, topic_label_field=topic_label_field, + name=map_name, colorable_fields=colorable_fields, add_datums_if_exists=True) - project.create_index(index_name, build_topic_model=True) + project.create_index(name=index_name, build_topic_model=True) return "success" except Exception as e: print(e) @@ -673,7 +672,6 @@ def append_to_map(embeddings, metadata, map_name): print(e) return "Error in appending to map: {e}" - def data_prep_for_doc_map(df: pd.DataFrame): """ This function prepares embeddings and metadata for nomic upload in document map creation. @@ -692,6 +690,7 @@ def data_prep_for_doc_map(df: pd.DataFrame): for index, row in df.iterrows(): current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + created_at = datetime.datetime.strptime(row['created_at'], "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%d %H:%M:%S") if row['url'] == None: row['url'] = "" # iterate through all contexts and create separate entries for each @@ -703,11 +702,12 @@ def data_prep_for_doc_map(df: pd.DataFrame): meta_row = { "id": str(row['id']) + "_" + str(context_count), - "doc_ingested_at": row['created_at'], + "created_at": created_at, "s3_path": row['s3_path'], "url": row['url'], + "base_url": row['base_url'], "readable_filename": row['readable_filename'], - "created_at": current_time, + "modified_at": current_time, "text": text_row } From a3578d97efa2aa127e62fe98812ea9cb4e7f548e Mon Sep 17 00:00:00 2001 From: star-nox Date: Fri, 22 Mar 2024 15:29:45 -0500 Subject: [PATCH 02/11] modified create_doc_map() to track last uploaded ids --- ai_ta_backend/beam/nomic_logging.py | 95 ++++++++++++++++++----------- 1 file changed, 59 insertions(+), 36 deletions(-) diff --git a/ai_ta_backend/beam/nomic_logging.py b/ai_ta_backend/beam/nomic_logging.py index fab28c97..0d38cf4b 100644 --- a/ai_ta_backend/beam/nomic_logging.py +++ b/ai_ta_backend/beam/nomic_logging.py @@ -433,9 +433,9 @@ def create_document_map(course_name: str): try: # check if map exists - # response = supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute() - # if response.data: - # return "Map already exists for this course." + response = supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute() + if response.data: + return "Map already exists for this course." 
# fetch relevant document data from Supabase response = supabase_client.table("documents").select("id", @@ -470,8 +470,7 @@ def create_document_map(course_name: str): curr_total_doc_count += len(response.data) doc_count += len(response.data) - - if doc_count >= 100: # upload to Nomic in batches of 1000 + if doc_count >= 1000: # upload to Nomic in batches of 1000 # concat all dfs from the combined_dfs list final_df = pd.concat(combined_dfs, ignore_index=True) @@ -489,6 +488,13 @@ def create_document_map(course_name: str): result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) # update flag first_batch = False + # log project info to supabas + project = AtlasProject(name=project_name, add_datums_if_exists=True) + project_id = project.id + last_id = final_df['id'].iloc[-1].split("_")[0] + project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id} + response = supabase_client.table("projects").insert(project_info).execute() + print("Response from supabase: ", response) else: # append to existing map @@ -496,7 +502,13 @@ def create_document_map(course_name: str): project_name = NOMIC_MAP_NAME_PREFIX + course_name # add project lock logic here result = append_to_map(embeddings, metadata, project_name) - + if result == "success": + # update the last uploaded id in supabase + last_id = final_df['id'].iloc[-1].split("_")[0] + project_info = {'last_uploaded_doc_id': last_id} + response = supabase_client.table("projects").update(project_info).eq("course_name", course_name).execute() + print("Response from supabase: ", response) + # reset variables combined_dfs = [] doc_count = 0 @@ -505,35 +517,33 @@ def create_document_map(course_name: str): first_id = response.data[-1]['id'] + 1 # upload last set of docs - final_df = pd.concat(combined_dfs, ignore_index=True) - embeddings, metadata = data_prep_for_doc_map(final_df) - project_name = NOMIC_MAP_NAME_PREFIX + course_name - if first_batch: - index_name = course_name + "_doc_index" - topic_label_field = "text" - colorable_fields = ["readable_filename", "text", "base_url", "created_at"] - result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) - else: - result = append_to_map(embeddings, metadata, project_name) - print("Atlas upload status: ", result) - - # log info to supabase - project = AtlasProject(name=project_name, add_datums_if_exists=True) - project_id = project.id - project.rebuild_maps() - project_info = {'course_name': course_name, 'doc_map_id': project_id} - response = supabase_client.table("projects").insert(project_info).execute() - print("Response from supabase: ", response) - return "success" + if doc_count > 0: + final_df = pd.concat(combined_dfs, ignore_index=True) + embeddings, metadata = data_prep_for_doc_map(final_df) + project_name = NOMIC_MAP_NAME_PREFIX + course_name + if first_batch: + index_name = course_name + "_doc_index" + topic_label_field = "text" + colorable_fields = ["readable_filename", "text", "base_url", "created_at"] + result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) + else: + result = append_to_map(embeddings, metadata, project_name) + if result == "success": + # update the last uploaded id in supabase + last_id = final_df['id'].iloc[-1].split("_")[0] + project_info = {'last_uploaded_doc_id': last_id} + response = supabase_client.table("projects").update(project_info).eq("course_name", course_name).execute() + 
print("Response from supabase: ", response) + print("Atlas upload status: ", result) + + # rebuild the map + rebuild_map(course_name, "document") except Exception as e: print(e) sentry_sdk.capture_exception(e) - return "failed" - - def delete_from_document_map(course_name: str, ids: list): """ This function is used to delete datapoints from a document map. @@ -559,7 +569,7 @@ def delete_from_document_map(course_name: str, ids: list): print("Deleting point from document map:", project.delete_data(ids)) with project.wait_for_project_lock(): project.rebuild_maps() - return "Successfully deleted from Nomic map" + return "success" except Exception as e: print(e) sentry_sdk.capture_exception(e) @@ -578,9 +588,10 @@ def log_to_document_map(data: dict): try: # check if map exists course_name = data['course_name'] - response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() + response = SUPABASE_CLIENT.table("projects").select("doc_map_id, last_uploaded_doc_id").eq("course_name", course_name).execute() if response.data: project_id = response.data[0]['doc_map_id'] + last_uploaded_doc_id = response.data[0]['last_uploaded_doc_id'] else: # create a map map_creation_result = create_document_map(course_name) @@ -588,11 +599,22 @@ def log_to_document_map(data: dict): return "The project has less than 20 documents and a map cannot be created." else: # fetch project id - response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() + response = SUPABASE_CLIENT.table("projects").select("doc_map_id, last_uploaded_doc_id").eq("course_name", course_name).execute() project_id = response.data[0]['doc_map_id'] + last_uploaded_doc_id = response.data[0]['last_uploaded_doc_id'] project = AtlasProject(project_id=project_id, add_datums_if_exists=True) #print("Inserted data: ", data) + # check if project is locked, if yes -> skip logging + if not project.is_accepting_data: + return "Skipping Nomic logging because project is locked." 
+ else: + # fetch all records from supabase greater than last_uploaded_doc_id + response = SUPABASE_CLIENT.table("documents").select("id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gt("id", last_uploaded_doc_id).execute() + df = pd.DataFrame(response.data) + + + embeddings = [] metadata = [] @@ -603,11 +625,12 @@ def log_to_document_map(data: dict): embeddings.append(row['embedding']) metadata.append({ "id": str(data['id']) + "_" + str(context_count), - "doc_ingested_at": data['created_at'], + "created_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "s3_path": data['s3_path'], "url": data['url'], + "base_url": data['base_url'], "readable_filename": data['readable_filename'], - "created_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "modified_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "text": row['text'] }) embeddings = np.array(embeddings) @@ -667,7 +690,7 @@ def append_to_map(embeddings, metadata, map_name): project = atlas.AtlasProject(name=map_name, add_datums_if_exists=True) with project.wait_for_project_lock(): project.add_embeddings(embeddings=embeddings, data=metadata) - return "Successfully appended to Nomic map" + return "success" except Exception as e: print(e) return "Error in appending to map: {e}" @@ -750,7 +773,7 @@ def rebuild_map(course_name:str, map_type:str): with project.wait_for_project_lock(): project.rebuild_maps() - return "Successfully rebuilt map" + return "success" except Exception as e: print(e) sentry_sdk.capture_exception(e) From 7d83c7508d5fefc755718b681f7538f9c6c1534a Mon Sep 17 00:00:00 2001 From: star-nox Date: Fri, 22 Mar 2024 18:09:15 -0500 Subject: [PATCH 03/11] updated logging function to include missed records --- ai_ta_backend/beam/nomic_logging.py | 141 +++++++++++++++++----------- 1 file changed, 87 insertions(+), 54 deletions(-) diff --git a/ai_ta_backend/beam/nomic_logging.py b/ai_ta_backend/beam/nomic_logging.py index 0d38cf4b..9ba01bdf 100644 --- a/ai_ta_backend/beam/nomic_logging.py +++ b/ai_ta_backend/beam/nomic_logging.py @@ -453,6 +453,7 @@ def create_document_map(course_name: str): return "Cannot create a map because there are less than 20 documents in the course." 
first_id = response.data[0]['id'] + combined_dfs = [] curr_total_doc_count = 0 doc_count = 0 @@ -460,7 +461,7 @@ def create_document_map(course_name: str): # iteratively query in batches of 25 while curr_total_doc_count < total_doc_count: - + response = supabase_client.table("documents").select( "id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gte( 'id', first_id).order('id', desc=False).limit(25).execute() @@ -486,15 +487,18 @@ def create_document_map(course_name: str): topic_label_field = "text" colorable_fields = ["readable_filename", "text", "base_url", "created_at"] result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) - # update flag - first_batch = False - # log project info to supabas - project = AtlasProject(name=project_name, add_datums_if_exists=True) - project_id = project.id - last_id = final_df['id'].iloc[-1].split("_")[0] - project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id} - response = supabase_client.table("projects").insert(project_info).execute() - print("Response from supabase: ", response) + + if result == "success": + # update flag + first_batch = False + # log project info to supabase + project = AtlasProject(name=project_name, add_datums_if_exists=True) + project_id = project.id + print("Last id: ", final_df['id'].iloc[-1]) + last_id = int(final_df['id'].iloc[-1]) + project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id} + update_response = supabase_client.table("projects").insert(project_info).execute() + print("Response from supabase: ", update_response) else: # append to existing map @@ -504,14 +508,16 @@ def create_document_map(course_name: str): result = append_to_map(embeddings, metadata, project_name) if result == "success": # update the last uploaded id in supabase - last_id = final_df['id'].iloc[-1].split("_")[0] - project_info = {'last_uploaded_doc_id': last_id} - response = supabase_client.table("projects").update(project_info).eq("course_name", course_name).execute() - print("Response from supabase: ", response) + last_id = int(final_df['id'].iloc[-1]) + info = {'last_uploaded_doc_id': last_id} + print("info:", info) + update_response = supabase_client.table("projects").update(info).eq("course_name", course_name).execute() + print("Response from supabase: ", update_response) # reset variables combined_dfs = [] doc_count = 0 + print("Records uploaded: ", curr_total_doc_count) # set first_id for next iteration first_id = response.data[-1]['id'] + 1 @@ -528,12 +534,15 @@ def create_document_map(course_name: str): result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) else: result = append_to_map(embeddings, metadata, project_name) + + # update the last uploaded id in supabase if result == "success": # update the last uploaded id in supabase - last_id = final_df['id'].iloc[-1].split("_")[0] + last_id = int(final_df['id'].iloc[-1]) project_info = {'last_uploaded_doc_id': last_id} - response = supabase_client.table("projects").update(project_info).eq("course_name", course_name).execute() - print("Response from supabase: ", response) + print("project_info: ", project_info) + update_response = supabase_client.table("projects").update(project_info).eq("course_name", course_name).execute() + print("Response from supabase: ", update_response) print("Atlas upload status: ", result) # rebuild the map @@ -576,7 +585,7 
@@ def delete_from_document_map(course_name: str, ids: list): return "Error in deleting from document map: {e}" -def log_to_document_map(data: dict): +def log_to_document_map(course_name: str): """ This is a function which appends new documents to an existing document map. It's called at the end of split_and_upload() after inserting data to Supabase. @@ -586,8 +595,7 @@ def log_to_document_map(data: dict): print("in add_to_document_map()") try: - # check if map exists - course_name = data['course_name'] + # check if map exists response = SUPABASE_CLIENT.table("projects").select("doc_map_id, last_uploaded_doc_id").eq("course_name", course_name).execute() if response.data: project_id = response.data[0]['doc_map_id'] @@ -604,51 +612,76 @@ def log_to_document_map(data: dict): last_uploaded_doc_id = response.data[0]['last_uploaded_doc_id'] project = AtlasProject(project_id=project_id, add_datums_if_exists=True) - #print("Inserted data: ", data) + project_name = "Document Map for " + course_name # check if project is locked, if yes -> skip logging if not project.is_accepting_data: return "Skipping Nomic logging because project is locked." - else: + + # fetch count of records greater than last_uploaded_doc_id + print("last uploaded doc id: ", last_uploaded_doc_id) + response = SUPABASE_CLIENT.table("documents").select("id", count="exact").eq("course_name", course_name).gt("id", last_uploaded_doc_id).execute() + print("Number of new documents: ", response.count) + + total_doc_count = response.count + current_doc_count = 0 + combined_dfs = [] + + while current_doc_count < total_doc_count: # fetch all records from supabase greater than last_uploaded_doc_id - response = SUPABASE_CLIENT.table("documents").select("id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gt("id", last_uploaded_doc_id).execute() + response = SUPABASE_CLIENT.table("documents").select("id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gt("id", last_uploaded_doc_id).limit(25).execute() df = pd.DataFrame(response.data) - - + combined_dfs.append(df) # list of dfs + current_doc_count += len(response.data) + doc_count += len(response.data) - embeddings = [] - metadata = [] - context_count = 0 - # prep data for nomic upload - for row in data['contexts']: - context_count += 1 - embeddings.append(row['embedding']) - metadata.append({ - "id": str(data['id']) + "_" + str(context_count), - "created_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "s3_path": data['s3_path'], - "url": data['url'], - "base_url": data['base_url'], - "readable_filename": data['readable_filename'], - "modified_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "text": row['text'] - }) - embeddings = np.array(embeddings) - metadata = pd.DataFrame(metadata) - print("Shape of embeddings: ", embeddings.shape) - - # append to existing map - project_name = "Document Map for " + course_name - result = append_to_map(embeddings, metadata, project_name) + if doc_count >= 1000: # upload to Nomic in batches of 1000 + # concat all dfs from the combined_dfs list + final_df = pd.concat(combined_dfs, ignore_index=True) + # prep data for nomic upload + embeddings, metadata = data_prep_for_doc_map(final_df) - return result + # append to existing map + print("Appending data to existing map...") + + result = append_to_map(embeddings, metadata, project_name) + if result == "success": + # update the last uploaded id in supabase + last_id = 
int(final_df['id'].iloc[-1]) + info = {'last_uploaded_doc_id': last_id} + print("info:", info) + update_response = SUPABASE_CLIENT.table("projects").update(info).eq("course_name", course_name).execute() + print("Response from supabase: ", update_response) + + # reset variables + combined_dfs = [] + doc_count = 0 + print("Records uploaded: ", curr_total_doc_count) + + # set first_id for next iteration + first_id = response.data[-1]['id'] + 1 + + # upload last set of docs + if doc_count > 0: + final_df = pd.concat(combined_dfs, ignore_index=True) + embeddings, metadata = data_prep_for_doc_map(final_df) + result = append_to_map(embeddings, metadata, project_name) + # update the last uploaded id in supabase + if result == "success": + # update the last uploaded id in supabase + last_id = int(final_df['id'].iloc[-1]) + project_info = {'last_uploaded_doc_id': last_id} + print("project_info: ", project_info) + update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute() + print("Response from supabase: ", update_response) + + return "success" except Exception as e: print(e) - sentry_sdk.capture_exception(e) - return "Error in appending to map: {e}" - - + return "failed" + + def create_map(embeddings, metadata, map_name, index_name, topic_label_field, colorable_fields): """ Generic function to create a Nomic map from given parameters. From 1f69a39532acfa5c81889a3be9b9c98b4163ef3f Mon Sep 17 00:00:00 2001 From: star-nox Date: Mon, 25 Mar 2024 12:33:19 -0500 Subject: [PATCH 04/11] minor corrections in log function --- ai_ta_backend/beam/nomic_logging.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/ai_ta_backend/beam/nomic_logging.py b/ai_ta_backend/beam/nomic_logging.py index 9ba01bdf..6d325738 100644 --- a/ai_ta_backend/beam/nomic_logging.py +++ b/ai_ta_backend/beam/nomic_logging.py @@ -494,7 +494,6 @@ def create_document_map(course_name: str): # log project info to supabase project = AtlasProject(name=project_name, add_datums_if_exists=True) project_id = project.id - print("Last id: ", final_df['id'].iloc[-1]) last_id = int(final_df['id'].iloc[-1]) project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id} update_response = supabase_client.table("projects").insert(project_info).execute() @@ -510,7 +509,6 @@ def create_document_map(course_name: str): # update the last uploaded id in supabase last_id = int(final_df['id'].iloc[-1]) info = {'last_uploaded_doc_id': last_id} - print("info:", info) update_response = supabase_client.table("projects").update(info).eq("course_name", course_name).execute() print("Response from supabase: ", update_response) @@ -543,7 +541,7 @@ def create_document_map(course_name: str): print("project_info: ", project_info) update_response = supabase_client.table("projects").update(project_info).eq("course_name", course_name).execute() print("Response from supabase: ", update_response) - print("Atlas upload status: ", result) + # rebuild the map rebuild_map(course_name, "document") @@ -625,10 +623,11 @@ def log_to_document_map(course_name: str): total_doc_count = response.count current_doc_count = 0 combined_dfs = [] - + doc_count = 0 + first_id = last_uploaded_doc_id while current_doc_count < total_doc_count: # fetch all records from supabase greater than last_uploaded_doc_id - response = SUPABASE_CLIENT.table("documents").select("id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gt("id", 
last_uploaded_doc_id).limit(25).execute() + response = SUPABASE_CLIENT.table("documents").select("id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gt("id", first_id).limit(25).execute() df = pd.DataFrame(response.data) combined_dfs.append(df) # list of dfs @@ -649,14 +648,13 @@ def log_to_document_map(course_name: str): # update the last uploaded id in supabase last_id = int(final_df['id'].iloc[-1]) info = {'last_uploaded_doc_id': last_id} - print("info:", info) update_response = SUPABASE_CLIENT.table("projects").update(info).eq("course_name", course_name).execute() print("Response from supabase: ", update_response) # reset variables combined_dfs = [] doc_count = 0 - print("Records uploaded: ", curr_total_doc_count) + print("Records uploaded: ", current_doc_count) # set first_id for next iteration first_id = response.data[-1]['id'] + 1 @@ -672,7 +670,6 @@ def log_to_document_map(course_name: str): # update the last uploaded id in supabase last_id = int(final_df['id'].iloc[-1]) project_info = {'last_uploaded_doc_id': last_id} - print("project_info: ", project_info) update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute() print("Response from supabase: ", update_response) From 015fd75e0956674de2048becf4880542b950a104 Mon Sep 17 00:00:00 2001 From: star-nox Date: Mon, 25 Mar 2024 12:35:18 -0500 Subject: [PATCH 05/11] removed convo log code from beam folder --- ai_ta_backend/beam/nomic_logging.py | 391 ---------------------------- 1 file changed, 391 deletions(-) diff --git a/ai_ta_backend/beam/nomic_logging.py b/ai_ta_backend/beam/nomic_logging.py index 6d325738..0f278f4a 100644 --- a/ai_ta_backend/beam/nomic_logging.py +++ b/ai_ta_backend/beam/nomic_logging.py @@ -1,9 +1,6 @@ import datetime -import json import os -import time -import backoff import nomic import numpy as np import pandas as pd @@ -18,394 +15,6 @@ supabase_url=os.getenv('SUPABASE_URL'), # type: ignore supabase_key=os.getenv('SUPABASE_API_KEY')) # type: ignore -LOCK_EXCEPTIONS = [ - 'Project is locked for state access! Please wait until the project is unlocked to access embeddings.', - 'Project is locked for state access! Please wait until the project is unlocked to access data.', - 'Project is currently indexing and cannot ingest new datums. Try again later.' -] - - -def giveup_hdlr(e): - """ - Function to handle giveup conditions in backoff decorator - Args: - e: Exception raised by the decorated function - Returns: - True if we want to stop retrying, False otherwise - """ - (e_args,) = e.args - e_str = e_args['exception'] - - print("giveup_hdlr() called with exception:", e_str) - if e_str in LOCK_EXCEPTIONS: - return False - else: - sentry_sdk.capture_exception(e) - return True - - -def backoff_hdlr(details): - """ - Function to handle backup conditions in backoff decorator. - Currently just prints the details of the backoff. - """ - print( - "\nBacking off {wait:0.1f} seconds after {tries} tries, calling function {target} with args {args} and kwargs {kwargs}" - .format(**details)) - - -def backoff_strategy(): - """ - Function to define retry strategy. Is usualy defined in the decorator, - but passing parameters to it is giving errors. 
- """ - return backoff.expo(base=10, factor=1.5) - - -@backoff.on_exception(backoff_strategy, - Exception, - max_tries=5, - raise_on_giveup=False, - giveup=giveup_hdlr, - on_backoff=backoff_hdlr) -def log_convo_to_nomic(course_name: str, conversation) -> str: - nomic.login(os.getenv('NOMIC_API_KEY')) # login during start of flask app - NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' - """ - Logs conversation to Nomic. - 1. Check if map exists for given course - 2. Check if conversation ID exists - - if yes, delete and add new data point - - if no, add new data point - 3. Keep current logic for map doesn't exist - update metadata - """ - - print(f"in log_convo_to_nomic() for course: {course_name}") - print("type of conversation:", type(conversation)) - #conversation = json.loads(conversation) - messages = conversation['conversation']['messages'] - if 'user_email' not in conversation['conversation']: - user_email = "NULL" - else: - user_email = conversation['conversation']['user_email'] - conversation_id = conversation['conversation']['id'] - - # we have to upload whole conversations - # check what the fetched data looks like - pandas df or pyarrow table - # check if conversation ID exists in Nomic, if yes fetch all data from it and delete it. - # will have current QA and historical QA from Nomic, append new data and add_embeddings() - - project_name = NOMIC_MAP_NAME_PREFIX + course_name - start_time = time.monotonic() - emoji = "" - - try: - # fetch project metadata and embbeddings - project = AtlasProject(name=project_name, add_datums_if_exists=True) - - map_metadata_df = project.maps[1].data.df # type: ignore - map_embeddings_df = project.maps[1].embeddings.latent - # create a function which returns project, data and embeddings df here - map_metadata_df['id'] = map_metadata_df['id'].astype(int) - last_id = map_metadata_df['id'].max() - - if conversation_id in map_metadata_df.values: - # store that convo metadata locally - prev_data = map_metadata_df[map_metadata_df['conversation_id'] == conversation_id] - prev_index = prev_data.index.values[0] - embeddings = map_embeddings_df[prev_index - 1].reshape(1, 1536) - prev_convo = prev_data['conversation'].values[0] - prev_id = prev_data['id'].values[0] - created_at = pd.to_datetime(prev_data['created_at'].values[0]).strftime('%Y-%m-%d %H:%M:%S') - - # delete that convo data point from Nomic, and print result - print("Deleting point from nomic:", project.delete_data([str(prev_id)])) - - # prep for new point - first_message = prev_convo.split("\n")[1].split(": ")[1] - - # select the last 2 messages and append new convo to prev convo - messages_to_be_logged = messages[-2:] - for message in messages_to_be_logged: - if message['role'] == 'user': - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(message['content'], list): - text = message['content'][0]['text'] - else: - text = message['content'] - - prev_convo += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - # modified timestamp - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - # update metadata - metadata = [{ - "course": course_name, - "conversation": prev_convo, - "conversation_id": conversation_id, - "id": last_id + 1, - "user_email": user_email, - "first_query": first_message, - "created_at": created_at, - "modified_at": current_time - }] - else: - print("conversation_id does not exist") - - # add new data point - user_queries = [] - conversation_string = "" - - first_message = messages[0]['content'] - if isinstance(first_message, list): - 
first_message = first_message[0]['text'] - user_queries.append(first_message) - - for message in messages: - if message['role'] == 'user': - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(message['content'], list): - text = message['content'][0]['text'] - else: - text = message['content'] - - conversation_string += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - # modified timestamp - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - metadata = [{ - "course": course_name, - "conversation": conversation_string, - "conversation_id": conversation_id, - "id": last_id + 1, - "user_email": user_email, - "first_query": first_message, - "created_at": current_time, - "modified_at": current_time - }] - - # create embeddings - embeddings_model = OpenAIEmbeddings(openai_api_type=OPENAI_API_TYPE) # type: ignore - embeddings = embeddings_model.embed_documents(user_queries) - - # add embeddings to the project - create a new function for this - project = atlas.AtlasProject(name=project_name, add_datums_if_exists=True) - with project.wait_for_project_lock(): - project.add_embeddings(embeddings=np.array(embeddings), data=pd.DataFrame(metadata)) - project.rebuild_maps() - - print(f"⏰ Nomic logging runtime: {(time.monotonic() - start_time):.2f} seconds") - return f"Successfully logged for {course_name}" - - except Exception as e: - if str(e) == 'You must specify a unique_id_field when creating a new project.': - print("Attempting to create Nomic map...") - result = create_nomic_map(course_name, conversation) - print("result of create_nomic_map():", result) - else: - # raising exception again to trigger backoff and passing parameters to use in create_nomic_map() - raise Exception({"exception": str(e)}) - - - -def get_nomic_map(course_name: str, type: str): - """ - Returns the variables necessary to construct an iframe of the Nomic map given a course name. - We just need the ID and URL. - Example values: - map link: https://atlas.nomic.ai/map/ed222613-97d9-46a9-8755-12bbc8a06e3a/f4967ad7-ff37-4098-ad06-7e1e1a93dd93 - map id: f4967ad7-ff37-4098-ad06-7e1e1a93dd93 - """ - nomic.login(os.getenv('NOMIC_API_KEY')) # login during start of flask app - if type.lower() == 'document': - NOMIC_MAP_NAME_PREFIX = 'Document Map for ' - else: - NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' - - project_name = NOMIC_MAP_NAME_PREFIX + course_name - start_time = time.monotonic() - - try: - project = atlas.AtlasProject(name=project_name, add_datums_if_exists=True) - map = project.get_map(project_name) - - print(f"⏰ Nomic Full Map Retrieval: {(time.monotonic() - start_time):.2f} seconds") - return {"map_id": f"iframe{map.id}", "map_link": map.map_link} - except Exception as e: - # Error: ValueError: You must specify a unique_id_field when creating a new project. - if str(e) == 'You must specify a unique_id_field when creating a new project.': # type: ignore - print("Nomic map does not exist yet, probably because you have less than 20 queries/documents on your project: ", e) - else: - print("ERROR in get_nomic_map():", e) - sentry_sdk.capture_exception(e) - return {"map_id": None, "map_link": None} - - -def create_nomic_map(course_name: str, log_data: list): - """ - Creates a Nomic map for new courses and those which previously had < 20 queries. - 1. fetches supabase conversations for course - 2. appends current embeddings and metadata to it - 2. 
creates map if there are at least 20 queries - """ - nomic.login(os.getenv('NOMIC_API_KEY')) # login during start of flask app - NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' - - print(f"in create_nomic_map() for {course_name}") - # initialize supabase - supabase_client = supabase.create_client( # type: ignore - supabase_url=os.getenv('SUPABASE_URL'), # type: ignore - supabase_key=os.getenv('SUPABASE_API_KEY')) # type: ignore - - try: - # fetch all conversations with this new course (we expect <=20 conversations, because otherwise the map should be made already) - response = supabase_client.table("llm-convo-monitor").select("*").eq("course_name", course_name).execute() - data = response.data - df = pd.DataFrame(data) - - if len(data) < 19: - return None - else: - # get all queries for course and create metadata - user_queries = [] - metadata = [] - i = 1 - conversation_exists = False - - # current log details - log_messages = log_data['conversation']['messages'] # type: ignore - log_user_email = log_data['conversation']['user_email'] # type: ignore - log_conversation_id = log_data['conversation']['id'] # type: ignore - - for _index, row in df.iterrows(): - user_email = row['user_email'] - created_at = pd.to_datetime(row['created_at']).strftime('%Y-%m-%d %H:%M:%S') - convo = row['convo'] - messages = convo['messages'] - - first_message = messages[0]['content'] - if isinstance(first_message, list): - first_message = first_message[0]['text'] - - user_queries.append(first_message) - - # create metadata for multi-turn conversation - conversation = "" - for message in messages: - # string of role: content, role: content, ... - if message['role'] == 'user': # type: ignore - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(message['content'], list): - text = message['content'][0]['text'] - else: - text = message['content'] - - conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - # append current chat to previous chat if convo already exists - if convo['id'] == log_conversation_id: - conversation_exists = True - - for m in log_messages: - if m['role'] == 'user': # type: ignore - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(m['content'], list): - text = m['content'][0]['text'] - else: - text = m['content'] - conversation += "\n>>> " + emoji + m['role'] + ": " + text + "\n" - - # adding modified timestamp - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - # add to metadata - metadata_row = { - "course": row['course_name'], - "conversation": conversation, - "conversation_id": convo['id'], - "id": i, - "user_email": user_email, - "first_query": first_message, - "created_at": created_at, - "modified_at": current_time - } - metadata.append(metadata_row) - i += 1 - - # add current log as a new data point if convo doesn't exist - if not conversation_exists: - user_queries.append(log_messages[0]['content']) - conversation = "" - for message in log_messages: - if message['role'] == 'user': - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(message['content'], list): - text = message['content'][0]['text'] - else: - text = message['content'] - conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - # adding timestamp - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - metadata_row = { - "course": course_name, - "conversation": conversation, - "conversation_id": log_conversation_id, - "id": i, - "user_email": log_user_email, - "first_query": log_messages[0]['content'], - "created_at": current_time, - 
"modified_at": current_time - } - metadata.append(metadata_row) - - metadata = pd.DataFrame(metadata) - embeddings_model = OpenAIEmbeddings(openai_api_type=OPENAI_API_TYPE) # type: ignore - embeddings = embeddings_model.embed_documents(user_queries) - - # create Atlas project - project_name = NOMIC_MAP_NAME_PREFIX + course_name - index_name = course_name + "_convo_index" - project = atlas.map_embeddings( - embeddings=np.array(embeddings), - data=metadata, # type: ignore - this is the correct type, the func signature from Nomic is incomplete - id_field='id', - build_topic_model=True, - topic_label_field='first_query', - name=project_name, - colorable_fields=['conversation_id', 'first_query']) - project.create_index(index_name, build_topic_model=True) - return f"Successfully created Nomic map for {course_name}" - except Exception as e: - # Error: ValueError: You must specify a unique_id_field when creating a new project. - if str(e) == 'You must specify a unique_id_field when creating a new project.': # type: ignore - print("Nomic map does not exist yet, probably because you have less than 20 queries on your project: ", e) - else: - print("ERROR in create_nomic_map():", e) - sentry_sdk.capture_exception(e) - - return "failed" - - - ## -------------------------------- DOCUMENT MAP FUNCTIONS --------------------------------- ## def create_document_map(course_name: str): From fc69d4ccf911d074f0be82510bdaff42b6d75b5d Mon Sep 17 00:00:00 2001 From: star-nox Date: Tue, 26 Mar 2024 10:49:06 -0500 Subject: [PATCH 06/11] started updates for convo logging --- ai_ta_backend/beam/nomic_logging.py | 22 +- ai_ta_backend/database/sql.py | 20 +- ai_ta_backend/main.py | 14 + ai_ta_backend/service/nomic_service.py | 436 +++++++++++++++++-------- 4 files changed, 332 insertions(+), 160 deletions(-) diff --git a/ai_ta_backend/beam/nomic_logging.py b/ai_ta_backend/beam/nomic_logging.py index 0f278f4a..5d7fff03 100644 --- a/ai_ta_backend/beam/nomic_logging.py +++ b/ai_ta_backend/beam/nomic_logging.py @@ -15,6 +15,8 @@ supabase_url=os.getenv('SUPABASE_URL'), # type: ignore supabase_key=os.getenv('SUPABASE_API_KEY')) # type: ignore +NOMIC_MAP_NAME_PREFIX = 'Document Map for ' + ## -------------------------------- DOCUMENT MAP FUNCTIONS --------------------------------- ## def create_document_map(course_name: str): @@ -33,21 +35,15 @@ def create_document_map(course_name: str): """ print("in create_document_map()") nomic.login(os.getenv('NOMIC_API_KEY')) - NOMIC_MAP_NAME_PREFIX = 'Document Map for ' - - # initialize supabase - supabase_client = supabase.create_client( # type: ignore - supabase_url=os.getenv('SUPABASE_URL'), # type: ignore - supabase_key=os.getenv('SUPABASE_API_KEY')) # type: ignore - + try: # check if map exists - response = supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute() + response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() if response.data: return "Map already exists for this course." 
# fetch relevant document data from Supabase - response = supabase_client.table("documents").select("id", + response = SUPABASE_CLIENT.table("documents").select("id", count="exact").eq("course_name", course_name).order('id', desc=False).execute() @@ -71,7 +67,7 @@ def create_document_map(course_name: str): # iteratively query in batches of 25 while curr_total_doc_count < total_doc_count: - response = supabase_client.table("documents").select( + response = SUPABASE_CLIENT.table("documents").select( "id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gte( 'id', first_id).order('id', desc=False).limit(25).execute() df = pd.DataFrame(response.data) @@ -105,7 +101,7 @@ def create_document_map(course_name: str): project_id = project.id last_id = int(final_df['id'].iloc[-1]) project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id} - update_response = supabase_client.table("projects").insert(project_info).execute() + update_response = SUPABASE_CLIENT.table("projects").insert(project_info).execute() print("Response from supabase: ", update_response) else: @@ -118,7 +114,7 @@ def create_document_map(course_name: str): # update the last uploaded id in supabase last_id = int(final_df['id'].iloc[-1]) info = {'last_uploaded_doc_id': last_id} - update_response = supabase_client.table("projects").update(info).eq("course_name", course_name).execute() + update_response = SUPABASE_CLIENT.table("projects").update(info).eq("course_name", course_name).execute() print("Response from supabase: ", update_response) # reset variables @@ -148,7 +144,7 @@ def create_document_map(course_name: str): last_id = int(final_df['id'].iloc[-1]) project_info = {'last_uploaded_doc_id': last_id} print("project_info: ", project_info) - update_response = supabase_client.table("projects").update(project_info).eq("course_name", course_name).execute() + update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute() print("Response from supabase: ", update_response) diff --git a/ai_ta_backend/database/sql.py b/ai_ta_backend/database/sql.py index 223bc386..e79ecc3b 100644 --- a/ai_ta_backend/database/sql.py +++ b/ai_ta_backend/database/sql.py @@ -73,9 +73,14 @@ def getAllFromTableForDownloadType(self, course_name: str, download_type: str, f return response - def getAllConversationsBetweenIds(self, course_name: str, first_id: int, last_id: int): - return self.supabase_client.table("llm-convo-monitor").select("*").eq("course_name", course_name).gte( - 'id', first_id).lte('id', last_id).order('id', desc=False).limit(25).execute() + def getAllConversationsBetweenIds(self, course_name: str, first_id: int, last_id: int, limit: int = 50): + if last_id == 0: + return self.supabase_client.table("llm-convo-monitor").select("*").eq("course_name", course_name).gte( + 'id', first_id).order('id', desc=False).limit(limit).execute() + else: + return self.supabase_client.table("llm-convo-monitor").select("*").eq("course_name", course_name).gte( + 'id', first_id).lte('id', last_id).order('id', desc=False).limit(limit).execute() + def getDocsForIdsGte(self, course_name: str, first_id: int, fields: str = "*", limit: int = 100): return self.supabase_client.table("documents").select(fields).eq("course_name", course_name).gte( @@ -86,3 +91,12 @@ def insertProjectInfo(self, project_info): def getAllFromLLMConvoMonitor(self, course_name: str): return 
self.supabase_client.table("llm-convo-monitor").select("*").eq("course_name", course_name).execute() + + def getCountFromLLMConvoMonitor(self, course_name: str): + return self.supabase_client.table("llm-convo-monitor").select("id", count='exact').eq("course_name", course_name).execute() + + def getDocMapFromProjects(self, course_name: str): + return self.supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute() + + def getConvoMapFromProjects(self, course_name: str): + return self.supabase_client.table("projects").select("convo_map_id").eq("course_name", course_name).execute() diff --git a/ai_ta_backend/main.py b/ai_ta_backend/main.py index 77bfeea5..cfe9cbf0 100644 --- a/ai_ta_backend/main.py +++ b/ai_ta_backend/main.py @@ -197,6 +197,20 @@ def createDocumentMap(service: NomicService): response.headers.add('Access-Control-Allow-Origin', '*') return response +@app.route('/createConversationMap', methods=['GET']) +def createConversationMap(service: NomicService): + course_name: str = request.args.get('course_name', default='', type=str) + + if course_name == '': + # proper web error "400 Bad request" + abort(400, description=f"Missing required parameter: 'course_name' must be provided. Course name: `{course_name}`") + + map_id = service.create_conversation_map(course_name) + + response = jsonify(map_id) + response.headers.add('Access-Control-Allow-Origin', '*') + return response + @app.route('/onResponseCompletion', methods=['POST']) def logToNomic(service: NomicService, flaskExecutor: ExecutorInterface): diff --git a/ai_ta_backend/service/nomic_service.py b/ai_ta_backend/service/nomic_service.py index aee724fa..cd6c314e 100644 --- a/ai_ta_backend/service/nomic_service.py +++ b/ai_ta_backend/service/nomic_service.py @@ -20,7 +20,6 @@ 'Project is currently indexing and cannot ingest new datums. Try again later.' ] - def giveup_hdlr(e): """ Function to handle giveup conditions in backoff decorator @@ -252,157 +251,309 @@ def get_nomic_map(self, course_name: str, type: str): self.sentry.capture_exception(e) return {"map_id": None, "map_link": None} - def create_nomic_map(self, course_name: str, log_data: list): + # def create_nomic_map(self, course_name: str, log_data: list): + # """ + # Creates a Nomic map for new courses and those which previously had < 20 queries. + # 1. fetches supabase conversations for course + # 2. appends current embeddings and metadata to it + # 2. 
creates map if there are at least 20 queries + # """ + # nomic.login(os.environ['NOMIC_API_KEY']) # login during start of flask app + # NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' + + # print(f"in create_nomic_map() for {course_name}") + + # try: + # # fetch all conversations with this new course (we expect <=20 conversations, because otherwise the map should be made already) + + # response = self.sql.getAllFromLLMConvoMonitor(course_name) + # data = response.data + # df = pd.DataFrame(data) + + # if len(data) < 19: + # return None + # else: + # # get all queries for course and create metadata + # user_queries = [] + # metadata = [] + # i = 1 + # conversation_exists = False + + # # current log details + # log_messages = log_data['conversation']['messages'] # type: ignore + # log_user_email = log_data['conversation']['user_email'] # type: ignore + # log_conversation_id = log_data['conversation']['id'] # type: ignore + + # for _index, row in df.iterrows(): + # user_email = row['user_email'] + # created_at = pd.to_datetime(row['created_at']).strftime('%Y-%m-%d %H:%M:%S') + # convo = row['convo'] + # messages = convo['messages'] + + # first_message = messages[0]['content'] + # if isinstance(first_message, list): + # first_message = first_message[0]['text'] + + # user_queries.append(first_message) + + # # create metadata for multi-turn conversation + # conversation = "" + # for message in messages: + # # string of role: content, role: content, ... + # if message['role'] == 'user': # type: ignore + # emoji = "🙋 " + # else: + # emoji = "🤖 " + + # if isinstance(message['content'], list): + # text = message['content'][0]['text'] + # else: + # text = message['content'] + + # conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" + + # # append current chat to previous chat if convo already exists + # if convo['id'] == log_conversation_id: + # conversation_exists = True + + # for m in log_messages: + # if m['role'] == 'user': # type: ignore + # emoji = "🙋 " + # else: + # emoji = "🤖 " + + # if isinstance(m['content'], list): + # text = m['content'][0]['text'] + # else: + # text = m['content'] + # conversation += "\n>>> " + emoji + m['role'] + ": " + text + "\n" + + # # adding modified timestamp + # current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + # # add to metadata + # metadata_row = { + # "course": row['course_name'], + # "conversation": conversation, + # "conversation_id": convo['id'], + # "id": i, + # "user_email": user_email, + # "first_query": first_message, + # "created_at": created_at, + # "modified_at": current_time + # } + # metadata.append(metadata_row) + # i += 1 + + # # add current log as a new data point if convo doesn't exist + # if not conversation_exists: + # user_queries.append(log_messages[0]['content']) + # conversation = "" + # for message in log_messages: + # if message['role'] == 'user': + # emoji = "🙋 " + # else: + # emoji = "🤖 " + + # if isinstance(message['content'], list): + # text = message['content'][0]['text'] + # else: + # text = message['content'] + # conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" + + # # adding timestamp + # current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + # metadata_row = { + # "course": course_name, + # "conversation": conversation, + # "conversation_id": log_conversation_id, + # "id": i, + # "user_email": log_user_email, + # "first_query": log_messages[0]['content'], + # "created_at": current_time, + # "modified_at": current_time + # } + # 
metadata.append(metadata_row) + + # metadata = pd.DataFrame(metadata) + # embeddings_model = OpenAIEmbeddings(openai_api_type=os.environ['OPENAI_API_TYPE']) + # embeddings = embeddings_model.embed_documents(user_queries) + + # # create Atlas project + # project_name = NOMIC_MAP_NAME_PREFIX + course_name + # index_name = course_name + "_convo_index" + # project = atlas.map_embeddings( + # embeddings=np.array(embeddings), + # data=metadata, # type: ignore - this is the correct type, the func signature from Nomic is incomplete + # id_field='id', + # build_topic_model=True, + # topic_label_field='first_query', + # name=project_name, + # colorable_fields=['conversation_id', 'first_query']) + # project.create_index(index_name, build_topic_model=True) + # return f"Successfully created Nomic map for {course_name}" + # except Exception as e: + # # Error: ValueError: You must specify a unique_id_field when creating a new project. + # if str(e) == 'You must specify a unique_id_field when creating a new project.': # type: ignore + # print("Nomic map does not exist yet, probably because you have less than 20 queries on your project: ", e) + # else: + # print("ERROR in create_nomic_map():", e) + # self.sentry.capture_exception(e) + + # return "failed" + + def create_conversation_map(self, course_name: str): """ - Creates a Nomic map for new courses and those which previously had < 20 queries. - 1. fetches supabase conversations for course - 2. appends current embeddings and metadata to it - 2. creates map if there are at least 20 queries - """ - nomic.login(os.environ['NOMIC_API_KEY']) # login during start of flask app + This function creates a conversation map for a given course from scratch. + """ + nomic.login(os.getenv('NOMIC_API_KEY')) NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' + # check if map exists + response = self.sql.getConvoMapFromProjects(course_name) - print(f"in create_nomic_map() for {course_name}") + if response.data[0]['convo_map_id']: + return "Map already exists for this course." - try: - # fetch all conversations with this new course (we expect <=20 conversations, because otherwise the map should be made already) + # if no, fetch total count of records + response = self.sql.getCountFromLLMConvoMonitor(course_name) + + # if <20, return message that map cannot be created + if not response.count: + return "No conversations found for this course." + elif response.count < 20: + return "Cannot create a map because there are less than 20 conversations in the course." 
+ + # if >20, iteratively fetch records in batches of 100 + total_convo_count = response.count + print("Total number of conversations in Supabase: ", total_convo_count) + first_id = response.data[0]['id'] + combined_dfs = [] + current_convo_count = 0 + convo_count = 0 + first_batch = True + + # iteratively query in batches of 100 + while current_convo_count < total_convo_count: + response = self.sql.getAllConversationsBetweenIds(course_name, first_id, 0, 50) + df = pd.DataFrame(response.data) + combined_dfs.append(df) + current_convo_count += len(response.data) + convo_count += len(response.data) + + if convo_count >= 5: + # concat all dfs from the combined_dfs list + final_df = pd.concat(combined_dfs, ignore_index=True) - response = self.sql.getAllFromLLMConvoMonitor(course_name) - data = response.data - df = pd.DataFrame(data) + # prep data for nomic upload + embeddings, metadata = self.data_prep_for_convo_map(final_df) - if len(data) < 19: - return None - else: - # get all queries for course and create metadata - user_queries = [] - metadata = [] - i = 1 - conversation_exists = False - - # current log details - log_messages = log_data['conversation']['messages'] # type: ignore - log_user_email = log_data['conversation']['user_email'] # type: ignore - log_conversation_id = log_data['conversation']['id'] # type: ignore - - for _index, row in df.iterrows(): - user_email = row['user_email'] - created_at = pd.to_datetime(row['created_at']).strftime('%Y-%m-%d %H:%M:%S') - convo = row['convo'] - messages = convo['messages'] - - first_message = messages[0]['content'] - if isinstance(first_message, list): - first_message = first_message[0]['text'] - - user_queries.append(first_message) - - # create metadata for multi-turn conversation - conversation = "" - for message in messages: - # string of role: content, role: content, ... 
- if message['role'] == 'user': # type: ignore - emoji = "🙋 " - else: - emoji = "🤖 " + if first_batch: + # create a new map + print("Creating new map...") + project_name = NOMIC_MAP_NAME_PREFIX + course_name + index_name = course_name + "_convo_index" + topic_label_field = "first_query" + colorable_fields = ["user_email", "first_query", "conversation_id", "created_at"] + result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) + + if result == "success": + # update flag + first_batch = False + # log project info to supabase + project = AtlasProject(name=project_name, add_datums_if_exists=True) + project_id = project.id + last_id = int(final_df['id'].iloc[-1]) + project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id} + # if entry already exists, update it + projects_record = self.sql.getConvoMapFromProjects(course_name) + update_response = SUPABASE_CLIENT.table("projects").insert(project_info).execute() + print("Response from supabase: ", update_response) + else: + # append to existing map - if isinstance(message['content'], list): - text = message['content'][0]['text'] - else: - text = message['content'] - - conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - # append current chat to previous chat if convo already exists - if convo['id'] == log_conversation_id: - conversation_exists = True - - for m in log_messages: - if m['role'] == 'user': # type: ignore - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(m['content'], list): - text = m['content'][0]['text'] - else: - text = m['content'] - conversation += "\n>>> " + emoji + m['role'] + ": " + text + "\n" - - # adding modified timestamp - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - # add to metadata - metadata_row = { - "course": row['course_name'], - "conversation": conversation, - "conversation_id": convo['id'], - "id": i, - "user_email": user_email, - "first_query": first_message, - "created_at": created_at, - "modified_at": current_time - } - metadata.append(metadata_row) - i += 1 - - # add current log as a new data point if convo doesn't exist - if not conversation_exists: - user_queries.append(log_messages[0]['content']) - conversation = "" - for message in log_messages: - if message['role'] == 'user': - emoji = "🙋 " - else: - emoji = "🤖 " - if isinstance(message['content'], list): - text = message['content'][0]['text'] - else: - text = message['content'] - conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - # adding timestamp - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - metadata_row = { - "course": course_name, - "conversation": conversation, - "conversation_id": log_conversation_id, - "id": i, - "user_email": log_user_email, - "first_query": log_messages[0]['content'], - "created_at": current_time, - "modified_at": current_time - } - metadata.append(metadata_row) - - metadata = pd.DataFrame(metadata) - embeddings_model = OpenAIEmbeddings(openai_api_type=os.environ['OPENAI_API_TYPE']) - embeddings = embeddings_model.embed_documents(user_queries) - # create Atlas project - project_name = NOMIC_MAP_NAME_PREFIX + course_name - index_name = course_name + "_convo_index" - project = atlas.map_embeddings( - embeddings=np.array(embeddings), - data=metadata, # type: ignore - this is the correct type, the func signature from Nomic is incomplete - id_field='id', - build_topic_model=True, - topic_label_field='first_query', - 
name=project_name, - colorable_fields=['conversation_id', 'first_query']) - project.create_index(index_name, build_topic_model=True) - return f"Successfully created Nomic map for {course_name}" - except Exception as e: - # Error: ValueError: You must specify a unique_id_field when creating a new project. - if str(e) == 'You must specify a unique_id_field when creating a new project.': # type: ignore - print("Nomic map does not exist yet, probably because you have less than 20 queries on your project: ", e) + + + return "success" + + def data_prep_for_convo_map(self, df: pd.DataFrame): + """ + This function prepares embeddings and metadata for nomic upload in conversation map creation. + Args: + df: pd.DataFrame - the dataframe of documents from Supabase + Returns: + embeddings: np.array of embeddings + metadata: pd.DataFrame of metadata + """ + print("in data_prep_for_convo_map()") + + metadata = [] + embeddings = [] + texts = [] + + for _index, row in df.iterrows(): + + print("Row: ", row) + + current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + created_at = datetime.datetime.strptime(row['created_at'], "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%d %H:%M:%S") + conversation_exists = False + conversation = "" + emoji = "" + user_queries = [] + + if row['user_email'] is None: + user_email = "" else: - print("ERROR in create_nomic_map():", e) - self.sentry.capture_exception(e) + user_email = row['user_email'] + + messages = row['convo']['messages'] + first_message = messages[0]['content'] + # some conversations include images, so the data structure is different + if isinstance(first_message, list): + first_message = first_message[0]['text'] + user_queries.append(first_message) + + # construct metadata for multi-turn conversation + for message in messages: + if message['role'] == 'user': + emoji = "🙋 " + else: + emoji = "🤖 " + + if isinstance(message['content'], list): + text = message['content'][0]['text'] + else: + text = message['content'] + + conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" + + meta_row = { + "course": row['course_name'], + "conversation": conversation, + "conversation_id": row['convo']['id'], + "id": row['id'], + "user_email": user_email, + "first_query": first_message, + "created_at": created_at, + "modified_at": current_time + } + + metadata.append(meta_row) + texts.append(user_queries) + + embeddings_model = OpenAIEmbeddings(openai_api_type="openai", + openai_api_base="https://api.openai.com/v1/", + openai_api_key=os.environ['VLADS_OPENAI_KEY']) + embeddings = embeddings_model.embed_documents(texts) + + metadata = pd.DataFrame(metadata) + embeddings = np.array(embeddings) + + return embeddings, metadata - return "failed" ## -------------------------------- DOCUMENT MAP FUNCTIONS --------------------------------- ## @@ -707,9 +858,6 @@ def data_prep_for_doc_map(self, df: pd.DataFrame): # check dimension if embeddings_np is (n, 1536) if len(embeddings_np.shape) < 2: print("Creating new embeddings...") - # embeddings_model = OpenAIEmbeddings(openai_api_type=OPENAI_API_TYPE, - # openai_api_base=os.getenv('AZURE_OPENAI_BASE'), - # openai_api_key=os.getenv('AZURE_OPENAI_KEY')) # type: ignore embeddings_model = OpenAIEmbeddings(openai_api_type="openai", openai_api_base="https://api.openai.com/v1/", openai_api_key=os.environ['VLADS_OPENAI_KEY']) From ea3601762f27ae1a1598da1f2ec0ba1b20d4ff5c Mon Sep 17 00:00:00 2001 From: star-nox Date: Wed, 27 Mar 2024 13:45:27 -0500 Subject: [PATCH 07/11] modified function to create convo map from 
scratch --- ai_ta_backend/database/sql.py | 6 +- ai_ta_backend/service/nomic_service.py | 1031 +++++++++--------------- 2 files changed, 374 insertions(+), 663 deletions(-) diff --git a/ai_ta_backend/database/sql.py b/ai_ta_backend/database/sql.py index e79ecc3b..ae77e6fe 100644 --- a/ai_ta_backend/database/sql.py +++ b/ai_ta_backend/database/sql.py @@ -75,7 +75,7 @@ def getAllFromTableForDownloadType(self, course_name: str, download_type: str, f def getAllConversationsBetweenIds(self, course_name: str, first_id: int, last_id: int, limit: int = 50): if last_id == 0: - return self.supabase_client.table("llm-convo-monitor").select("*").eq("course_name", course_name).gte( + return self.supabase_client.table("llm-convo-monitor").select("*").eq("course_name", course_name).gt( 'id', first_id).order('id', desc=False).limit(limit).execute() else: return self.supabase_client.table("llm-convo-monitor").select("*").eq("course_name", course_name).gte( @@ -100,3 +100,7 @@ def getDocMapFromProjects(self, course_name: str): def getConvoMapFromProjects(self, course_name: str): return self.supabase_client.table("projects").select("convo_map_id").eq("course_name", course_name).execute() + + def updateProjects(self, course_name: str, data: dict): + return self.supabase_client.table("projects").update(data).eq("course_name", course_name).execute() + diff --git a/ai_ta_backend/service/nomic_service.py b/ai_ta_backend/service/nomic_service.py index cd6c314e..2066468a 100644 --- a/ai_ta_backend/service/nomic_service.py +++ b/ai_ta_backend/service/nomic_service.py @@ -65,157 +65,157 @@ def __init__(self, sentry: SentryService, sql: SQLDatabase): self.sentry = sentry self.sql = sql - @backoff.on_exception(backoff_strategy, - Exception, - max_tries=5, - raise_on_giveup=False, - giveup=giveup_hdlr, - on_backoff=backoff_hdlr) - def log_convo_to_nomic(self, course_name: str, conversation) -> Union[str, None]: - # nomic.login(os.getenv('NOMIC_API_KEY')) # login during start of flask app - NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' - """ - Logs conversation to Nomic. - 1. Check if map exists for given course - 2. Check if conversation ID exists - - if yes, delete and add new data point - - if no, add new data point - 3. Keep current logic for map doesn't exist - update metadata - """ - - print(f"in log_convo_to_nomic() for course: {course_name}") - print("type of conversation:", type(conversation)) - #conversation = json.loads(conversation) - messages = conversation['conversation']['messages'] - if 'user_email' not in conversation['conversation']: - user_email = "NULL" - else: - user_email = conversation['conversation']['user_email'] - conversation_id = conversation['conversation']['id'] - - # we have to upload whole conversations - # check what the fetched data looks like - pandas df or pyarrow table - # check if conversation ID exists in Nomic, if yes fetch all data from it and delete it. 
- # will have current QA and historical QA from Nomic, append new data and add_embeddings() - - project_name = NOMIC_MAP_NAME_PREFIX + course_name - start_time = time.monotonic() - emoji = "" - - try: - # fetch project metadata and embbeddings - project = AtlasProject(name=project_name, add_datums_if_exists=True) - - map_metadata_df = project.maps[1].data.df # type: ignore - map_embeddings_df = project.maps[1].embeddings.latent - # create a function which returns project, data and embeddings df here - map_metadata_df['id'] = map_metadata_df['id'].astype(int) - last_id = map_metadata_df['id'].max() - - if conversation_id in map_metadata_df.values: - # store that convo metadata locally - prev_data = map_metadata_df[map_metadata_df['conversation_id'] == conversation_id] - prev_index = prev_data.index.values[0] - embeddings = map_embeddings_df[prev_index - 1].reshape(1, 1536) - prev_convo = prev_data['conversation'].values[0] - prev_id = prev_data['id'].values[0] - created_at = pd.to_datetime(prev_data['created_at'].values[0]).strftime('%Y-%m-%d %H:%M:%S') - - # delete that convo data point from Nomic, and print result - print("Deleting point from nomic:", project.delete_data([str(prev_id)])) - - # prep for new point - first_message = prev_convo.split("\n")[1].split(": ")[1] - - # select the last 2 messages and append new convo to prev convo - messages_to_be_logged = messages[-2:] - for message in messages_to_be_logged: - if message['role'] == 'user': - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(message['content'], list): - text = message['content'][0]['text'] - else: - text = message['content'] - - prev_convo += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - # modified timestamp - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - # update metadata - metadata = [{ - "course": course_name, - "conversation": prev_convo, - "conversation_id": conversation_id, - "id": last_id + 1, - "user_email": user_email, - "first_query": first_message, - "created_at": created_at, - "modified_at": current_time - }] - else: - print("conversation_id does not exist") - - # add new data point - user_queries = [] - conversation_string = "" - - first_message = messages[0]['content'] - if isinstance(first_message, list): - first_message = first_message[0]['text'] - user_queries.append(first_message) - - for message in messages: - if message['role'] == 'user': - emoji = "🙋 " - else: - emoji = "🤖 " - - if isinstance(message['content'], list): - text = message['content'][0]['text'] - else: - text = message['content'] + # @backoff.on_exception(backoff_strategy, + # Exception, + # max_tries=5, + # raise_on_giveup=False, + # giveup=giveup_hdlr, + # on_backoff=backoff_hdlr) + # def log_convo_to_nomic(self, course_name: str, conversation) -> Union[str, None]: + # # nomic.login(os.getenv('NOMIC_API_KEY')) # login during start of flask app + # NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' + # """ + # Logs conversation to Nomic. + # 1. Check if map exists for given course + # 2. Check if conversation ID exists + # - if yes, delete and add new data point + # - if no, add new data point + # 3. 
Keep current logic for map doesn't exist - update metadata + # """ + + # print(f"in log_convo_to_nomic() for course: {course_name}") + # print("type of conversation:", type(conversation)) + # #conversation = json.loads(conversation) + # messages = conversation['conversation']['messages'] + # if 'user_email' not in conversation['conversation']: + # user_email = "NULL" + # else: + # user_email = conversation['conversation']['user_email'] + # conversation_id = conversation['conversation']['id'] + + # # we have to upload whole conversations + # # check what the fetched data looks like - pandas df or pyarrow table + # # check if conversation ID exists in Nomic, if yes fetch all data from it and delete it. + # # will have current QA and historical QA from Nomic, append new data and add_embeddings() + + # project_name = NOMIC_MAP_NAME_PREFIX + course_name + # start_time = time.monotonic() + # emoji = "" - conversation_string += "\n>>> " + emoji + message['role'] + ": " + text + "\n" + # try: + # # fetch project metadata and embbeddings + # project = AtlasProject(name=project_name, add_datums_if_exists=True) + + # map_metadata_df = project.maps[1].data.df # type: ignore + # map_embeddings_df = project.maps[1].embeddings.latent + # # create a function which returns project, data and embeddings df here + # map_metadata_df['id'] = map_metadata_df['id'].astype(int) + # last_id = map_metadata_df['id'].max() + + # if conversation_id in map_metadata_df.values: + # # store that convo metadata locally + # prev_data = map_metadata_df[map_metadata_df['conversation_id'] == conversation_id] + # prev_index = prev_data.index.values[0] + # embeddings = map_embeddings_df[prev_index - 1].reshape(1, 1536) + # prev_convo = prev_data['conversation'].values[0] + # prev_id = prev_data['id'].values[0] + # created_at = pd.to_datetime(prev_data['created_at'].values[0]).strftime('%Y-%m-%d %H:%M:%S') + + # # delete that convo data point from Nomic, and print result + # print("Deleting point from nomic:", project.delete_data([str(prev_id)])) + + # # prep for new point + # first_message = prev_convo.split("\n")[1].split(": ")[1] + + # # select the last 2 messages and append new convo to prev convo + # messages_to_be_logged = messages[-2:] + # for message in messages_to_be_logged: + # if message['role'] == 'user': + # emoji = "🙋 " + # else: + # emoji = "🤖 " + + # if isinstance(message['content'], list): + # text = message['content'][0]['text'] + # else: + # text = message['content'] + + # prev_convo += "\n>>> " + emoji + message['role'] + ": " + text + "\n" + + # # modified timestamp + # current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + # # update metadata + # metadata = [{ + # "course": course_name, + # "conversation": prev_convo, + # "conversation_id": conversation_id, + # "id": last_id + 1, + # "user_email": user_email, + # "first_query": first_message, + # "created_at": created_at, + # "modified_at": current_time + # }] + # else: + # print("conversation_id does not exist") - # modified timestamp - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + # # add new data point + # user_queries = [] + # conversation_string = "" + + # first_message = messages[0]['content'] + # if isinstance(first_message, list): + # first_message = first_message[0]['text'] + # user_queries.append(first_message) + + # for message in messages: + # if message['role'] == 'user': + # emoji = "🙋 " + # else: + # emoji = "🤖 " + + # if isinstance(message['content'], list): + # text = message['content'][0]['text'] + 
# else: + # text = message['content'] + + # conversation_string += "\n>>> " + emoji + message['role'] + ": " + text + "\n" + + # # modified timestamp + # current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + # metadata = [{ + # "course": course_name, + # "conversation": conversation_string, + # "conversation_id": conversation_id, + # "id": last_id + 1, + # "user_email": user_email, + # "first_query": first_message, + # "created_at": current_time, + # "modified_at": current_time + # }] + + # # create embeddings + # embeddings_model = OpenAIEmbeddings(openai_api_type=os.environ['OPENAI_API_TYPE']) + # embeddings = embeddings_model.embed_documents(user_queries) - metadata = [{ - "course": course_name, - "conversation": conversation_string, - "conversation_id": conversation_id, - "id": last_id + 1, - "user_email": user_email, - "first_query": first_message, - "created_at": current_time, - "modified_at": current_time - }] + # # add embeddings to the project - create a new function for this + # project = atlas.AtlasProject(name=project_name, add_datums_if_exists=True) + # with project.wait_for_project_lock(): + # project.add_embeddings(embeddings=np.array(embeddings), data=pd.DataFrame(metadata)) + # project.rebuild_maps() - # create embeddings - embeddings_model = OpenAIEmbeddings(openai_api_type=os.environ['OPENAI_API_TYPE']) - embeddings = embeddings_model.embed_documents(user_queries) + # print(f"⏰ Nomic logging runtime: {(time.monotonic() - start_time):.2f} seconds") + # return f"Successfully logged for {course_name}" - # add embeddings to the project - create a new function for this - project = atlas.AtlasProject(name=project_name, add_datums_if_exists=True) - with project.wait_for_project_lock(): - project.add_embeddings(embeddings=np.array(embeddings), data=pd.DataFrame(metadata)) - project.rebuild_maps() - - print(f"⏰ Nomic logging runtime: {(time.monotonic() - start_time):.2f} seconds") - return f"Successfully logged for {course_name}" - - except Exception as e: - if str(e) == 'You must specify a unique_id_field when creating a new project.': - print("Attempting to create Nomic map...") - result = self.create_nomic_map(course_name, conversation) - print("result of create_nomic_map():", result) - else: - # raising exception again to trigger backoff and passing parameters to use in create_nomic_map() - raise Exception({"exception": str(e)}) + # except Exception as e: + # if str(e) == 'You must specify a unique_id_field when creating a new project.': + # print("Attempting to create Nomic map...") + # result = self.create_nomic_map(course_name, conversation) + # print("result of create_nomic_map():", result) + # else: + # # raising exception again to trigger backoff and passing parameters to use in create_nomic_map() + # raise Exception({"exception": str(e)}) def get_nomic_map(self, course_name: str, type: str): """ @@ -251,519 +251,183 @@ def get_nomic_map(self, course_name: str, type: str): self.sentry.capture_exception(e) return {"map_id": None, "map_link": None} - # def create_nomic_map(self, course_name: str, log_data: list): - # """ - # Creates a Nomic map for new courses and those which previously had < 20 queries. - # 1. fetches supabase conversations for course - # 2. appends current embeddings and metadata to it - # 2. 
creates map if there are at least 20 queries - # """ - # nomic.login(os.environ['NOMIC_API_KEY']) # login during start of flask app - # NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' - - # print(f"in create_nomic_map() for {course_name}") - - # try: - # # fetch all conversations with this new course (we expect <=20 conversations, because otherwise the map should be made already) - - # response = self.sql.getAllFromLLMConvoMonitor(course_name) - # data = response.data - # df = pd.DataFrame(data) - - # if len(data) < 19: - # return None - # else: - # # get all queries for course and create metadata - # user_queries = [] - # metadata = [] - # i = 1 - # conversation_exists = False - - # # current log details - # log_messages = log_data['conversation']['messages'] # type: ignore - # log_user_email = log_data['conversation']['user_email'] # type: ignore - # log_conversation_id = log_data['conversation']['id'] # type: ignore - - # for _index, row in df.iterrows(): - # user_email = row['user_email'] - # created_at = pd.to_datetime(row['created_at']).strftime('%Y-%m-%d %H:%M:%S') - # convo = row['convo'] - # messages = convo['messages'] - - # first_message = messages[0]['content'] - # if isinstance(first_message, list): - # first_message = first_message[0]['text'] - - # user_queries.append(first_message) - - # # create metadata for multi-turn conversation - # conversation = "" - # for message in messages: - # # string of role: content, role: content, ... - # if message['role'] == 'user': # type: ignore - # emoji = "🙋 " - # else: - # emoji = "🤖 " - - # if isinstance(message['content'], list): - # text = message['content'][0]['text'] - # else: - # text = message['content'] - - # conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - # # append current chat to previous chat if convo already exists - # if convo['id'] == log_conversation_id: - # conversation_exists = True - - # for m in log_messages: - # if m['role'] == 'user': # type: ignore - # emoji = "🙋 " - # else: - # emoji = "🤖 " - - # if isinstance(m['content'], list): - # text = m['content'][0]['text'] - # else: - # text = m['content'] - # conversation += "\n>>> " + emoji + m['role'] + ": " + text + "\n" - - # # adding modified timestamp - # current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - # # add to metadata - # metadata_row = { - # "course": row['course_name'], - # "conversation": conversation, - # "conversation_id": convo['id'], - # "id": i, - # "user_email": user_email, - # "first_query": first_message, - # "created_at": created_at, - # "modified_at": current_time - # } - # metadata.append(metadata_row) - # i += 1 - - # # add current log as a new data point if convo doesn't exist - # if not conversation_exists: - # user_queries.append(log_messages[0]['content']) - # conversation = "" - # for message in log_messages: - # if message['role'] == 'user': - # emoji = "🙋 " - # else: - # emoji = "🤖 " - - # if isinstance(message['content'], list): - # text = message['content'][0]['text'] - # else: - # text = message['content'] - # conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - # # adding timestamp - # current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - # metadata_row = { - # "course": course_name, - # "conversation": conversation, - # "conversation_id": log_conversation_id, - # "id": i, - # "user_email": log_user_email, - # "first_query": log_messages[0]['content'], - # "created_at": current_time, - # "modified_at": current_time - # } - # 
metadata.append(metadata_row) - - # metadata = pd.DataFrame(metadata) - # embeddings_model = OpenAIEmbeddings(openai_api_type=os.environ['OPENAI_API_TYPE']) - # embeddings = embeddings_model.embed_documents(user_queries) + def log_to_conversation_map(self, course_name: str): + """ + This function logs new conversations to existing nomic maps. + 1. Check if nomic map exists + 2. If no, create it + 3. If yes, fetch all conversations since last upload and log it + """ - # # create Atlas project - # project_name = NOMIC_MAP_NAME_PREFIX + course_name - # index_name = course_name + "_convo_index" - # project = atlas.map_embeddings( - # embeddings=np.array(embeddings), - # data=metadata, # type: ignore - this is the correct type, the func signature from Nomic is incomplete - # id_field='id', - # build_topic_model=True, - # topic_label_field='first_query', - # name=project_name, - # colorable_fields=['conversation_id', 'first_query']) - # project.create_index(index_name, build_topic_model=True) - # return f"Successfully created Nomic map for {course_name}" - # except Exception as e: - # # Error: ValueError: You must specify a unique_id_field when creating a new project. - # if str(e) == 'You must specify a unique_id_field when creating a new project.': # type: ignore - # print("Nomic map does not exist yet, probably because you have less than 20 queries on your project: ", e) - # else: - # print("ERROR in create_nomic_map():", e) - # self.sentry.capture_exception(e) - # return "failed" - def create_conversation_map(self, course_name: str): """ This function creates a conversation map for a given course from scratch. """ nomic.login(os.getenv('NOMIC_API_KEY')) NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' - # check if map exists - response = self.sql.getConvoMapFromProjects(course_name) - - if response.data[0]['convo_map_id']: + try: + # check if map exists + response = self.sql.getConvoMapFromProjects(course_name) + print("Response from supabase: ", response.data) + if response.data[0]['convo_map_id']: return "Map already exists for this course." - # if no, fetch total count of records - response = self.sql.getCountFromLLMConvoMonitor(course_name) + # if no, fetch total count of records + response = self.sql.getCountFromLLMConvoMonitor(course_name) - # if <20, return message that map cannot be created - if not response.count: + # if <20, return message that map cannot be created + if not response.count: return "No conversations found for this course." - elif response.count < 20: + elif response.count < 20: return "Cannot create a map because there are less than 20 conversations in the course." 
- # if >20, iteratively fetch records in batches of 100 - total_convo_count = response.count - print("Total number of conversations in Supabase: ", total_convo_count) - first_id = response.data[0]['id'] - combined_dfs = [] - current_convo_count = 0 - convo_count = 0 - first_batch = True - - # iteratively query in batches of 100 - while current_convo_count < total_convo_count: - response = self.sql.getAllConversationsBetweenIds(course_name, first_id, 0, 50) - df = pd.DataFrame(response.data) - combined_dfs.append(df) - current_convo_count += len(response.data) - convo_count += len(response.data) - - if convo_count >= 5: - # concat all dfs from the combined_dfs list + # if >20, iteratively fetch records in batches of 100 + total_convo_count = response.count + print("Total number of conversations in Supabase: ", total_convo_count) + + first_id = response.data[0]['id'] - 1 + combined_dfs = [] + current_convo_count = 0 + convo_count = 0 + first_batch = True + project_name = NOMIC_MAP_NAME_PREFIX + course_name + + # iteratively query in batches of 50 + while current_convo_count < total_convo_count: + response = self.sql.getAllConversationsBetweenIds(course_name, first_id, 0, 100) + print("Response count: ", len(response.data)) + if len(response.data) == 0: + break + df = pd.DataFrame(response.data) + combined_dfs.append(df) + current_convo_count += len(response.data) + convo_count += len(response.data) + print(current_convo_count) + + if convo_count >= 500: + # concat all dfs from the combined_dfs list + final_df = pd.concat(combined_dfs, ignore_index=True) + # prep data for nomic upload + embeddings, metadata = self.data_prep_for_convo_map(final_df) + + if first_batch: + # create a new map + print("Creating new map...") + index_name = course_name + "_convo_index" + topic_label_field = "first_query" + colorable_fields = ["user_email", "first_query", "conversation_id", "created_at"] + result = self.create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) + + if result == "success": + # update flag + first_batch = False + # log project info to supabase + project = AtlasProject(name=project_name, add_datums_if_exists=True) + project_id = project.id + last_id = int(final_df['id'].iloc[-1]) + project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id} + # if entry already exists, update it + projects_record = self.sql.getConvoMapFromProjects(course_name) + if projects_record.data: + project_response = self.sql.updateProjects(course_name, project_info) + else: + project_response = self.sql.insertProjectInfo(project_info) + print("Update response from supabase: ", project_response) + else: + # append to existing map + print("Appending data to existing map...") + project = AtlasProject(name=project_name, add_datums_if_exists=True) + result = self.append_to_map(embeddings, metadata, project_name) + if result == "success": + last_id = int(final_df['id'].iloc[-1]) + project_response = self.sql.updateProjects(course_name, project_info) + print("Update response from supabase: ", project_response) + + # reset variables + combined_dfs = [] + convo_count = 0 + print("Records uploaded: ", current_convo_count) + + # set first_id for next iteration + try: + print("response: ", response.data[-1]['id']) + except: + print("response: ", response.data) + first_id = response.data[-1]['id'] + 1 + + print("Convo count: ", convo_count) + # upload last set of convos + if convo_count > 0: + print("Uploading last set of conversations...") 
final_df = pd.concat(combined_dfs, ignore_index=True) - - # prep data for nomic upload embeddings, metadata = self.data_prep_for_convo_map(final_df) - if first_batch: - # create a new map - print("Creating new map...") - project_name = NOMIC_MAP_NAME_PREFIX + course_name + # create map index_name = course_name + "_convo_index" topic_label_field = "first_query" colorable_fields = ["user_email", "first_query", "conversation_id", "created_at"] - result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) + result = self.create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) - if result == "success": - # update flag - first_batch = False - # log project info to supabase - project = AtlasProject(name=project_name, add_datums_if_exists=True) - project_id = project.id - last_id = int(final_df['id'].iloc[-1]) - project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id} - # if entry already exists, update it - projects_record = self.sql.getConvoMapFromProjects(course_name) - update_response = SUPABASE_CLIENT.table("projects").insert(project_info).execute() - print("Response from supabase: ", update_response) else: - # append to existing map - - + # append to map + print("in map append") + result = self.append_to_map(embeddings, metadata, project_name) + + if result == "success": + print("last map append successful") + last_id = int(final_df['id'].iloc[-1]) + project = AtlasProject(name=project_name, add_datums_if_exists=True) + project_id = project.id + project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id} + print("Project info: ", project_info) + # if entry already exists, update it + projects_record = self.sql.getConvoMapFromProjects(course_name) + if projects_record.data: + project_response = self.sql.updateProjects(course_name, project_info) + else: + project_response = self.sql.insertProjectInfo(project_info) + print("Response from supabase: ", project_response) + + # rebuild the map + self.rebuild_map(course_name, "conversation") + return "success" + except Exception as e: + print(e) + self.sentry.capture_exception(e) + return "Error in creating conversation map:" + str(e) + - return "success" - def data_prep_for_convo_map(self, df: pd.DataFrame): - """ - This function prepares embeddings and metadata for nomic upload in conversation map creation. 
- Args: - df: pd.DataFrame - the dataframe of documents from Supabase - Returns: - embeddings: np.array of embeddings - metadata: pd.DataFrame of metadata - """ - print("in data_prep_for_convo_map()") - - metadata = [] - embeddings = [] - texts = [] - - for _index, row in df.iterrows(): - - print("Row: ", row) - - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - created_at = datetime.datetime.strptime(row['created_at'], "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%d %H:%M:%S") - conversation_exists = False - conversation = "" - emoji = "" - user_queries = [] - - if row['user_email'] is None: - user_email = "" - else: - user_email = row['user_email'] - - messages = row['convo']['messages'] - first_message = messages[0]['content'] - # some conversations include images, so the data structure is different - if isinstance(first_message, list): - first_message = first_message[0]['text'] - user_queries.append(first_message) - - # construct metadata for multi-turn conversation - for message in messages: - if message['role'] == 'user': - emoji = "🙋 " - else: - emoji = "🤖 " + ## -------------------------------- SUPPLEMENTARY MAP FUNCTIONS --------------------------------- ## - if isinstance(message['content'], list): - text = message['content'][0]['text'] - else: - text = message['content'] - - conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - - meta_row = { - "course": row['course_name'], - "conversation": conversation, - "conversation_id": row['convo']['id'], - "id": row['id'], - "user_email": user_email, - "first_query": first_message, - "created_at": created_at, - "modified_at": current_time - } - - metadata.append(meta_row) - texts.append(user_queries) - - embeddings_model = OpenAIEmbeddings(openai_api_type="openai", - openai_api_base="https://api.openai.com/v1/", - openai_api_key=os.environ['VLADS_OPENAI_KEY']) - embeddings = embeddings_model.embed_documents(texts) - - metadata = pd.DataFrame(metadata) - embeddings = np.array(embeddings) - - return embeddings, metadata - - - ## -------------------------------- DOCUMENT MAP FUNCTIONS --------------------------------- ## - - def create_document_map(self, course_name: str): + def rebuild_map(self, course_name:str, map_type:str): """ - This is a function which creates a document map for a given course from scratch - 1. Gets count of documents for the course - 2. If less than 20, returns a message that a map cannot be created - 3. If greater than 20, iteratively fetches documents in batches of 25 - 4. Prepares metadata and embeddings for nomic upload - 5. Creates a new map and uploads the data - - Args: - course_name: str - Returns: - str: success or failed - """ - print("in create_document_map()") - # nomic.login(os.getenv('NOMIC_API_KEY')) - NOMIC_MAP_NAME_PREFIX = 'Document Map for ' - - try: - # check if map exists - - response = self.sql.getProjectsMapForCourse(course_name) - if response.data: - return "Map already exists for this course." - - # fetch relevant document data from Supabase - response = self.sql.getDocumentsBetweenDates(course_name, '', '', "documents") - - if not response.count: - return "No documents found for this course." 
- - total_doc_count = response.count - print("Total number of documents in Supabase: ", total_doc_count) - - # minimum 20 docs needed to create map - if total_doc_count > 19: - - first_id = response.data[0]['id'] - combined_dfs = [] - curr_total_doc_count = 0 - doc_count = 0 - first_batch = True - - # iteratively query in batches of 25 - while curr_total_doc_count < total_doc_count: - - response = self.sql.getDocsForIdsGte(course_name, first_id, - "id, created_at, s3_path, url, readable_filename, contexts", 25) - - df = pd.DataFrame(response.data) - combined_dfs.append(df) # list of dfs - - curr_total_doc_count += len(response.data) - doc_count += len(response.data) - - if doc_count >= 1000: # upload to Nomic every 1000 docs - - # concat all dfs from the combined_dfs list - final_df = pd.concat(combined_dfs, ignore_index=True) - - # prep data for nomic upload - embeddings, metadata = self.data_prep_for_doc_map(final_df) - - if first_batch: - # create a new map - print("Creating new map...") - project_name = NOMIC_MAP_NAME_PREFIX + course_name - index_name = course_name + "_doc_index" - topic_label_field = "text" - colorable_fields = ["readable_filename", "text"] - result = self.create_map(embeddings, metadata, project_name, index_name, topic_label_field, - colorable_fields) - # update flag - first_batch = False - - else: - # append to existing map - print("Appending data to existing map...") - project_name = NOMIC_MAP_NAME_PREFIX + course_name - # add project lock logic here - result = self.append_to_map(embeddings, metadata, project_name) - - # reset variables - combined_dfs = [] - doc_count = 0 - - # set first_id for next iteration - first_id = response.data[-1]['id'] + 1 - - # upload last set of docs - final_df = pd.concat(combined_dfs, ignore_index=True) - embeddings, metadata = self.data_prep_for_doc_map(final_df) - project_name = NOMIC_MAP_NAME_PREFIX + course_name - if first_batch: - index_name = course_name + "_doc_index" - topic_label_field = "text" - colorable_fields = ["readable_filename", "text"] - result = self.create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) - else: - result = self.append_to_map(embeddings, metadata, project_name) - print("Atlas upload status: ", result) - - # log info to supabase - project = AtlasProject(name=project_name, add_datums_if_exists=True) - project_id = project.id - project.rebuild_maps() - project_info = {'course_name': course_name, 'doc_map_id': project_id} - response = self.sql.insertProjectInfo(project_info) - print("Response from supabase: ", response) - return "success" - else: - return "Cannot create a map because there are less than 20 documents in the course." - except Exception as e: - print(e) - self.sentry.capture_exception(e) - return "failed" - - def delete_from_document_map(self, project_id: str, ids: list): + This function rebuilds a given map in Nomic. """ - This function is used to delete datapoints from a document map. 
- Currently used within the delete_data() function in vector_database.py - Args: - course_name: str - ids: list of str - """ - print("in delete_from_document_map()") + print("in rebuild_map()") + nomic.login(os.getenv('NOMIC_API_KEY')) + + if map_type.lower() == 'document': + NOMIC_MAP_NAME_PREFIX = 'Document Map for ' + else: + NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' try: # fetch project from Nomic - project = AtlasProject(project_id=project_id, add_datums_if_exists=True) + project_name = NOMIC_MAP_NAME_PREFIX + course_name + project = AtlasProject(name=project_name, add_datums_if_exists=True) - # delete the ids from Nomic - print("Deleting point from document map:", project.delete_data(ids)) - with project.wait_for_project_lock(): + if project.is_accepting_data: project.rebuild_maps() - return "Successfully deleted from Nomic map" + return "success" except Exception as e: print(e) self.sentry.capture_exception(e) - return "Error in deleting from document map: {e}" - - # If this needs to be uncommented, make sure to move the supabase call to the respective service - # def log_to_document_map(self, data: dict): - # """ - # This is a function which appends new documents to an existing document map. It's called - # at the end of split_and_upload() after inserting data to Supabase. - # Args: - # data: dict - the response data from Supabase insertion - # """ - # print("in add_to_document_map()") - - # try: - # # check if map exists - # course_name = data['course_name'] - # response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() - # if response.data: - # project_id = response.data[0]['doc_map_id'] - # else: - # # create a map - # map_creation_result = self.create_document_map(course_name) - # if map_creation_result != "success": - # return "The project has less than 20 documents and a map cannot be created." 
- # else: - # # fetch project id - # response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() - # project_id = response.data[0]['doc_map_id'] - - # project = AtlasProject(project_id=project_id, add_datums_if_exists=True) - # #print("Inserted data: ", data) - - # embeddings = [] - # metadata = [] - # context_count = 0 - # # prep data for nomic upload - # for row in data['contexts']: - # context_count += 1 - # embeddings.append(row['embedding']) - # metadata.append({ - # "id": str(data['id']) + "_" + str(context_count), - # "doc_ingested_at": data['created_at'], - # "s3_path": data['s3_path'], - # "url": data['url'], - # "readable_filename": data['readable_filename'], - # "created_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - # "text": row['text'] - # }) - # embeddings = np.array(embeddings) - # metadata = pd.DataFrame(metadata) - # print("Shape of embeddings: ", embeddings.shape) - - # # append to existing map - # project_name = "Document Map for " + course_name - # result = self.append_to_map(embeddings, metadata, project_name) - - # # check if project is accepting new datums - # if project.is_accepting_data: - # with project.wait_for_project_lock(): - # project.rebuild_maps() - - # # with project.wait_for_project_lock(): - # # project.rebuild_maps() - # return result - - # except Exception as e: - # print(e) - # self.sentry.capture_exception(e) - # return "Error in appending to map: {e}" + return "Error in rebuilding map: {e}" + def create_map(self, embeddings, metadata, map_name, index_name, topic_label_field, colorable_fields): """ @@ -777,7 +441,7 @@ def create_map(self, embeddings, metadata, map_name, index_name, topic_label_fie colorable_fields: list of str """ nomic.login(os.environ['NOMIC_API_KEY']) - + print("in create_map()") try: project = atlas.map_embeddings(embeddings=embeddings, data=metadata, @@ -810,60 +474,103 @@ def append_to_map(self, embeddings, metadata, map_name): except Exception as e: print(e) return "Error in appending to map: {e}" + - def data_prep_for_doc_map(self, df: pd.DataFrame): + def data_prep_for_convo_map(self, df: pd.DataFrame): """ - This function prepares embeddings and metadata for nomic upload in document map creation. + This function prepares embeddings and metadata for nomic upload in conversation map creation. 
Args: df: pd.DataFrame - the dataframe of documents from Supabase Returns: embeddings: np.array of embeddings metadata: pd.DataFrame of metadata """ - print("in data_prep_for_doc_map()") + print("in data_prep_for_convo_map()") + try: + metadata = [] + embeddings = [] + user_queries = [] - metadata = [] - embeddings = [] - texts = [] + for _index, row in df.iterrows(): + current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + created_at = datetime.datetime.strptime(row['created_at'], "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%d %H:%M:%S") + conversation_exists = False + conversation = "" + emoji = "" - for _index, row in df.iterrows(): + if row['user_email'] is None: + user_email = "" + else: + user_email = row['user_email'] - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - if row['url'] is None: - row['url'] = "" - # iterate through all contexts and create separate entries for each - context_count = 0 - for context in row['contexts']: - context_count += 1 - text_row = context['text'] - embeddings_row = context['embedding'] + messages = row['convo']['messages'] + first_message = messages[0]['content'] + # some conversations include images, so the data structure is different + if isinstance(first_message, list): + first_message = first_message[0]['text'] + user_queries.append(first_message) + + # construct metadata for multi-turn conversation + for message in messages: + if message['role'] == 'user': + emoji = "🙋 " + else: + emoji = "🤖 " + + if isinstance(message['content'], list): + text = message['content'][0]['text'] + else: + text = message['content'] + conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" + meta_row = { - "id": str(row['id']) + "_" + str(context_count), - "doc_ingested_at": row['created_at'], - "s3_path": row['s3_path'], - "url": row['url'], - "readable_filename": row['readable_filename'], - "created_at": current_time, - "text": text_row - } - - embeddings.append(embeddings_row) + "course": row['course_name'], + "conversation": conversation, + "conversation_id": row['convo']['id'], + "id": row['id'], + "user_email": user_email, + "first_query": first_message, + "created_at": created_at, + "modified_at": current_time + } + metadata.append(meta_row) - texts.append(text_row) - embeddings_np = np.array(embeddings, dtype=object) - print("Shape of embeddings: ", embeddings_np.shape) - - # check dimension if embeddings_np is (n, 1536) - if len(embeddings_np.shape) < 2: - print("Creating new embeddings...") embeddings_model = OpenAIEmbeddings(openai_api_type="openai", - openai_api_base="https://api.openai.com/v1/", - openai_api_key=os.environ['VLADS_OPENAI_KEY']) - embeddings = embeddings_model.embed_documents(texts) + openai_api_base="https://api.openai.com/v1/", + openai_api_key=os.environ['VLADS_OPENAI_KEY']) + embeddings = embeddings_model.embed_documents(user_queries) + + metadata = pd.DataFrame(metadata) + embeddings = np.array(embeddings) + return embeddings, metadata - metadata = pd.DataFrame(metadata) - embeddings = np.array(embeddings) + except Exception as e: + print("Error in data_prep_for_convo_map():", e) + self.sentry.capture_exception(e) + return None, None - return embeddings, metadata + def delete_from_document_map(self, project_id: str, ids: list): + """ + This function is used to delete datapoints from a document map. 
+ Currently used within the delete_data() function in vector_database.py + Args: + course_name: str + ids: list of str + """ + print("in delete_from_document_map()") + + try: + # fetch project from Nomic + project = AtlasProject(project_id=project_id, add_datums_if_exists=True) + + # delete the ids from Nomic + print("Deleting point from document map:", project.delete_data(ids)) + with project.wait_for_project_lock(): + project.rebuild_maps() + return "Successfully deleted from Nomic map" + except Exception as e: + print(e) + self.sentry.capture_exception(e) + return "Error in deleting from document map: {e}" \ No newline at end of file From bafb8d58fe55f53445661fe4fb99a52e8d08ab52 Mon Sep 17 00:00:00 2001 From: star-nox Date: Thu, 28 Mar 2024 15:25:48 -0500 Subject: [PATCH 08/11] minor changes to convo functions --- ai_ta_backend/beam/nomic_logging.py | 3 +- ai_ta_backend/database/sql.py | 11 +- ai_ta_backend/main.py | 18 ++- ai_ta_backend/service/nomic_service.py | 191 ++++++++++++++++++------- 4 files changed, 166 insertions(+), 57 deletions(-) diff --git a/ai_ta_backend/beam/nomic_logging.py b/ai_ta_backend/beam/nomic_logging.py index 5d7fff03..41ae392d 100644 --- a/ai_ta_backend/beam/nomic_logging.py +++ b/ai_ta_backend/beam/nomic_logging.py @@ -40,7 +40,8 @@ def create_document_map(course_name: str): # check if map exists response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() if response.data: - return "Map already exists for this course." + if response.data[0]['doc_map_id']: + return "Map already exists for this course." # fetch relevant document data from Supabase response = SUPABASE_CLIENT.table("documents").select("id", diff --git a/ai_ta_backend/database/sql.py b/ai_ta_backend/database/sql.py index ae77e6fe..ecd775d2 100644 --- a/ai_ta_backend/database/sql.py +++ b/ai_ta_backend/database/sql.py @@ -90,16 +90,19 @@ def insertProjectInfo(self, project_info): return self.supabase_client.table("projects").insert(project_info).execute() def getAllFromLLMConvoMonitor(self, course_name: str): - return self.supabase_client.table("llm-convo-monitor").select("*").eq("course_name", course_name).execute() + return self.supabase_client.table("llm-convo-monitor").select("*").eq("course_name", course_name).order('id', desc=False).execute() - def getCountFromLLMConvoMonitor(self, course_name: str): - return self.supabase_client.table("llm-convo-monitor").select("id", count='exact').eq("course_name", course_name).execute() + def getCountFromLLMConvoMonitor(self, course_name: str, last_id: int): + if last_id == 0: + return self.supabase_client.table("llm-convo-monitor").select("id", count='exact').eq("course_name", course_name).order('id', desc=False).execute() + else: + return self.supabase_client.table("llm-convo-monitor").select("id", count='exact').eq("course_name", course_name).gt("id", last_id).order('id', desc=False).execute() def getDocMapFromProjects(self, course_name: str): return self.supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute() def getConvoMapFromProjects(self, course_name: str): - return self.supabase_client.table("projects").select("convo_map_id").eq("course_name", course_name).execute() + return self.supabase_client.table("projects").select("*").eq("course_name", course_name).execute() def updateProjects(self, course_name: str, data: dict): return self.supabase_client.table("projects").update(data).eq("course_name", course_name).execute() diff --git a/ai_ta_backend/main.py 
b/ai_ta_backend/main.py index cfe9cbf0..b27e97c3 100644 --- a/ai_ta_backend/main.py +++ b/ai_ta_backend/main.py @@ -38,6 +38,8 @@ from ai_ta_backend.service.retrieval_service import RetrievalService from ai_ta_backend.service.sentry_service import SentryService +from ai_ta_backend.beam.nomic_logging import create_document_map + app = Flask(__name__) CORS(app) executor = Executor(app) @@ -191,7 +193,7 @@ def createDocumentMap(service: NomicService): # proper web error "400 Bad request" abort(400, description=f"Missing required parameter: 'course_name' must be provided. Course name: `{course_name}`") - map_id = service.create_document_map(course_name) + map_id = create_document_map(course_name) response = jsonify(map_id) response.headers.add('Access-Control-Allow-Origin', '*') @@ -211,6 +213,20 @@ def createConversationMap(service: NomicService): response.headers.add('Access-Control-Allow-Origin', '*') return response +@app.route('/logToConversationMap', methods=['GET']) +def logToConversationMap(service: NomicService): + course_name: str = request.args.get('course_name', default='', type=str) + + if course_name == '': + # proper web error "400 Bad request" + abort(400, description=f"Missing required parameter: 'course_name' must be provided. Course name: `{course_name}`") + + map_id = service.log_to_conversation_map(course_name) + + response = jsonify(map_id) + response.headers.add('Access-Control-Allow-Origin', '*') + return response + @app.route('/onResponseCompletion', methods=['POST']) def logToNomic(service: NomicService, flaskExecutor: ExecutorInterface): diff --git a/ai_ta_backend/service/nomic_service.py b/ai_ta_backend/service/nomic_service.py index 2066468a..fee6ee21 100644 --- a/ai_ta_backend/service/nomic_service.py +++ b/ai_ta_backend/service/nomic_service.py @@ -258,6 +258,84 @@ def log_to_conversation_map(self, course_name: str): 2. If no, create it 3. If yes, fetch all conversations since last upload and log it """ + nomic.login(os.getenv('NOMIC_API_KEY')) + NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' + + # check if map exists + response = self.sql.getConvoMapFromProjects(course_name) + print("Response from supabase: ", response.data) + + if not response.data[0]['convo_map_id']: + print("Map does not exist for this course. Redirecting to map creation...") + return self.create_conversation_map(course_name) + + project_id = response.data[0]['convo_map_id'] + last_uploaded_convo_id = response.data[0]['last_uploaded_convo_id'] + + # check if project is accepting data + project = AtlasProject(project_id=project_id, add_datums_if_exists=True) + if not project.is_accepting_data: + return "Project is currently indexing and cannot ingest new datums. Try again later." + + # fetch count of conversations since last upload + response = self.sql.getCountFromLLMConvoMonitor(course_name, last_id=last_uploaded_convo_id) + total_convo_count = response.count + print("Total number of unlogged conversations in Supabase: ", total_convo_count) + + if total_convo_count == 0: + return "No new conversations to log." 
+ + first_id = last_uploaded_convo_id + combined_dfs = [] + current_convo_count = 0 + convo_count = 0 + + while current_convo_count < total_convo_count: + response = self.sql.getAllConversationsBetweenIds(course_name, first_id, 0, 100) + print("Response count: ", len(response.data)) + if len(response.data) == 0: + break + df = pd.DataFrame(response.data) + combined_dfs.append(df) + current_convo_count += len(response.data) + convo_count += len(response.data) + print(current_convo_count) + + if convo_count >= 500: + # concat all dfs from the combined_dfs list + final_df = pd.concat(combined_dfs, ignore_index=True) + # prep data for nomic upload + embeddings, metadata = self.data_prep_for_convo_map(final_df) + # append to existing map + print("Appending data to existing map...") + result = self.append_to_map(embeddings, metadata, NOMIC_MAP_NAME_PREFIX + course_name) + if result == "success": + last_id = int(final_df['id'].iloc[-1]) + project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id} + project_response = self.sql.updateProjects(course_name, project_info) + print("Update response from supabase: ", project_response) + # reset variables + combined_dfs = [] + convo_count = 0 + print("Records uploaded: ", current_convo_count) + + # set first_id for next iteration + first_id = response.data[-1]['id'] + 1 + + # upload last set of convos + if convo_count > 0: + print("Uploading last set of conversations...") + final_df = pd.concat(combined_dfs, ignore_index=True) + embeddings, metadata = self.data_prep_for_convo_map(final_df) + result = self.append_to_map(embeddings, metadata, NOMIC_MAP_NAME_PREFIX + course_name) + if result == "success": + last_id = int(final_df['id'].iloc[-1]) + project_info = {'course_name': course_name, 'convo_map_id': project_id, 'last_uploaded_convo_id': last_id} + project_response = self.sql.updateProjects(course_name, project_info) + print("Update response from supabase: ", project_response) + + return "success" + def create_conversation_map(self, course_name: str): @@ -270,11 +348,12 @@ def create_conversation_map(self, course_name: str): # check if map exists response = self.sql.getConvoMapFromProjects(course_name) print("Response from supabase: ", response.data) - if response.data[0]['convo_map_id']: - return "Map already exists for this course." + if response.data: + if response.data[0]['convo_map_id']: + return "Map already exists for this course." 
# if no, fetch total count of records - response = self.sql.getCountFromLLMConvoMonitor(course_name) + response = self.sql.getCountFromLLMConvoMonitor(course_name, last_id=0) # if <20, return message that map cannot be created if not response.count: @@ -340,7 +419,9 @@ def create_conversation_map(self, course_name: str): project = AtlasProject(name=project_name, add_datums_if_exists=True) result = self.append_to_map(embeddings, metadata, project_name) if result == "success": + print("map append successful") last_id = int(final_df['id'].iloc[-1]) + project_info = {'last_uploaded_convo_id': last_id} project_response = self.sql.updateProjects(course_name, project_info) print("Update response from supabase: ", project_response) @@ -470,7 +551,7 @@ def append_to_map(self, embeddings, metadata, map_name): project = atlas.AtlasProject(name=map_name, add_datums_if_exists=True) with project.wait_for_project_lock(): project.add_embeddings(embeddings=embeddings, data=metadata) - return "Successfully appended to Nomic map" + return "success" except Exception as e: print(e) return "Error in appending to map: {e}" @@ -486,45 +567,51 @@ def data_prep_for_convo_map(self, df: pd.DataFrame): metadata: pd.DataFrame of metadata """ print("in data_prep_for_convo_map()") - try: - metadata = [] - embeddings = [] - user_queries = [] - - for _index, row in df.iterrows(): - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - created_at = datetime.datetime.strptime(row['created_at'], "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%d %H:%M:%S") - conversation_exists = False - conversation = "" - emoji = "" - - if row['user_email'] is None: - user_email = "" - else: - user_email = row['user_email'] - - messages = row['convo']['messages'] + + metadata = [] + embeddings = [] + user_queries = [] + + for _index, row in df.iterrows(): + current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + created_at = datetime.datetime.strptime(row['created_at'], "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%d %H:%M:%S") + conversation_exists = False + conversation = "" + emoji = "" + + if row['user_email'] is None: + user_email = "" + else: + user_email = row['user_email'] + + messages = row['convo']['messages'] + + # some conversations include images, so the data structure is different + if isinstance(messages[0]['content'], list): + if 'text' in messages[0]['content'][0]: + first_message = messages[0]['content'][0]['text'] + #print("First message:", first_message) + else: first_message = messages[0]['content'] - # some conversations include images, so the data structure is different - if isinstance(first_message, list): - first_message = first_message[0]['text'] - user_queries.append(first_message) - - # construct metadata for multi-turn conversation - for message in messages: - if message['role'] == 'user': - emoji = "🙋 " - else: - emoji = "🤖 " + user_queries.append(first_message) - if isinstance(message['content'], list): + # construct metadata for multi-turn conversation + for message in messages: + if message['role'] == 'user': + emoji = "🙋 " + else: + emoji = "🤖 " + + if isinstance(message['content'], list): + + if 'text' in message['content'][0]: text = message['content'][0]['text'] - else: - text = message['content'] + else: + text = message['content'] - conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" + conversation += "\n>>> " + emoji + message['role'] + ": " + text + "\n" - meta_row = { + meta_row = { "course": row['course_name'], "conversation": conversation, 
"conversation_id": row['convo']['id'], @@ -533,23 +620,25 @@ def data_prep_for_convo_map(self, df: pd.DataFrame): "first_query": first_message, "created_at": created_at, "modified_at": current_time - } - - metadata.append(meta_row) + } + #print("Metadata row:", meta_row) + metadata.append(meta_row) - embeddings_model = OpenAIEmbeddings(openai_api_type="openai", + embeddings_model = OpenAIEmbeddings(openai_api_type="openai", openai_api_base="https://api.openai.com/v1/", openai_api_key=os.environ['VLADS_OPENAI_KEY']) - embeddings = embeddings_model.embed_documents(user_queries) + embeddings = embeddings_model.embed_documents(user_queries) - metadata = pd.DataFrame(metadata) - embeddings = np.array(embeddings) - return embeddings, metadata - - except Exception as e: - print("Error in data_prep_for_convo_map():", e) - self.sentry.capture_exception(e) - return None, None + metadata = pd.DataFrame(metadata) + embeddings = np.array(embeddings) + print("Metadata shape:", metadata.shape) + print("Embeddings shape:", embeddings.shape) + return embeddings, metadata + + # except Exception as e: + # print("Error in data_prep_for_convo_map():", e) + # self.sentry.capture_exception(e) + # return None, None def delete_from_document_map(self, project_id: str, ids: list): """ From 9c0c031dc86a1dd750e9433dfc372e51105d97bf Mon Sep 17 00:00:00 2001 From: star-nox Date: Thu, 28 Mar 2024 16:57:37 -0500 Subject: [PATCH 09/11] corrected function calling in ingest --- ai_ta_backend/beam/ingest.py | 4 ++-- ai_ta_backend/beam/nomic_logging.py | 29 ++++++++++++++++++++++------- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/ai_ta_backend/beam/ingest.py b/ai_ta_backend/beam/ingest.py index a9cb0a4c..9bc8b487 100644 --- a/ai_ta_backend/beam/ingest.py +++ b/ai_ta_backend/beam/ingest.py @@ -1029,8 +1029,8 @@ def split_and_upload(self, texts: List[str], metadatas: List[Dict[str, Any]]): # add to Nomic document map if len(response.data) > 0: - inserted_data = response.data[0] - log_to_document_map(inserted_data) + course_name = contexts[0].metadata.get('course_name') + log_to_document_map(course_name) self.posthog.capture('distinct_id_of_the_user', event='split_and_upload_succeeded', diff --git a/ai_ta_backend/beam/nomic_logging.py b/ai_ta_backend/beam/nomic_logging.py index 41ae392d..30dcf301 100644 --- a/ai_ta_backend/beam/nomic_logging.py +++ b/ai_ta_backend/beam/nomic_logging.py @@ -102,8 +102,14 @@ def create_document_map(course_name: str): project_id = project.id last_id = int(final_df['id'].iloc[-1]) project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id} - update_response = SUPABASE_CLIENT.table("projects").insert(project_info).execute() - print("Response from supabase: ", update_response) + project_response = SUPABASE_CLIENT.table("projects").select("*").eq("course_name", course_name).execute() + if project_response.data: + update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute() + print("Response from supabase: ", update_response) + else: + insert_response = SUPABASE_CLIENT.table("projects").insert(project_info).execute() + print("Insert Response from supabase: ", insert_response) + else: # append to existing map @@ -143,12 +149,19 @@ def create_document_map(course_name: str): if result == "success": # update the last uploaded id in supabase last_id = int(final_df['id'].iloc[-1]) - project_info = {'last_uploaded_doc_id': last_id} + project = AtlasProject(name=project_name, 
add_datums_if_exists=True)
+      project_id = project.id
+      project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id}
       print("project_info: ", project_info)
-      update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute()
-      print("Response from supabase: ", update_response)
-
-
+      project_response = SUPABASE_CLIENT.table("projects").select("*").eq("course_name", course_name).execute()
+      if project_response.data:
+        update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute()
+        print("Response from supabase: ", update_response)
+      else:
+        insert_response = SUPABASE_CLIENT.table("projects").insert(project_info).execute()
+        print("Insert Response from supabase: ", insert_response)
+
+
   # rebuild the map
   rebuild_map(course_name, "document")
 
@@ -352,6 +365,8 @@ def data_prep_for_doc_map(df: pd.DataFrame):
     created_at = datetime.datetime.strptime(row['created_at'], "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%d %H:%M:%S")
     if row['url'] == None:
       row['url'] = ""
+    if row['base_url'] == None:
+      row['base_url'] = ""
     # iterate through all contexts and create separate entries for each
     context_count = 0
     for context in row['contexts']:

From 78e92e6f262073abfef7725de96f6dfbd44f28ea Mon Sep 17 00:00:00 2001
From: star-nox
Date: Thu, 28 Mar 2024 18:14:17 -0500
Subject: [PATCH 10/11] removed executor because of errors

---
 ai_ta_backend/main.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/ai_ta_backend/main.py b/ai_ta_backend/main.py
index b27e97c3..cb63cb99 100644
--- a/ai_ta_backend/main.py
+++ b/ai_ta_backend/main.py
@@ -214,7 +214,7 @@ def createConversationMap(service: NomicService):
   return response
 
 @app.route('/logToConversationMap', methods=['GET'])
-def logToConversationMap(service: NomicService):
+def logToConversationMap(service: NomicService, flaskExecutor: ExecutorInterface):
   course_name: str = request.args.get('course_name', default='', type=str)
 
   if course_name == '':
@@ -222,6 +222,7 @@ def logToConversationMap(service: NomicService, flaskExecutor: ExecutorInterface):
     abort(400, description=f"Missing required parameter: 'course_name' must be provided. Course name: `{course_name}`")
 
   map_id = service.log_to_conversation_map(course_name)
+  #map_id = flaskExecutor.submit(service.log_to_conversation_map, course_name)
   response = jsonify(map_id)
   response.headers.add('Access-Control-Allow-Origin', '*')
 
@@ -244,7 +245,8 @@ def logToNomic(service: NomicService, flaskExecutor: ExecutorInterface):
 
   print(f"In /onResponseCompletion for course: {course_name}")
 
   # background execution of tasks!!
- response = flaskExecutor.submit(service.log_convo_to_nomic, course_name, data) + #response = flaskExecutor.submit(service.log_convo_to_nomic, course_name, data) + response = flaskExecutor.submit(service.log_to_conversation_map, course_name) response = jsonify({'outcome': 'success'}) response.headers.add('Access-Control-Allow-Origin', '*') return response From 4e553f4ae4378b7865aef5250a0b2ee1751fb980 Mon Sep 17 00:00:00 2001 From: star-nox Date: Thu, 28 Mar 2024 19:54:34 -0500 Subject: [PATCH 11/11] fixed serialization error in main --- ai_ta_backend/main.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ai_ta_backend/main.py b/ai_ta_backend/main.py index cb63cb99..452792ac 100644 --- a/ai_ta_backend/main.py +++ b/ai_ta_backend/main.py @@ -221,8 +221,8 @@ def logToConversationMap(service: NomicService, flaskExecutor: ExecutorInterface # proper web error "400 Bad request" abort(400, description=f"Missing required parameter: 'course_name' must be provided. Course name: `{course_name}`") - map_id = service.log_to_conversation_map(course_name) - #map_id = flaskExecutor.submit(service.log_to_conversation_map, course_name) + #map_id = service.log_to_conversation_map(course_name) + map_id = flaskExecutor.submit(service.log_to_conversation_map, course_name).result() response = jsonify(map_id) response.headers.add('Access-Control-Allow-Origin', '*') @@ -246,7 +246,7 @@ def logToNomic(service: NomicService, flaskExecutor: ExecutorInterface): # background execution of tasks!! #response = flaskExecutor.submit(service.log_convo_to_nomic, course_name, data) - response = flaskExecutor.submit(service.log_to_conversation_map, course_name) + result = flaskExecutor.submit(service.log_to_conversation_map, course_name).result() response = jsonify({'outcome': 'success'}) response.headers.add('Access-Control-Allow-Origin', '*') return response
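
A note on the PATCH 11 change: flaskExecutor.submit() returns a Future-style object rather than the task's return value, and passing that object straight to jsonify() is presumably the serialization error the commit message refers to; calling .result() first blocks until the task finishes and yields a plain, JSON-serializable value. The snippet below is a minimal stand-alone sketch of that failure mode and fix, not part of the patch: ThreadPoolExecutor stands in for flaskExecutor, and fake_log_to_conversation_map is a made-up placeholder for NomicService.log_to_conversation_map.

import json
from concurrent.futures import ThreadPoolExecutor

def fake_log_to_conversation_map(course_name: str) -> str:
  # placeholder for the real Nomic upload; just returns a status string
  return f"appended conversations for {course_name}"

with ThreadPoolExecutor(max_workers=1) as executor:
  future = executor.submit(fake_log_to_conversation_map, "demo-course")
  # json.dumps(future) would raise a TypeError (a Future object is not JSON serializable),
  # mirroring what jsonify() hit when handed the raw return value of submit().
  result = future.result()  # wait for the task, then serialize the plain string it returns
  print(json.dumps(result))

The trade-off is that .result() makes both routes wait for the Nomic upload before responding, so the submit() call no longer gives the background execution that the "# background execution of tasks!!" comment in /onResponseCompletion describes.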