From e57a0cf44505525f16b1db38b5863670683f66b2 Mon Sep 17 00:00:00 2001
From: Nehemiah Kuhns
Date: Thu, 28 Dec 2023 14:32:34 +0000
Subject: [PATCH] Add FileDeletion timer function and deletion status handling.

---
 docs/features/features.md            |   8 ++
 functions/FileDeletion/__init__.py   | 157 +++++++++++++++++++++++++++
 functions/FileDeletion/function.json |  11 ++
 functions/shared_code/status_log.py  | 110 ++++++++++---------
 functions/shared_code/tags_helper.py |  20 +++-
 5 files changed, 252 insertions(+), 54 deletions(-)
 create mode 100644 functions/FileDeletion/__init__.py
 create mode 100644 functions/FileDeletion/function.json

diff --git a/docs/features/features.md b/docs/features/features.md
index 405851c9c..4dcf9a593 100644
--- a/docs/features/features.md
+++ b/docs/features/features.md
@@ -95,6 +95,10 @@ To learn more, please visit the [Cognitive Search](/docs/features/cognitive_sear
 
 The end user leverages the web interface as the primary method to engage with the IA Accelerator, and the Azure OpenAI service. The user interface is very similar to that of the OpenAI ChatGPT interface, though it provides different and additional functionality which is outlined on the [User Experience](/docs/features/user_experience.md) page.
 
+## Document Deletion
+
+To delete a document from the system entirely, simply delete it from the upload container in the `infoasststore*****` Storage Account. The Azure Function `FileDeletion` runs on a 10-minute timer and deletes the corresponding documents from the content Storage container, the AI Search index, and the Cosmos DB tags container. It then updates the state of the document, which can be viewed in the Upload Status section of the UI under the Manage Content tab at the top right.
+
 ## Works in Progress (Future releases)
 
 ### Image Similarity Search
@@ -104,3 +108,7 @@ We've started with text-based image retrieval, but in the future, we have plans
 ### Adding Evaluation Guidance and Metrics
 
 To ensure transparency and accountability, we are researching comprehensive evaluation guidance and metrics. This will assist users in assessing the performance and trustworthiness of AI-generated responses, fostering confidence in the platform.
+
+### File Deletion in the UI
+
+The ability to delete documents from the system will be enabled through a future UI update.
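
For illustration, a minimal sketch (not part of this patch) of how an operator could trigger the flow described above from Python instead of the portal. The container name "upload" and the BLOB_CONNECTION_STRING variable are assumptions mirroring the function code below:

    import os
    from azure.storage.blob import BlobServiceClient

    # Deleting the source blob from the upload container is the only action
    # required; the FileDeletion timer picks it up on its next 10-minute run.
    service = BlobServiceClient.from_connection_string(os.environ["BLOB_CONNECTION_STRING"])
    upload_container = service.get_container_client("upload")   # assumed container name
    upload_container.delete_blob("some-folder/report.pdf")      # hypothetical document path
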
diff --git a/functions/FileDeletion/__init__.py b/functions/FileDeletion/__init__.py
new file mode 100644
index 000000000..a43c82439
--- /dev/null
+++ b/functions/FileDeletion/__init__.py
@@ -0,0 +1,157 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT license.
+
+import logging
+import os
+from datetime import datetime, timezone
+from itertools import islice
+import azure.functions as func
+from azure.core.credentials import AzureKeyCredential
+from azure.search.documents import SearchClient
+from azure.storage.blob import BlobServiceClient
+from shared_code.status_log import State, StatusClassification, StatusLog
+from shared_code.tags_helper import TagsHelper
+
+blob_connection_string = os.environ["BLOB_CONNECTION_STRING"]
+blob_storage_account_upload_container_name = os.environ[
+    "BLOB_STORAGE_ACCOUNT_UPLOAD_CONTAINER_NAME"]
+blob_storage_account_output_container_name = os.environ[
+    "BLOB_STORAGE_ACCOUNT_OUTPUT_CONTAINER_NAME"]
+azure_search_service_endpoint = os.environ["AZURE_SEARCH_SERVICE_ENDPOINT"]
+azure_search_index = os.environ["AZURE_SEARCH_INDEX"]
+azure_search_service_key = os.environ["AZURE_SEARCH_SERVICE_KEY"]
+cosmosdb_url = os.environ["COSMOSDB_URL"]
+cosmosdb_key = os.environ["COSMOSDB_KEY"]
+cosmosdb_tags_database_name = os.environ["COSMOSDB_TAGS_DATABASE_NAME"]
+cosmosdb_tags_container_name = os.environ["COSMOSDB_TAGS_CONTAINER_NAME"]
+cosmosdb_log_database_name = os.environ["COSMOSDB_LOG_DATABASE_NAME"]
+cosmosdb_log_container_name = os.environ["COSMOSDB_LOG_CONTAINER_NAME"]
+
+status_log = StatusLog(cosmosdb_url,
+                       cosmosdb_key,
+                       cosmosdb_log_database_name,
+                       cosmosdb_log_container_name)
+
+tags_helper = TagsHelper(cosmosdb_url,
+                         cosmosdb_key,
+                         cosmosdb_tags_database_name,
+                         cosmosdb_tags_container_name)
+
+def chunks(data, size):
+    '''The max number of blobs to delete in one request is 256, so this
+    breaks the dictionary into chunks of at most `size` entries.'''
+    # create an iterator over the keys
+    it = iter(data)
+    # loop over the range of the length of the data
+    for i in range(0, len(data), size):
+        # yield a dictionary with a slice of keys and their values
+        yield {k: data[k] for k in islice(it, size)}
+
+def get_deleted_blobs(blob_service_client: BlobServiceClient) -> list:
+    '''Creates and returns a list of file paths that are soft-deleted.'''
+    # Create Upload Container Client and list all blobs, including deleted blobs
+    upload_container_client = blob_service_client.get_container_client(
+        blob_storage_account_upload_container_name)
+    temp_list = upload_container_client.list_blobs(include="deleted")
+
+    deleted_blobs = []
+    # Pull out the soft-deleted blob names
+    for blob in temp_list:
+        if blob.deleted:
+            logging.debug("\t Deleted Blob name: %s", blob.name)
+            deleted_blobs.append(blob.name)
+    return deleted_blobs
+
+def delete_content_blobs(blob_service_client: BlobServiceClient, deleted_blob: str) -> dict:
+    '''Deletes blobs in the content container that correspond to a given
+    soft-deleted blob from the upload container. Returns a dict of deleted
+    content blobs for use in other methods.'''
+    # Create Content Container Client
+    content_container_client = blob_service_client.get_container_client(
+        blob_storage_account_output_container_name)
+    # Get a dict with all chunked blobs that came from the deleted blob in the upload container
+    chunked_blobs_to_delete = {}
+    content_list = content_container_client.list_blobs(name_starts_with=deleted_blob)
+    for blob in content_list:
+        chunked_blobs_to_delete[blob.name] = None
+    logging.debug("Total number of chunked blobs to delete - %s", str(len(chunked_blobs_to_delete)))
+    # Split the chunked blob dict into chunks of less than 256
+    chunked_content_blob_dict = list(chunks(chunked_blobs_to_delete, 255))
+    # Delete all of the content blobs that came from a deleted blob in the upload container
+    for item in chunked_content_blob_dict:
+        content_container_client.delete_blobs(*item)
+    return chunked_blobs_to_delete
+
+def delete_search_entries(deleted_content_blobs: dict) -> None:
+    '''Takes a dict of content blobs that were deleted in a previous
+    step and deletes the corresponding entries in the Azure AI
+    Search index.'''
+    search_client = SearchClient(azure_search_service_endpoint,
+                                 azure_search_index,
+                                 AzureKeyCredential(azure_search_service_key))
+
+    search_id_list_to_delete = []
+    for file_path in deleted_content_blobs.keys():
+        search_id_list_to_delete.append({"id": status_log.encode_document_id(file_path)})
+
+    logging.debug("Total Search IDs to delete: %s", str(len(search_id_list_to_delete)))
+
+    if len(search_id_list_to_delete) > 0:
+        search_client.delete_documents(documents=search_id_list_to_delete)
+        logging.debug("Successfully deleted items from AI Search index.")
+    else:
+        logging.debug("No items to delete from AI Search index.")
+
+def main(mytimer: func.TimerRequest) -> None:
+    '''This function is a cron job that runs every 10 minutes, detects when
+    a file has been deleted in the upload container, and
+    1. removes the generated blob chunks from the content container,
+    2. removes the Cosmos DB tags entry, and
+    3. updates the Cosmos DB logging entry to the DELETED state.
+    If a file has already gone through this process, updates to the code in
+    shared_code/status_log.py prevent the status from being continually updated.'''
+    utc_timestamp = datetime.utcnow().replace(
+        tzinfo=timezone.utc).isoformat()
+
+    if mytimer.past_due:
+        logging.info('The timer is past due!')
+
+    logging.info('Python timer trigger function ran at %s', utc_timestamp)
+
+    # Create Blob Service Client
+    blob_service_client = BlobServiceClient.from_connection_string(blob_connection_string)
+    deleted_blobs = get_deleted_blobs(blob_service_client)
+
+    blob_name = ""
+    try:
+        for blob in deleted_blobs:
+            blob_name = blob
+            deleted_content_blobs = delete_content_blobs(blob_service_client, blob)
+            logging.info("%s content blobs deleted.", str(len(deleted_content_blobs)))
+            delete_search_entries(deleted_content_blobs)
+            tags_helper.delete_doc(blob)
+
+            # Mark the deletion in the status log entry for this document
+            doc_base = os.path.basename(blob)
+            doc_path = f"upload/{blob}"
+
+            temp_doc_id = status_log.encode_document_id(doc_path)
+
+            logging.info("Modifying status for doc %s \n \t with ID %s", doc_base, temp_doc_id)
+
+            status_log.upsert_document(doc_path,
+                                       'Document chunks, tags, and entries in AI Search have been deleted',
+                                       StatusClassification.INFO,
+                                       State.DELETED)
+            status_log.save_document(doc_path)
+    except Exception as err:
+        logging.info("An exception occurred with doc %s: %s", blob_name, str(err))
+        doc_base = os.path.basename(blob_name)
+        doc_path = f"upload/{blob_name}"
+        temp_doc_id = status_log.encode_document_id(doc_path)
+        logging.info("Modifying status for doc %s \n \t with ID %s", doc_base, temp_doc_id)
+        status_log.upsert_document(doc_path,
+                                   f'Error deleting document from system: {str(err)}',
+                                   StatusClassification.ERROR,
+                                   State.ERROR)
+        status_log.save_document(doc_path)
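
The `chunks` helper above exists because a single `delete_blobs` call accepts at most 256 blobs; the code batches at 255 to stay safely under that limit. A quick, self-contained illustration of the batching behavior (not part of the patch; the blob names are hypothetical):

    from itertools import islice

    def chunks(data, size):  # same helper as in __init__.py above
        it = iter(data)
        for _ in range(0, len(data), size):
            yield {k: data[k] for k in islice(it, size)}

    # 600 chunk paths split into delete_blobs-sized batches.
    data = {f"report.pdf/chunk_{i}.json": None for i in range(600)}
    print([len(batch) for batch in chunks(data, 255)])  # -> [255, 255, 90]
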
diff --git a/functions/FileDeletion/function.json b/functions/FileDeletion/function.json
new file mode 100644
index 000000000..687c2b799
--- /dev/null
+++ b/functions/FileDeletion/function.json
@@ -0,0 +1,11 @@
+{
+    "scriptFile": "__init__.py",
+    "bindings": [
+        {
+            "name": "mytimer",
+            "type": "timerTrigger",
+            "direction": "in",
+            "schedule": "0 */10 * * * *"
+        }
+    ]
+}
\ No newline at end of file
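
The `schedule` value is a six-field NCRONTAB expression (second, minute, hour, day, month, day-of-week), so `0 */10 * * * *` fires at second 0 of every tenth minute. The deletion flow also depends on every component deriving the same document id from a file path; a small sketch of that scheme, mirroring `encode_document_id` in the shared code below (the example path is made up):

    import base64

    # Ids are URL-safe base64 of the file path, so FileDeletion can rebuild
    # the exact id that ingestion wrote to the search index and status log.
    def encode_document_id(document_id: str) -> str:
        return base64.urlsafe_b64encode(document_id.encode()).decode()

    print(encode_document_id("upload/report.pdf"))  # dXBsb2FkL3JlcG9ydC5wZGY=
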
diff --git a/functions/shared_code/status_log.py b/functions/shared_code/status_log.py
index 2e639a098..b8b19467d 100644
--- a/functions/shared_code/status_log.py
+++ b/functions/shared_code/status_log.py
@@ -20,6 +20,7 @@ class State(Enum):
     ERROR = "Error"
     THROTTLED = "Throttled"
     UPLOADED = "Uploaded"
+    DELETED = "Deleted"
     ALL = "All"
 
 class StatusClassification(Enum):
@@ -33,7 +34,6 @@ class StatusQueryLevel(Enum):
     CONCISE = "Concise"
     VERBOSE = "Verbose"
 
-
 class StatusLog:
     """ Class for logging status of various processes to Cosmos DB"""
 
@@ -91,7 +91,6 @@ def read_file_status(self,
 
         return items
 
-
     def read_files_status_by_timeframe(self,
                                        within_n_hours: int,
                                        state: State = State.ALL
@@ -134,7 +133,7 @@ def upsert_document(self, document_path, status, status_classification: StatusCl
         document_id = self.encode_document_id(document_path)
 
         # add status to standard logger
-        logging.info(f"{status} DocumentID - {document_id}")
+        logging.info("%s DocumentID - %s", status, document_id)
 
         # If this event is the start of an upload, remove any existing status files for this path
         if fresh_start:
@@ -152,46 +151,58 @@ def upsert_document(self, document_path, status, status_classification: StatusCl
             else:
                 json_document = self._log_document[document_id]
 
-            # Check if there has been a state change, and therefore to update state
-            if json_document['state'] != state.value:
-                json_document['state'] = state.value
-                json_document['state_timestamp'] = str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
-
-            # Update state description with latest status
-            json_document['state_description'] = status
-
-            # Append a new item to the array
-            status_updates = json_document["status_updates"]
-            new_item = {
-                "status": status,
-                "status_timestamp": str(datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
-                "status_classification": str(status_classification.value)
-            }
-
-            if status_classification == StatusClassification.ERROR:
-                new_item["stack_trace"] = self.get_stack_trace()
-
-            status_updates.append(new_item)
+            json_state = json_document['state']
+            if json_state != State.DELETED.value and json_state != State.ERROR.value:
+                # Check if there has been a state change, and therefore update state
+                if json_document['state'] != state.value:
+                    json_document['state'] = state.value
+                    json_document['state_timestamp'] = str(datetime
+                                                           .now()
+                                                           .strftime('%Y-%m-%d %H:%M:%S'))
+
+                # Update state description with latest status
+                json_document['state_description'] = status
+
+                # Append a new item to the array
+                status_updates = json_document["status_updates"]
+                new_item = {
+                    "status": status,
+                    "status_timestamp": str(datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
+                    "status_classification": str(status_classification.value)
+                }
+
+                if status_classification == StatusClassification.ERROR:
+                    new_item["stack_trace"] = self.get_stack_trace()
+                status_updates.append(new_item)
+            else:
+                logging.debug("%s is already marked as %s. No new status to update.",
+                              document_path,
+                              json_state)
         except exceptions.CosmosResourceNotFoundError:
-            # this is a new document
-            json_document = {
-                "id": document_id,
-                "file_path": document_path,
-                "file_name": base_name,
-                "state": str(state.value),
-                "start_timestamp": str(datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
-                "state_description": status,
-                "state_timestamp": str(datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
-                "status_updates": [
-                    {
-                        "status": status,
-                        "status_timestamp": str(datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
-                        "status_classification": str(status_classification.value)
-                    }
-                ]
-            }
+            if state != State.DELETED:
+                # this is a valid new document
+                json_document = {
+                    "id": document_id,
+                    "file_path": document_path,
+                    "file_name": base_name,
+                    "state": str(state.value),
+                    "start_timestamp": str(datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
+                    "state_description": status,
+                    "state_timestamp": str(datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
+                    "status_updates": [
+                        {
+                            "status": status,
+                            "status_timestamp": str(datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
+                            "status_classification": str(status_classification.value)
+                        }
+                    ]
+                }
+            else:
+                # the status file was previously deleted. Do nothing.
+                logging.debug("No record found for deleted document %s. Nothing to do.",
+                              document_path)
-        except Exception:
+        except Exception as err:
             # log the exception with stack trace to the status log
+            logging.error("Unexpected exception upserting document: %s", str(err))
             json_document = {
                 "id": document_id,
                 "file_path": document_path,
@@ -212,33 +223,32 @@ def upsert_document(self, document_path, status, status_classification: StatusCl
 
         #self.container.upsert_item(body=json_document)
         self._log_document[document_id] = json_document
-
-
+
     def update_document_state(self, document_path, status, state=State.PROCESSING):
         """Updates the state of the document in the storage"""
         try:
             document_id = self.encode_document_id(document_path)
-            logging.info(f"{status} DocumentID - {document_id}")
+            logging.info("%s DocumentID - %s", status, document_id)
             if self._log_document.get(document_id, "") != "":
                 json_document = self._log_document[document_id]
-
                 json_document['state'] = state.value
                 json_document['state_description'] = status
                 json_document['state_timestamp'] = str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                 self.save_document(document_path)
                 self._log_document[document_id] = json_document
             else:
-                logging.warning(f"Document with ID {document_id} not found.")
+                logging.warning("Document with ID %s not found.", document_id)
         except Exception as err:
-            logging.error(f"An error occurred while updating the document state: {str(err)}")
-
+            logging.error("An error occurred while updating the document state: %s", str(err))
 
     def save_document(self, document_path):
         """Saves the document in the storage"""
         document_id = self.encode_document_id(document_path)
-        self.container.upsert_item(body=self._log_document[document_id])
+        if self._log_document[document_id] != "":
+            self.container.upsert_item(body=self._log_document[document_id])
+        else:
+            logging.debug("No update to be made for %s, skipping.", document_path)
         self._log_document[document_id] = ""
 
-
     def get_stack_trace(self):
         """ Returns the stack trace of the current exception"""
@@ -251,4 +261,4 @@ def get_stack_trace(self):
         stackstr = trc + ''.join(traceback.format_list(stack))
         if exc is not None:
             stackstr += ' ' + traceback.format_exc().lstrip(trc)
-        return stackstr
\ No newline at end of file
+        return stackstr
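
The guard added to `upsert_document` above makes Deleted and Error terminal: once a status record reaches either state, later pipeline events can no longer overwrite it. Distilled to a minimal sketch (names here are illustrative, not from the patch):

    # Terminal states, mirroring State.DELETED / State.ERROR above.
    TERMINAL_STATES = {"Deleted", "Error"}

    def should_update(current_state: str) -> bool:
        # A later event may update "Uploaded", never "Deleted" or "Error".
        return current_state not in TERMINAL_STATES

    assert should_update("Uploaded")
    assert not should_update("Deleted")
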
diff --git a/functions/shared_code/tags_helper.py b/functions/shared_code/tags_helper.py
index 737952ac4..615dbbaf9 100644
--- a/functions/shared_code/tags_helper.py
+++ b/functions/shared_code/tags_helper.py
@@ -1,9 +1,10 @@
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT license.
 
-from azure.cosmos import CosmosClient, PartitionKey
+from azure.cosmos import CosmosClient, PartitionKey, exceptions
 import traceback, sys
 import base64
+import logging
 
 class TagsHelper:
     """ Helper class for tag functions"""
@@ -33,7 +34,7 @@ def get_all_tags(self):
         query = "SELECT DISTINCT VALUE t FROM c JOIN t IN c.tags"
         tag_array = self.container.query_items(query=query, enable_cross_partition_query=True)
         return ",".join(tag_array)
-    
+
     def upsert_document(self, document_path, tags_list):
         """ Upserts a document into the database """
         document_id = self.encode_document_id(document_path)
@@ -48,7 +49,7 @@ def encode_document_id(self, document_id):
         """ encode a path/file name to remove unsafe chars for a cosmos db id """
         safe_id = base64.urlsafe_b64encode(document_id.encode()).decode()
         return safe_id
-    
+
     def get_stack_trace(self):
         """ Returns the stack trace of the current exception"""
         exc = sys.exc_info()[0]
@@ -60,4 +61,15 @@ def get_stack_trace(self):
         stackstr = trc + ''.join(traceback.format_list(stack))
         if exc is not None:
             stackstr += ' ' + traceback.format_exc().lstrip(trc)
-        return stackstr
\ No newline at end of file
+        return stackstr
+
+    def delete_doc(self, doc: str) -> None:
+        '''Deletes the tags item for a given file path.'''
+        file_path = f"upload/{doc}"
+        doc_id = self.encode_document_id(file_path)
+        logging.debug("deleting tags item for doc %s \n \t with ID %s", doc, doc_id)
+        try:
+            self.container.delete_item(item=doc_id, partition_key=file_path)
+            logging.info("deleted tags for document path %s", file_path)
+        except exceptions.CosmosResourceNotFoundError:
+            logging.info("Tag entry for %s already deleted", file_path)
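
A rough usage sketch (not part of this patch) of the new `delete_doc` method; it assumes the script runs from the functions/ directory so `shared_code` is importable, and that the Cosmos DB environment variables mirror those in `__init__.py`:

    import os
    from shared_code.tags_helper import TagsHelper

    tags = TagsHelper(os.environ["COSMOSDB_URL"],
                      os.environ["COSMOSDB_KEY"],
                      os.environ["COSMOSDB_TAGS_DATABASE_NAME"],
                      os.environ["COSMOSDB_TAGS_CONTAINER_NAME"])
    # Deletes the tags item keyed on "upload/report.pdf"; a missing item is
    # treated as already deleted rather than an error.
    tags.delete_doc("report.pdf")  # hypothetical document name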