Skip to content

Commit

Permalink
feat: skip equal docs
Browse files Browse the repository at this point in the history
when listing directory data, skips equal document, deletes non existent docs and insert new/changed docs.
  • Loading branch information
Erez Sharim authored and asaf committed Dec 5, 2024
1 parent b1e2ee4 commit 895f186
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 10 deletions.
56 changes: 48 additions & 8 deletions app/data_fetching/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,60 @@ async def background_data_fetch(
return

texts, metadata, ids = prepare_metadata_ids_content(docs)

try:
with SQLAlchemyTransactionContext().manage() as tx_context:
docs, _ = ovstore.__list_docs__(
existing_docs, _ = ovstore.__list_docs__(
workspace_id=dir.workspace_id,
directory=dir.name,
limit=100000,
projection=["custom_id"],
limit=-1,
projection=["custom_id", "document"],
tx_context=tx_context,
)
ids_to_delete = [d.custom_id for d in docs]
delete_ids(ovstore=ovstore, ids=ids_to_delete)

existing_doc_dict = {doc.custom_id: doc.document for doc in existing_docs}

docs_to_delete = []
metadata_to_insert = []
ids_to_insert = []
texts_to_insert = []

for i, doc_id in enumerate(ids):
if doc_id not in existing_doc_dict:
# New document
texts_to_insert.append(texts[i])
metadata_to_insert.append(metadata[i])
ids_to_insert.append(doc_id)
elif existing_doc_dict[doc_id] != texts[i]:
# Changed document
docs_to_delete.append(doc_id)
texts_to_insert.append(texts[i])
metadata_to_insert.append(metadata[i])
ids_to_insert.append(doc_id)

# Delete documents that no longer exist
docs_to_delete_not_exist = set(existing_doc_dict.keys()) - set(ids)
# Append documents that no longer exist to the deletion list
docs_to_delete.extend(docs_to_delete_not_exist)

# Delete changed and non-existent documents
if docs_to_delete:
delete_ids(ovstore=ovstore, ids=docs_to_delete)
log.debug(
f"Deleted {len(docs_to_delete)} changed documents from directory {dir.name}"
)

except NotImplementedError:
log.error(f"could not delete directory {dir.name} data before importing")
log.error(f"could not process directory {dir.name} data")
return

inserted_ids = ovstore.add_texts(texts=texts, metadatas=metadata, ids=ids)
log.debug(f"inserted {len(inserted_ids)} into directory {dir.name} vector store")
# Insert new or changed documents
if len(texts_to_insert) > 0:
inserted_ids = ovstore.add_texts(
texts=texts_to_insert, metadatas=metadata_to_insert, ids=ids_to_insert
)
log.debug(
f"Inserted {len(inserted_ids)} new or changed documents into directory {dir.name} vector store"
)
else:
log.debug(f"No new or changed documents to insert for directory {dir.name}")
2 changes: 1 addition & 1 deletion app/ext_vector_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def list_documents(

proj = create_projection(proj=projection)
query = text(
f"select {proj} from {embedding_table_name} where {filters} order by uuid limit :limit offset :offset;"
f"select {proj} from {embedding_table_name} where {filters} order by uuid {'limit :limit' if limit > 0 else ''} offset :offset;"
)

result = tx_context.connection.execute(
Expand Down
5 changes: 4 additions & 1 deletion docs/dev/api_exploration.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,9 @@
"outputs": [],
"source": [
"# import okta roles\n",
"import requests\n",
"from starlette import status\n",
"\n",
"dir_id = \"ABCDE11ere\"\n",
"resp = requests.post(f\"{API_URL}/directories/{dir_id}/.import\", headers=headers, json={\"create_apps\": True})\n",
"if resp.status_code != status.HTTP_202_ACCEPTED:\n",
Expand Down Expand Up @@ -849,7 +852,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.4"
"version": "3.12.5"
}
},
"nbformat": 4,
Expand Down

0 comments on commit 895f186

Please sign in to comment.