From 1d3514e87589fd4e8f06fdedb956182d9c416965 Mon Sep 17 00:00:00 2001 From: Maxence Guindon Date: Mon, 26 Feb 2024 21:42:36 +0000 Subject: [PATCH] fixes #51: Add script to upload to blob storage json file containing pipeline --- app.py | 66 ++++++++++++++++-------------- azure_storage/azure_storage_api.py | 29 ++++++++++++- docs/nachet-model-documentation.md | 33 ++++++++++----- model_inference/model_module.py | 2 +- pipelines_version_insertion.py | 30 ++++++++++++++ requirements.txt | 3 +- 6 files changed, 117 insertions(+), 46 deletions(-) create mode 100644 pipelines_version_insertion.py diff --git a/app.py b/app.py index 2f36a4aa..2d41b218 100644 --- a/app.py +++ b/app.py @@ -12,7 +12,7 @@ from quart import Quart, request, jsonify from quart_cors import cors from collections import namedtuple - +from cryptography.fernet import Fernet from custom_exceptions import ( DeleteDirectoryRequestError, @@ -26,6 +26,10 @@ connection_string_regex = r"^DefaultEndpointsProtocol=https?;.*;FileEndpoint=https://[a-zA-Z0-9]+\.file\.core\.windows\.net/;$" connection_string = os.getenv("NACHET_AZURE_STORAGE_CONNECTION_STRING") +FERNET_KEY = os.getenv("NACHET_BLOB_PIPELINE_DECRYPTION_KEY") +PIPELINE_VERSION = os.getenv("NACHET_BLOB_PIPELINE_VERSION") +PIPELINE_BLOB_NAME = os.getenv("NACHET_BLOB_PIPELINE_NAME") + endpoint_url_regex = r"^https://.*\/score$" endpoint_url = os.getenv("NACHET_MODEL_ENDPOINT_REST_URL") sd_endpoint = os.getenv("NACHET_SEED_DETECTOR_ENDPOINT") @@ -321,53 +325,53 @@ async def fetch_json(repo_URL, key, file_path, mock=False): result = response.read() result_json = json.loads(result.decode("utf-8")) return result_json - # logic to build pipelines - if mock: - with open("mock_pipeline_json.json", "r+") as f: - result_json = json.load(f) - else: - # TO DO: call the blob storage to get the file - result_json = await azure_storage_api.get_pipeline_info(connection_string, "user-bab1da84-5937-4016-965e-67e1ea6e29c4", "0.1.0") - - api_call_function = 
{func.split("from_")[1]: getattr(model_module, func) for func in dir(model_module) if "inference" in func.split("_")} - inference_functions = {func: getattr(inference, func) for func in dir(inference) if "process" in func.split("_")} - models = () - for model in result_json.get("models"): - m = Model( - api_call_function.get(model.get("api_call_function")), - model.get("model_name"), - model.get("endpoint"), - model.get("api_key"), - inference_functions.get(model.get("inference_function")), - model.get("content-type"), - model.get("deployment_platform") - ) - models += (m,) - - for pipeline in result_json.get("pipelines"): - CACHE["pipelines"][pipeline.get("pipeline_name")] = tuple([m for m in models if m.name in pipeline.get("models")]) - - return result_json.get("pipelines") except urllib.error.HTTPError as error: raise ValueError(str(error)) except Exception as e: raise ValueError(str(e)) +async def get_pipeline(mock:bool = False): + if mock: + with open("mock_pipeline_json.json", "r+") as f: + result_json = json.load(f) + else: + result_json = await azure_storage_api.get_pipeline_info(connection_string, PIPELINE_BLOB_NAME, PIPELINE_VERSION) + cipher_suite = Fernet(FERNET_KEY) + + api_call_function = {func.split("from_")[1]: getattr(model_module, func) for func in dir(model_module) if "inference" in func.split("_")} + inference_functions = {func: getattr(inference, func) for func in dir(inference) if "process" in func.split("_")} + + models = () + for model in result_json.get("models"): + m = Model( + api_call_function.get(model.get("api_call_function")), + model.get("model_name"), + cipher_suite.decrypt(model.get("endpoint").encode()).decode(), + cipher_suite.decrypt(model.get("api_key").encode()).decode(), + inference_functions.get(model.get("inference_function")), + model.get("content-type"), + model.get("deployment_platform") + ) + models += (m,) + + for pipeline in result_json.get("pipelines"): + CACHE["pipelines"][pipeline.get("pipeline_name")] = tuple([m 
for m in models if m.name in pipeline.get("models")]) + + return result_json.get("pipelines") async def data_factory(**kwargs): return { "input_data": kwargs, } - @app.before_serving async def before_serving(): try: # Get all the inference functions from the model_module and map them in a dictionary CACHE["seeds"] = await fetch_json(NACHET_DATA, "seeds", "seeds/all.json") - CACHE["endpoints"] = await fetch_json(NACHET_MODEL, "endpoints", "model_endpoints_metadata.json") #, mock=True) - print(CACHE["endpoints"]) + # CACHE["endpoints"] = await fetch_json(NACHET_MODEL, "endpoints", "model_endpoints_metadata.json") + CACHE["endpoints"] = await get_pipeline() # mock=True except Exception as e: print(e) raise ServerError("Failed to retrieve data from the repository") diff --git a/azure_storage/azure_storage_api.py b/azure_storage/azure_storage_api.py index 6c93a9a4..af0822c7 100644 --- a/azure_storage/azure_storage_api.py +++ b/azure_storage/azure_storage_api.py @@ -259,8 +259,9 @@ async def get_pipeline_info( json_blob = await get_blob(container_client, blob.name) if json_blob: pipeline = json.loads(json_blob) - if pipeline.get("version") == pipeline_version: - return pipeline + if not isinstance(pipeline, list): + if pipeline.get("version") == pipeline_version: + return pipeline else: raise PipelineNotFoundError( "This version of the pipeline was not found." 
@@ -269,3 +270,27 @@ async def get_pipeline_info( except FolderListError as error: print(error) return False + +def insert_new_version_pipeline( + pipelines_json: dict, + connection_string: str, + pipleine_container_name: str + ) -> bool: + try: + blob_service_client = BlobServiceClient.from_connection_string( + connection_string + ) + + if blob_service_client: + container_client = blob_service_client.get_container_client( + pipleine_container_name + ) + + json_name = "{}/{}.json".format("pipelines", pipelines_json.get("version")) + container_client.upload_blob(json_name, json.dumps(pipelines_json, indent=4), overwrite=True) + return True + else: + raise ConnectionStringError("Invalid connection string") + except ConnectionStringError as error: + print(error) + return False diff --git a/docs/nachet-model-documentation.md b/docs/nachet-model-documentation.md index e872c0cc..fcbfb922 100644 --- a/docs/nachet-model-documentation.md +++ b/docs/nachet-model-documentation.md @@ -23,11 +23,15 @@ Nachet Interactive models' perform the following tasks: ## List of models -|Model|Full name|Task|Active|Accuracy| -|--|--|:--:|:--:|:--:| -|Nachet-6seeds | m-14of15seeds-6seedsmag | Object Detection | Yes | - | -|Seed-detector | seed-detector-1 | Object Detection | Yes | - | -|Swin | swinv1-base-dataaugv2-1 | Classification | Yes | - | +|Model|Full name|Task|API Call Function|Inference Function|Active|Accuracy| +|--|--|:--:|:--:|:--:|:--:|:--:| +|Nachet-6seeds | m-14of15seeds-6seedsmag | Object Detection | nachet_6seeds | None | Yes | - | +|Seed-detector | seed-detector-1 | Object Detection | seed_detector | process_image_slicing | Yes | - | +|Swin | swinv1-base-dataaugv2-1 | Classification | swin | process_swin_result | Yes | - | + +### Inference and API Call Function + +The inference and API call functions act as entry and exit points for the model. The API call explicitly requests a prediction from the specified model (such as Swin, Nachet-6seeds, etc.). 
The inference function processes the data before sending it to the frontend if the model requires it. For instance, the Seed-detector only returns "seed" as a label, and its inference needs to be processed and passed to the next model which assigns the correct label to the seeds.

## Return value of models

@@ -43,25 +47,25 @@ Nachet Interactive models' perform the following tasks:
 },
 "label": "top_label_name",
 "score": 0.912,
