Skip to content

Commit

Permalink
fixes #51: Add script to upload to blob storage json file containing …
Browse files Browse the repository at this point in the history
…pipeline
  • Loading branch information
Maxence Guindon committed Feb 26, 2024
1 parent 63dddec commit 1d3514e
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 46 deletions.
66 changes: 35 additions & 31 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from quart import Quart, request, jsonify
from quart_cors import cors
from collections import namedtuple

from cryptography.fernet import Fernet

from custom_exceptions import (
DeleteDirectoryRequestError,
Expand All @@ -26,6 +26,10 @@
connection_string_regex = r"^DefaultEndpointsProtocol=https?;.*;FileEndpoint=https://[a-zA-Z0-9]+\.file\.core\.windows\.net/;$"
connection_string = os.getenv("NACHET_AZURE_STORAGE_CONNECTION_STRING")

FERNET_KEY = os.getenv("NACHET_BLOB_PIPELINE_DECRYPTION_KEY")
PIPELINE_VERSION = os.getenv("NACHET_BLOB_PIPELINE_VERSION")
PIPELINE_BLOB_NAME = os.getenv("NACHET_BLOB_PIPELINE_NAME")

endpoint_url_regex = r"^https://.*\/score$"
endpoint_url = os.getenv("NACHET_MODEL_ENDPOINT_REST_URL")
sd_endpoint = os.getenv("NACHET_SEED_DETECTOR_ENDPOINT")
Expand Down Expand Up @@ -321,53 +325,53 @@ async def fetch_json(repo_URL, key, file_path, mock=False):
result = response.read()
result_json = json.loads(result.decode("utf-8"))
return result_json
# logic to build pipelines
if mock:
with open("mock_pipeline_json.json", "r+") as f:
result_json = json.load(f)
else:
# TO DO: call the blob storage to get the file
result_json = await azure_storage_api.get_pipeline_info(connection_string, "user-bab1da84-5937-4016-965e-67e1ea6e29c4", "0.1.0")

api_call_function = {func.split("from_")[1]: getattr(model_module, func) for func in dir(model_module) if "inference" in func.split("_")}
inference_functions = {func: getattr(inference, func) for func in dir(inference) if "process" in func.split("_")}
models = ()
for model in result_json.get("models"):
m = Model(
api_call_function.get(model.get("api_call_function")),
model.get("model_name"),
model.get("endpoint"),
model.get("api_key"),
inference_functions.get(model.get("inference_function")),
model.get("content-type"),
model.get("deployment_platform")
)
models += (m,)

for pipeline in result_json.get("pipelines"):
CACHE["pipelines"][pipeline.get("pipeline_name")] = tuple([m for m in models if m.name in pipeline.get("models")])

return result_json.get("pipelines")

except urllib.error.HTTPError as error:
raise ValueError(str(error))
except Exception as e:
raise ValueError(str(e))

async def get_pipeline(mock: bool = False):
    """
    Load the pipeline configuration JSON, build the Model tuples and
    populate CACHE["pipelines"].

    Parameters:
    - mock: when True, read the local "mock_pipeline_json.json" fixture
      instead of fetching the blob from Azure storage.

    Returns the "pipelines" list from the configuration JSON (None when the
    key is absent).

    Raises cryptography.fernet.InvalidToken if an endpoint/api_key value is
    not a valid Fernet token for FERNET_KEY.
    """
    if mock:
        # Read-only access is all we need; the previous "r+" mode requested
        # write permission the function never used.
        with open("mock_pipeline_json.json", "r") as f:
            result_json = json.load(f)
    else:
        result_json = await azure_storage_api.get_pipeline_info(
            connection_string, PIPELINE_BLOB_NAME, PIPELINE_VERSION
        )

    # NOTE(review): decryption also runs in mock mode, so the mock file must
    # contain Fernet-encrypted endpoint/api_key values — confirm.
    cipher_suite = Fernet(FERNET_KEY)

    # Discover API-call functions ("..._from_<name>") and inference helpers
    # ("process_*") on their modules by naming convention.
    api_call_function = {
        func.split("from_")[1]: getattr(model_module, func)
        for func in dir(model_module)
        if "inference" in func.split("_")
    }
    inference_functions = {
        func: getattr(inference, func)
        for func in dir(inference)
        if "process" in func.split("_")
    }

    models = ()
    # `or []` keeps a configuration without "models" from raising an opaque
    # TypeError while iterating None.
    for model in result_json.get("models") or []:
        m = Model(
            api_call_function.get(model.get("api_call_function")),
            model.get("model_name"),
            # Secrets are stored encrypted in the blob; decrypt for runtime use.
            cipher_suite.decrypt(model.get("endpoint").encode()).decode(),
            cipher_suite.decrypt(model.get("api_key").encode()).decode(),
            inference_functions.get(model.get("inference_function")),
            model.get("content-type"),
            model.get("deployment_platform"),
        )
        models += (m,)

    for pipeline in result_json.get("pipelines") or []:
        CACHE["pipelines"][pipeline.get("pipeline_name")] = tuple(
            m for m in models if m.name in pipeline.get("models")
        )

    return result_json.get("pipelines")

async def data_factory(**kwargs):
    """Wrap the given keyword arguments into the request payload shape
    expected by the inference endpoints: {"input_data": {...}}."""
    payload = {"input_data": kwargs}
    return payload


@app.before_serving
async def before_serving():
try:
# Get all the inference functions from the model_module and map them in a dictionary
CACHE["seeds"] = await fetch_json(NACHET_DATA, "seeds", "seeds/all.json")
CACHE["endpoints"] = await fetch_json(NACHET_MODEL, "endpoints", "model_endpoints_metadata.json") #, mock=True)
print(CACHE["endpoints"])
# CACHE["endpoints"] = await fetch_json(NACHET_MODEL, "endpoints", "model_endpoints_metadata.json")
CACHE["endpoints"] = await get_pipeline() # mock=True
except Exception as e:
print(e)
raise ServerError("Failed to retrieve data from the repository")
Expand Down
29 changes: 27 additions & 2 deletions azure_storage/azure_storage_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,9 @@ async def get_pipeline_info(
json_blob = await get_blob(container_client, blob.name)
if json_blob:
pipeline = json.loads(json_blob)
if pipeline.get("version") == pipeline_version:
return pipeline
if not isinstance(pipeline, list):
if pipeline.get("version") == pipeline_version:
return pipeline
else:
raise PipelineNotFoundError(
"This version of the pipeline was not found."
Expand All @@ -269,3 +270,27 @@ async def get_pipeline_info(
except FolderListError as error:
print(error)
return False

def insert_new_version_pipeline(
    pipelines_json: dict,
    connection_string: str,
    pipeline_container_name: str
) -> bool:
    """
    Upload a pipeline configuration JSON to blob storage as
    "pipelines/<version>.json", overwriting any existing blob.

    Parameters:
    - pipelines_json: full pipeline configuration; its "version" key names
      the blob.
    - connection_string: Azure storage connection string.
    - pipeline_container_name: target container name. (Renamed from the
      misspelled "pipleine_container_name"; the only caller passes it
      positionally.)

    Returns True on success, False when the connection string is rejected.
    """
    try:
        blob_service_client = BlobServiceClient.from_connection_string(
            connection_string
        )
        if not blob_service_client:
            # NOTE(review): from_connection_string normally raises on a bad
            # string rather than returning a falsy value — this guard mirrors
            # the original defensive check.
            raise ConnectionStringError("Invalid connection string")

        container_client = blob_service_client.get_container_client(
            pipeline_container_name
        )
        json_name = "{}/{}.json".format("pipelines", pipelines_json.get("version"))
        container_client.upload_blob(
            json_name, json.dumps(pipelines_json, indent=4), overwrite=True
        )
        return True
    except ConnectionStringError as error:
        print(error)
        return False
33 changes: 22 additions & 11 deletions docs/nachet-model-documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ Nachet Interactive models' perform the following tasks:
## List of models

|Model|Full name|Task|Active|Accuracy|
|--|--|:--:|:--:|:--:|
|Nachet-6seeds | m-14of15seeds-6seedsmag | Object Detection | Yes | - |
|Seed-detector | seed-detector-1 | Object Detection | Yes | - |
|Swin | swinv1-base-dataaugv2-1 | Classification | Yes | - |
|Model|Full name|Task|API Call Function|Inference Function|Active|Accuracy|
|--|--|:--:|:--:|:--:|:--:|:--:|
|Nachet-6seeds | m-14of15seeds-6seedsmag | Object Detection | nachet_6seeds | None | Yes | - |
|Seed-detector | seed-detector-1 | Object Detection | seed_detector | process_image_slicing | Yes | - |
|Swin | swinv1-base-dataaugv2-1 | Classification | swin | process_swin_result | Yes | - |

### Inference and API Call Function

The inference and API call functions act as entry and exit points for the model. The API call explicitly requests a prediction from the specified model (such as Swin, Nachet-6seeds, etc.). The inference function processes the data before sending it to the frontend if the model requires it. For instance, the Seed-detector only returns "seed" as a label, and its inference needs to be processed and passed to the next model which assigns the correct label to the seeds.

## Return value of models

Expand All @@ -43,25 +47,25 @@ Nachet Interactive models' perform the following tasks:
},
"label": "top_label_name",
"score": 0.912,
"topResult": [
"topN": [
{
"score": 0.912
"score": 0.912,
"label": "top_label_name",
},
{
"score": 0.053
"score": 0.053,
"label": "seed_name",
},
{
"score": 0.0029
"score": 0.0029,
"label": "seed_name",
},
{
"score": 0.005
"score": 0.005,
"label": "seed_name",
},
{
"score": 0.001
"score": 0.001,
"label": "seed_name",
}
],
Expand All @@ -75,6 +79,13 @@ Nachet Interactive models' perform the following tasks:
"totalBoxes": 1
}
```
### Why topN
We decided to name the top-results property `topN` because it can return *n* predictions. In AI, the top 5 results are commonly used to measure a model's accuracy: if the correct result appears within the top 5, the prediction is considered correct.

This is useful in cases where the user's attention is on more than one result.

> "Top N accuracy — Top N accuracy is when you measure how often your predicted class falls in the top N values of your softmax distribution."
[Nagda, R. (2019-11-08) *Evaluating models using the Top N accuracy metrics*. Medium](https://medium.com/nanonets/evaluating-models-using-the-top-n-accuracy-metrics-c0355b36f91b)

## Different ways of calling models

Expand Down
2 changes: 1 addition & 1 deletion model_inference/model_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ async def request_inference_from_nachet_6seeds(model: namedtuple, previous_resul
result = response.read()
result_json = json.loads(result.decode("utf8"))

return result_json,
return result_json[0],

except Exception as e:
raise InferenceRequestError(f"An error occurred while processing the request:\n {str(e)}")
Expand Down
30 changes: 30 additions & 0 deletions pipelines_version_insertion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""
Encrypt the model endpoints and API keys of a pipeline configuration JSON
and upload the result to Azure blob storage.

Required environment variables (loaded via .env):
- NACHET_BLOB_PIPELINE_DECRYPTION_KEY: Fernet key used for encryption
- NACHET_BLOB_PIPELINE_NAME: target blob container name
- NACHET_AZURE_STORAGE_CONNECTION_STRING: Azure storage connection string
"""
import os
import json

import azure_storage.azure_storage_api as azure_storage_api

from cryptography.fernet import Fernet
from dotenv import load_dotenv


def main() -> None:
    """Encrypt secrets in mock_pipeline_json.json and upload it."""
    load_dotenv()

    key = os.getenv("NACHET_BLOB_PIPELINE_DECRYPTION_KEY")
    blob_storage_account_name = os.getenv("NACHET_BLOB_PIPELINE_NAME")
    connection_string = os.getenv("NACHET_AZURE_STORAGE_CONNECTION_STRING")

    if not key:
        # Fernet(None) would fail with a confusing TypeError; fail fast with
        # an actionable message instead.
        raise SystemExit("NACHET_BLOB_PIPELINE_DECRYPTION_KEY is not set")

    cipher_suite = Fernet(key)

    with open("./mock_pipeline_json.json", "r") as file:
        pipelines_json = json.load(file)

    # Encrypt each model's endpoint URL and API key in place so the uploaded
    # JSON never contains the secrets in clear text.
    for model in pipelines_json["models"]:
        model["endpoint"] = cipher_suite.encrypt(model["endpoint"].encode()).decode()
        model["api_key"] = cipher_suite.encrypt(model["api_key"].encode()).decode()

    print(azure_storage_api.insert_new_version_pipeline(
        pipelines_json, connection_string, blob_storage_account_name
    ))


if __name__ == "__main__":
    main()
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ quart
quart-cors
python-dotenv
hypercorn
Pillow
Pillow
cryptography

0 comments on commit 1d3514e

Please sign in to comment.