diff --git a/app/data_fetching/factory.py b/app/data_fetching/factory.py index 88ad98c..61553ab 100644 --- a/app/data_fetching/factory.py +++ b/app/data_fetching/factory.py @@ -49,20 +49,60 @@ async def background_data_fetch( return texts, metadata, ids = prepare_metadata_ids_content(docs) + try: with SQLAlchemyTransactionContext().manage() as tx_context: - docs, _ = ovstore.__list_docs__( + existing_docs, _ = ovstore.__list_docs__( workspace_id=dir.workspace_id, directory=dir.name, - limit=100000, - projection=["custom_id"], + limit=-1, + projection=["custom_id", "document"], tx_context=tx_context, ) - ids_to_delete = [d.custom_id for d in docs] - delete_ids(ovstore=ovstore, ids=ids_to_delete) + + existing_doc_dict = {doc.custom_id: doc.document for doc in existing_docs} + + docs_to_delete = [] + metadata_to_insert = [] + ids_to_insert = [] + texts_to_insert = [] + + for i, doc_id in enumerate(ids): + if doc_id not in existing_doc_dict: + # New document + texts_to_insert.append(texts[i]) + metadata_to_insert.append(metadata[i]) + ids_to_insert.append(doc_id) + elif existing_doc_dict[doc_id] != texts[i]: + # Changed document + docs_to_delete.append(doc_id) + texts_to_insert.append(texts[i]) + metadata_to_insert.append(metadata[i]) + ids_to_insert.append(doc_id) + + # Delete documents that no longer exist + docs_to_delete_not_exist = set(existing_doc_dict.keys()) - set(ids) + # Append documents that no longer exist to the deletion list + docs_to_delete.extend(docs_to_delete_not_exist) + + # Delete changed and non-existent documents + if docs_to_delete: + delete_ids(ovstore=ovstore, ids=docs_to_delete) + log.debug( + f"Deleted {len(docs_to_delete)} changed documents from directory {dir.name}" + ) + except NotImplementedError: - log.error(f"could not delete directory {dir.name} data before importing") + log.error(f"could not process directory {dir.name} data") return - inserted_ids = ovstore.add_texts(texts=texts, metadatas=metadata, ids=ids) - log.debug(f"inserted {len(inserted_ids)} into directory {dir.name} vector store") + # Insert new or changed documents + if len(texts_to_insert) > 0: + inserted_ids = ovstore.add_texts( + texts=texts_to_insert, metadatas=metadata_to_insert, ids=ids_to_insert + ) + log.debug( + f"Inserted {len(inserted_ids)} new or changed documents into directory {dir.name} vector store" + ) + else: + log.debug(f"No new or changed documents to insert for directory {dir.name}") diff --git a/app/ext_vector_store.py b/app/ext_vector_store.py index 541085a..081da51 100644 --- a/app/ext_vector_store.py +++ b/app/ext_vector_store.py @@ -123,7 +123,7 @@ def list_documents( proj = create_projection(proj=projection) query = text( - f"select {proj} from {embedding_table_name} where {filters} order by uuid limit :limit offset :offset;" + f"select {proj} from {embedding_table_name} where {filters} order by uuid {'limit :limit' if limit > 0 else ''} offset :offset;" ) result = tx_context.connection.execute( diff --git a/docs/dev/api_exploration.ipynb b/docs/dev/api_exploration.ipynb index 7a64704..ea3fee3 100644 --- a/docs/dev/api_exploration.ipynb +++ b/docs/dev/api_exploration.ipynb @@ -706,6 +706,9 @@ "outputs": [], "source": [ "# import okta roles\n", + "import requests\n", + "from starlette import status\n", + "\n", "dir_id = \"ABCDE11ere\"\n", "resp = requests.post(f\"{API_URL}/directories/{dir_id}/.import\", headers=headers, json={\"create_apps\": True})\n", "if resp.status_code != status.HTTP_202_ACCEPTED:\n", @@ -849,7 +852,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.4" + "version": "3.12.5" } }, "nbformat": 4,