-"topResult": [
+"topN": [
 {
-"score": 0.912
+"score": 0.912,
 "label": "top_label_name",
 },
 {
-"score": 0.053
+"score": 0.053,
 "label": "seed_name",
 },
 {
-"score": 0.0029
+"score": 0.0029,
 "label": "seed_name",
 },
 {
-"score": 0.005
+"score": 0.005,
 "label": "seed_name",
 },
 {
-"score": 0.001
+"score": 0.001,
 "label": "seed_name",
 }
 ],
@@ -75,6 +79,13 @@ Nachet Interactive models' perform the following tasks:
 "totalBoxes": 1
 }
 ```
+### Why topN
+We decided to name the top results property topN because this value can return n predictions. Usually in AI, the top 5 results are used to measure the accuracy of a model. If the correct result is in the top 5, then the prediction is considered to be true.
+
+This is useful in cases where the user's attention is on more than one result.
+
+ > "Top N accuracy — Top N accuracy is when you measure how often your predicted class falls in the top N values of your softmax distribution."
+ [Nagda, R. (2019-11-08) *Evaluating models using the Top N accuracy metrics*. 
Medium](https://medium.com/nanonets/evaluating-models-using-the-top-n-accuracy-metrics-c0355b36f91b)

## Different ways of calling models

diff --git a/model_inference/model_module.py b/model_inference/model_module.py
index e1387e83..1f57cf6e 100644
--- a/model_inference/model_module.py
+++ b/model_inference/model_module.py
@@ -89,7 +89,7 @@ async def request_inference_from_nachet_6seeds(model: namedtuple, previous_resul
 result = response.read()
 result_json = json.loads(result.decode("utf8"))
-return result_json,
+return result_json[0],
 except Exception as e:
 raise InferenceRequestError(f"An error occurred while processing the request:\n {str(e)}")
diff --git a/pipelines_version_insertion.py b/pipelines_version_insertion.py
new file mode 100644
index 00000000..86487295
--- /dev/null
+++ b/pipelines_version_insertion.py
@@ -0,0 +1,30 @@
+import os
+import json
+
+import azure_storage.azure_storage_api as azure_storage_api
+
+from cryptography.fernet import Fernet
+from dotenv import load_dotenv
+
+load_dotenv()
+
+key = os.getenv("NACHET_BLOB_PIPELINE_DECRYPTION_KEY")
+blob_storage_account_name = os.getenv("NACHET_BLOB_PIPELINE_NAME")
+connection_string = os.getenv("NACHET_AZURE_STORAGE_CONNECTION_STRING")
+
+cipher_suite = Fernet(key)
+
+with (open("./mock_pipeline_json.json", "r")) as file:
+    pipelines_json = file.read()
+
+pipelines_json = json.loads(pipelines_json)
+
+for model in pipelines_json["models"]:
+    # encrypting endpoint
+    endpoint = model["endpoint"].encode()
+    model["endpoint"] = cipher_suite.encrypt(endpoint).decode()
+    # encrypting api_key
+    api_key = model["api_key"].encode()
+    model["api_key"] = cipher_suite.encrypt(api_key).decode()
+
+print(azure_storage_api.insert_new_version_pipeline(pipelines_json, connection_string, blob_storage_account_name))
diff --git a/requirements.txt b/requirements.txt
index 3648fe5a..7444593d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,4 +5,5 @@ quart
 quart-cors
 python-dotenv
 hypercorn
-Pillow
\ No newline 
at end of file +Pillow +cryptography