Commit

First commit with changes.
nhwkuhns committed Dec 28, 2023
1 parent 5710995 commit e57a0cf
Showing 5 changed files with 252 additions and 54 deletions.
8 changes: 8 additions & 0 deletions docs/features/features.md
@@ -95,6 +95,10 @@ To learn more, please visit the [Cognitive Search](/docs/features/cognitive_sear

The end user leverages the web interface as the primary method to engage with the IA Accelerator, and the Azure OpenAI service. The user interface is very similar to that of the OpenAI ChatGPT interface, though it provides different and additional functionality which is outlined on the [User Experience](/docs/features/user_experience.md) page.

## Document Deletion

To delete a document from the system entirely, simply delete it from the upload container in the `infoasststore*****` Storage Account. The Azure Function `FileDeletion` runs on a 10-minute timer and deletes the corresponding documents from the content Storage container, the AI Search index, and the Cosmos DB tags container. It then updates the document's state, which can be viewed in the Upload Status section of the UI under the Manage Content tab at the top right.
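
If you prefer to script this step rather than use the Azure Portal, the snippet below is a minimal sketch that deletes an uploaded document with the `azure-storage-blob` SDK so that `FileDeletion` can clean up the related artifacts on its next run. The container name (`upload`) and the example file path are illustrative assumptions; substitute the values from your deployment.

```python
# Minimal sketch: delete an uploaded document so the FileDeletion timer
# function removes its chunks, search entries, and tags on its next run.
# The container name "upload" and the file path are illustrative assumptions.
import os

from azure.storage.blob import BlobServiceClient

# Connection string for the infoasststore***** Storage Account
blob_service_client = BlobServiceClient.from_connection_string(
    os.environ["BLOB_CONNECTION_STRING"])
upload_container = blob_service_client.get_container_client("upload")

# Remove the originally uploaded file; downstream cleanup is handled by the
# FileDeletion Azure Function within roughly 10 minutes.
upload_container.delete_blob("contracts/example-document.pdf")
```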

## Works in Progress (Future releases)

### Image Similarity Search
@@ -104,3 +108,7 @@ We've started with text-based image retrieval, but in the future, we have plans
### Adding Evaluation Guidance and Metrics

To ensure transparency and accountability, we are researching comprehensive evaluation guidance and metrics. This will assist users in assessing the performance and trustworthiness of AI-generated responses, fostering confidence in the platform.

### File Deletion in the UI

The ability to delete documents from the system will be enabled through a future UI update.
157 changes: 157 additions & 0 deletions functions/FileDeletion/__init__.py
@@ -0,0 +1,157 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import logging
import os
from datetime import datetime, timezone
from itertools import islice
import azure.functions as func
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.storage.blob import BlobServiceClient
from shared_code.status_log import State, StatusClassification, StatusLog
from shared_code.tags_helper import TagsHelper

blob_connection_string = os.environ["BLOB_CONNECTION_STRING"]
blob_storage_account_upload_container_name = os.environ[
"BLOB_STORAGE_ACCOUNT_UPLOAD_CONTAINER_NAME"]
blob_storage_account_output_container_name = os.environ[
"BLOB_STORAGE_ACCOUNT_OUTPUT_CONTAINER_NAME"]
azure_search_service_endpoint = os.environ["AZURE_SEARCH_SERVICE_ENDPOINT"]
azure_search_index = os.environ["AZURE_SEARCH_INDEX"]
azure_search_service_key = os.environ["AZURE_SEARCH_SERVICE_KEY"]
cosmosdb_url = os.environ["COSMOSDB_URL"]
cosmosdb_key = os.environ["COSMOSDB_KEY"]
cosmosdb_tags_database_name = os.environ["COSMOSDB_TAGS_DATABASE_NAME"]
cosmosdb_tags_container_name = os.environ["COSMOSDB_TAGS_CONTAINER_NAME"]
cosmosdb_log_database_name = os.environ["COSMOSDB_LOG_DATABASE_NAME"]
cosmosdb_log_container_name = os.environ["COSMOSDB_LOG_CONTAINER_NAME"]

status_log = StatusLog(cosmosdb_url,
cosmosdb_key,
cosmosdb_log_database_name,
cosmosdb_log_container_name)

tags_helper = TagsHelper(cosmosdb_url,
cosmosdb_key,
cosmosdb_tags_database_name,
cosmosdb_tags_container_name)

def chunks(data, size):
    '''The maximum number of blobs that can be deleted in one request is 256,
    so this breaks the dictionary into chunks.'''
# create an iterator over the keys
it = iter(data)
# loop over the range of the length of the data
for i in range(0, len(data), size):
# yield a dictionary with a slice of keys and their values
        yield {k: data[k] for k in islice(it, size)}

def get_deleted_blobs(blob_service_client: BlobServiceClient) -> list:
'''Creates and returns a list of file paths that are soft-deleted.'''
# Create Uploaded Container Client and list all blobs, including deleted blobs
upload_container_client = blob_service_client.get_container_client(
blob_storage_account_upload_container_name)
temp_list = upload_container_client.list_blobs(include="deleted")

deleted_blobs = []
# Pull out the soft-deleted blob names
for blob in temp_list:
if blob.deleted:
logging.debug("\t Deleted Blob name: %s", blob.name)
deleted_blobs.append(blob.name)
return deleted_blobs

def delete_content_blobs(blob_service_client: BlobServiceClient, deleted_blob: str) -> dict:
    '''Deletes blobs in the content container that correspond to a given
    soft-deleted blob from the upload container. Returns a dict of the deleted
    content blob names for use in other methods.'''
# Create Content Container Client
content_container_client = blob_service_client.get_container_client(
blob_storage_account_output_container_name)
# Get a dict with all chunked blobs that came from the deleted blob in the upload container
chunked_blobs_to_delete = {}
content_list = content_container_client.list_blobs(name_starts_with=deleted_blob)
for blob in content_list:
chunked_blobs_to_delete[blob.name] = None
logging.debug("Total number of chunked blobs to delete - %s", str(len(chunked_blobs_to_delete)))
# Split the chunked blob dict into chunks of less than 256
chunked_content_blob_dict = list(chunks(chunked_blobs_to_delete, 255))
# Delete all of the content blobs that came from a deleted blob in the upload container
for item in chunked_content_blob_dict:
content_container_client.delete_blobs(*item)
return chunked_blobs_to_delete

def delete_search_entries(deleted_content_blobs: dict) -> None:
    '''Takes the dict of content blobs that were deleted in a previous
    step and deletes the corresponding entries in the Azure AI
    Search index.'''
search_client = SearchClient(azure_search_service_endpoint,
azure_search_index,
AzureKeyCredential(azure_search_service_key))

search_id_list_to_delete = []
for file_path in deleted_content_blobs.keys():
search_id_list_to_delete.append({"id": status_log.encode_document_id(file_path)})

logging.debug("Total Search IDs to delete: %s", str(len(search_id_list_to_delete)))

if len(search_id_list_to_delete) > 0:
search_client.delete_documents(documents=search_id_list_to_delete)
logging.debug("Succesfully deleted items from AI Search index.")
else:
logging.debug("No items to delete from AI Search index.")

def main(mytimer: func.TimerRequest) -> None:
    '''This function is a cron job that runs every 10 minutes, detects when
a file has been deleted in the upload container and
1. removes the generated Blob chunks from the content container,
2. removes the CosmosDB tags entry, and
3. updates the CosmosDB logging entry to the Delete state
If a file has already gone through this process, updates to the code in
shared_code/status_log.py prevent the status from being continually updated'''
utc_timestamp = datetime.utcnow().replace(
tzinfo=timezone.utc).isoformat()

if mytimer.past_due:
logging.info('The timer is past due!')

logging.info('Python timer trigger function ran at %s', utc_timestamp)

# Create Blob Service Client
blob_service_client = BlobServiceClient.from_connection_string(blob_connection_string)
deleted_blobs = get_deleted_blobs(blob_service_client)

blob_name = ""
try:
for blob in deleted_blobs:
blob_name = blob
deleted_content_blobs = delete_content_blobs(blob_service_client, blob)
logging.info("%s content blobs deleted.", str(len(deleted_content_blobs)))
delete_search_entries(deleted_content_blobs)
tags_helper.delete_doc(blob)

# for doc in deleted_blobs:
doc_base = os.path.basename(blob)
doc_path = f"upload/{format(blob)}"

temp_doc_id = status_log.encode_document_id(doc_path)

logging.info("Modifying status for doc %s \n \t with ID %s", doc_base, temp_doc_id)

status_log.upsert_document(doc_path,
'Document chunks, tags, and entries in AI Search have been deleted',
StatusClassification.INFO,
State.DELETED)
status_log.save_document(doc_path)
except Exception as err:
logging.info("An exception occured with doc %s: %s", blob_name, str(err))
doc_base = os.path.basename(blob)
doc_path = f"upload/{format(blob)}"
temp_doc_id = status_log.encode_document_id(doc_path)
logging.info("Modifying status for doc %s \n \t with ID %s", doc_base, temp_doc_id)
status_log.upsert_document(doc_path,
f'Error deleting document from system: {str(err)}',
StatusClassification.ERROR,
State.ERROR)
status_log.save_document(doc_path)
11 changes: 11 additions & 0 deletions functions/FileDeletion/function.json
@@ -0,0 +1,11 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "mytimer",
"type": "timerTrigger",
"direction": "in",
"schedule": "0 */10 * * * *"
}
]
}
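
The `schedule` value uses the Azure Functions NCRONTAB format with six fields ({second} {minute} {hour} {day} {month} {day-of-week}), so `0 */10 * * * *` fires at second zero of every tenth minute. As a sketch for local debugging only (not part of this commit), the same binding could add the standard `runOnStartup` flag so the function also fires as soon as the Functions host starts:

```json
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "mytimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 */10 * * * *",
      "runOnStartup": true
    }
  ]
}
```

Leaving `runOnStartup` unset in production avoids an extra deletion pass on every host restart or scale-out.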
110 changes: 60 additions & 50 deletions functions/shared_code/status_log.py
@@ -20,6 +20,7 @@ class State(Enum):
ERROR = "Error"
THROTTLED = "Throttled"
UPLOADED = "Uploaded"
DELETED = "Deleted"
ALL = "All"

class StatusClassification(Enum):
@@ -33,7 +34,6 @@ class StatusQueryLevel(Enum):
CONCISE = "Concise"
VERBOSE = "Verbose"


class StatusLog:
""" Class for logging status of various processes to Cosmos DB"""

@@ -91,7 +91,6 @@ def read_file_status(self,

return items


def read_files_status_by_timeframe(self,
within_n_hours: int,
state: State = State.ALL
@@ -134,7 +133,7 @@ def upsert_document(self, document_path, status, status_classification: StatusCl
document_id = self.encode_document_id(document_path)

# add status to standard logger
logging.info(f"{status} DocumentID - {document_id}")
logging.info("%s DocumentID - %s", status, document_id)

# If this event is the start of an upload, remove any existing status files for this path
if fresh_start:
@@ -152,46 +151,58 @@ def upsert_document(self, document_path, status, status_classification: StatusCl
else:
json_document = self._log_document[document_id]

# Check if there has been a state change, and therefore to update state
if json_document['state'] != state.value:
json_document['state'] = state.value
json_document['state_timestamp'] = str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

# Update state description with latest status
json_document['state_description'] = status

# Append a new item to the array
status_updates = json_document["status_updates"]
new_item = {
"status": status,
"status_timestamp": str(datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
"status_classification": str(status_classification.value)
}

if status_classification == StatusClassification.ERROR:
new_item["stack_trace"] = self.get_stack_trace()
json_state = json_document['state']
if json_state != State.DELETED.value and json_state != State.ERROR.value:
# Check if there has been a state change, and therefore to update state
if json_document['state'] != state.value:
json_document['state'] = state.value
json_document['state_timestamp'] = str(datetime
.now()
.strftime('%Y-%m-%d %H:%M:%S'))

status_updates.append(new_item)
# Update state description with latest status
json_document['state_description'] = status
# Append a new item to the array
status_updates = json_document["status_updates"]
new_item = {
"status": status,
"status_timestamp": str(datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
"status_classification": str(status_classification.value)
}

if status_classification == StatusClassification.ERROR:
new_item["stack_trace"] = self.get_stack_trace()
status_updates.append(new_item)
else:
logging.debug("%s is already marked as %s. No new status to update.",
document_path,
json_state)
except exceptions.CosmosResourceNotFoundError:
# this is a new document
json_document = {
"id": document_id,
"file_path": document_path,
"file_name": base_name,
"state": str(state.value),
"start_timestamp": str(datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
"state_description": status,
"state_timestamp": str(datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
"status_updates": [
{
"status": status,
"status_timestamp": str(datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
"status_classification": str(status_classification.value)
}
]
}
except Exception:
if state != State.DELETED:
# this is a valid new document
json_document = {
"id": document_id,
"file_path": document_path,
"file_name": base_name,
"state": str(state.value),
"start_timestamp": str(datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
"state_description": status,
"state_timestamp": str(datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
"status_updates": [
{
"status": status,
"status_timestamp": str(datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
"status_classification": str(status_classification.value)
}
]
}
elif state == State.DELETED:
# the status file was previously deleted. Do nothing.
logging.debug("No record found for deleted document %s. Nothing to do.",
document_path)
except Exception as err:
# log the exception with stack trace to the status log
logging.error("Unexpected exception upserting document %s", str(err))
json_document = {
"id": document_id,
"file_path": document_path,
Expand All @@ -212,33 +223,32 @@ def upsert_document(self, document_path, status, status_classification: StatusCl

#self.container.upsert_item(body=json_document)
self._log_document[document_id] = json_document



def update_document_state(self, document_path, status, state=State.PROCESSING):
"""Updates the state of the document in the storage"""
try:
document_id = self.encode_document_id(document_path)
logging.info(f"{status} DocumentID - {document_id}")
logging.info("%sDocumentID - %s", status, document_id)
if self._log_document.get(document_id, "") != "":
json_document = self._log_document[document_id]

json_document['state'] = state.value
json_document['state_description'] = status
json_document['state_timestamp'] = str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
self.save_document(document_path)
self._log_document[document_id] = json_document
else:
logging.warning(f"Document with ID {document_id} not found.")
logging.warning("Document with ID %s not found.", document_id)
except Exception as err:
logging.error(f"An error occurred while updating the document state: {str(err)}")

logging.error("An error occurred while updating the document state: %s", str(err))

def save_document(self, document_path):
"""Saves the document in the storage"""
document_id = self.encode_document_id(document_path)
self.container.upsert_item(body=self._log_document[document_id])
if self._log_document[document_id] != "":
self.container.upsert_item(body=self._log_document[document_id])
else:
logging.debug("no update to be made for %s, skipping.", document_path)
self._log_document[document_id] = ""


def get_stack_trace(self):
""" Returns the stack trace of the current exception"""
@@ -251,4 +261,4 @@ def get_stack_trace(self):
stackstr = trc + ''.join(traceback.format_list(stack))
if exc is not None:
stackstr += ' ' + traceback.format_exc().lstrip(trc)
return stackstr
return stackstr