From 4f3166cfbefb80b09b1c0937fc69f7df291a9e40 Mon Sep 17 00:00:00 2001
From: Maxence Guindon
Date: Mon, 26 Feb 2024 22:15:49 +0000
Subject: [PATCH] Fixes #51: update documentation on pipeline

---
 .env.template                          | 14 ++--
 app.py                                 | 21 ++++--
 docs/nachet-inference-documentation.md | 89 ++++++++++++++++++++++----
 model_inference/model_module.py        |  2 +-
 4 files changed, 102 insertions(+), 24 deletions(-)

diff --git a/.env.template b/.env.template
index f7ad3f0f..c19a870b 100644
--- a/.env.template
+++ b/.env.template
@@ -1,12 +1,18 @@
 NACHET_AZURE_STORAGE_CONNECTION_STRING=
+
 NACHET_MODEL_ENDPOINT_REST_URL=
 NACHET_MODEL_ENDPOINT_ACCESS_KEY=
+NACHET_SWIN_ENDPOINT=
+NACHET_SWIN_ACCESS_KEY=
+NACHET_SEED_DETECTOR_ENDPOINT=
+NACHET_SEED_DETECTOR_ACCESS_KEY=
+
 NACHET_DATA=
 NACHET_SUBSCRIPTION_ID=
 NACHET_RESOURCE_GROUP=
 NACHET_WORKSPACE=
 NACHET_MODEL=
-NACHET_SWIN_ENDPOINT=
-NACHET_SWIN_ACCESS_KEY=
-NACHET_SEED_DETECTOR_ENDPOINT=
-NACHET_SEED_DETECTOR_ACCESS_KEY=
+
+NACHET_BLOB_PIPELINE_NAME=
+NACHET_BLOB_PIPELINE_VERSION=
+NACHET_BLOB_PIPELINE_DECRYPTION_KEY=
diff --git a/app.py b/app.py
index 2d41b218..7388fec4 100644
--- a/app.py
+++ b/app.py
@@ -331,15 +331,25 @@ async def fetch_json(repo_URL, key, file_path, mock=False):
     except Exception as e:
         raise ValueError(str(e))
 
-async def get_pipeline(mock:bool = False):
+async def get_pipelines(mock:bool = False):
+    """
+    Retrieves the pipelines from the Azure storage API.
+
+    Parameters:
+    - mock (bool): If True, retrieves the pipelines from a mock JSON file. If False, retrieves the pipelines from the Azure storage API.
+
+    Returns:
+    - list: A list of dictionaries representing the pipelines.
+    """
     if mock:
         with open("mock_pipeline_json.json", "r+") as f:
             result_json = json.load(f)
     else:
         result_json = await azure_storage_api.get_pipeline_info(connection_string, PIPELINE_BLOB_NAME, PIPELINE_VERSION)
         cipher_suite = Fernet(FERNET_KEY)
-
+    # Get all the api_call functions and map them in a dictionary
     api_call_function = {func.split("from_")[1]: getattr(model_module, func) for func in dir(model_module) if "inference" in func.split("_")}
+    # Get all the inference functions and map them in a dictionary
    inference_functions = {func: getattr(inference, func) for func in dir(inference) if "process" in func.split("_")}
 
     models = ()
@@ -354,11 +364,11 @@
             model.get("deployment_platform")
         )
         models += (m,)
-
+    # Build the pipeline to call the models in order in the inference request
     for pipeline in result_json.get("pipelines"):
         CACHE["pipelines"][pipeline.get("pipeline_name")] = tuple([m for m in models if m.name in pipeline.get("models")])
-    return result_json.get("pipelines")
+    return result_json.get("pipelines")
 
 async def data_factory(**kwargs):
     return {
         **kwargs,
@@ -368,10 +378,9 @@ async def data_factory(**kwargs):
     }
 
 @app.before_serving
 async def before_serving():
     try:
-        # Get all the inference functions from the model_module and map them in a dictionary
         CACHE["seeds"] = await fetch_json(NACHET_DATA, "seeds", "seeds/all.json")
         # CACHE["endpoints"] = await fetch_json(NACHET_MODEL, "endpoints", "model_endpoints_metadata.json")
-        CACHE["endpoints"] = await get_pipeline() # mock=True
+        CACHE["endpoints"] = await get_pipelines() # mock=True
     except Exception as e:
         print(e)
         raise ServerError("Failed to retrieve data from the repository")
diff --git a/docs/nachet-inference-documentation.md b/docs/nachet-inference-documentation.md
index 74b82e01..23a27feb 100644
--- a/docs/nachet-inference-documentation.md
+++ b/docs/nachet-inference-documentation.md
@@ -151,33 +151,96 @@
 Box | 1 | Contains all the information of one seed in the image
 totalBoxes | 1 | Total number of boxes
 label | 2 | Contains the top label for the seed
 score | 2 | Contains the top score for the seed
-topResult | 2 | Contains the top 5 scores for the seed
+topN | 2 | Contains the top N scores for the seed
 overlapping | 2 | Contains a boolean indicating whether the box overlaps with another one
 overlappingIndices | 2 | Contains the index of the overlapping box
 
-**topResult** contains the top 5 predictions of the models:
+*For more information, see the [nachet-model-documentation](https://github.com/ai-cfia/nachet-backend/blob/51-implementing-2-models/docs/nachet-model-documentation.md).*
+
+**topN** contains the model's top 5 predictions:
 
 ```json
-"topResult": [
+"topN": [
     {
-        'label': seed_name,
-        'score': 0,75
+        "label": "seed_name",
+        "score": 0.75
     },
     {
-        'label': seed_name,
-        'score': 0,18
+        "label": "seed_name",
+        "score": 0.18
     },
     {
-        'label': seed_name,
-        'score': 0,05
+        "label": "seed_name",
+        "score": 0.05
     },
     {
-        'label': seed_name,
-        'score': 0,019
+        "label": "seed_name",
+        "score": 0.019
     },
     {
-        'label': seed_name,
-        'score': 0,001
+        "label": "seed_name",
+        "score": 0.001
     }
 ]
 ```
+
+### Blob storage and Pipeline versioning
+
+To keep track of the various pipeline iterations and versions, JSON files are stored in the blob storage. Users can add a JSON file to the blob storage with the `pipelines_version_insertion.py` script, which makes it easy to manage the model and pipeline history.
+
+To use the script, three new environment variables are required (a sketch of the stored JSON file follows the list):
+
+* NACHET_BLOB_PIPELINE_NAME
+  * Contains the name of the blob where the pipelines are stored
+* NACHET_BLOB_PIPELINE_VERSION
+  * Contains the version the user wants to select
+* NACHET_BLOB_PIPELINE_DECRYPTION_KEY
+  * The key to decrypt sensitive data such as a model's API key and endpoint
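+
+As a hypothetical sketch, inferred from the keys that `get_pipelines` reads below, such a versioned JSON file might look like this. All values here are illustrative placeholders, and `endpoint` and `api_key` are stored encrypted:
+
+```json
+{
+    "version": "0.1.0",
+    "models": [
+        {
+            "model_name": "nachet_6seeds",
+            "endpoint": "gAAAAA...",
+            "api_key": "gAAAAA...",
+            "api_call_function": "nachet_6seeds",
+            "inference_function": "process_inference_results",
+            "content-type": "application/json",
+            "deployment_platform": "azure"
+        }
+    ],
+    "pipelines": [
+        {
+            "pipeline_name": "Swin Transformer and 6 Seeds Detector",
+            "models": ["nachet_6seeds"]
+        }
+    ]
+}
+```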
+
+#### In the code
+
+In the backend, the pipelines are retrieved with the `get_pipelines` function. This function fetches the data from the blob storage and stores the pipelines in the `CACHE["endpoints"]` variable. This is the variable that feeds the frontend the `models` information and metadata.
+
+```python
+async def get_pipelines(mock:bool = False):
+    """
+    Retrieves the pipelines from the Azure storage API.
+
+    Parameters:
+    - mock (bool): If True, retrieves the pipelines from a mock JSON file. If False, retrieves the pipelines from the Azure storage API.
+
+    Returns:
+    - list: A list of dictionaries representing the pipelines.
+    """
+    if mock:
+        with open("mock_pipeline_json.json", "r+") as f:
+            result_json = json.load(f)
+    else:
+        result_json = await azure_storage_api.get_pipeline_info(connection_string, PIPELINE_BLOB_NAME, PIPELINE_VERSION)
+        cipher_suite = Fernet(FERNET_KEY)
+    # Get all the api_call functions and map them in a dictionary
+    api_call_function = {func.split("from_")[1]: getattr(model_module, func) for func in dir(model_module) if "inference" in func.split("_")}
+    # Get all the inference functions and map them in a dictionary
+    inference_functions = {func: getattr(inference, func) for func in dir(inference) if "process" in func.split("_")}
+
+    models = ()
+    for model in result_json.get("models"):
+        m = Model(
+            api_call_function.get(model.get("api_call_function")),
+            model.get("model_name"),
+            cipher_suite.decrypt(model.get("endpoint").encode()).decode(),
+            cipher_suite.decrypt(model.get("api_key").encode()).decode(),
+            inference_functions.get(model.get("inference_function")),
+            model.get("content-type"),
+            model.get("deployment_platform")
+        )
+        models += (m,)
+
+    # Build the pipeline to call the models in order in the inference request
+    for pipeline in result_json.get("pipelines"):
+        CACHE["pipelines"][pipeline.get("pipeline_name")] = tuple([m for m in models if m.name in pipeline.get("models")])
+
+    return result_json.get("pipelines")
+```
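+
+The `FERNET_KEY` used above is the value of NACHET_BLOB_PIPELINE_DECRYPTION_KEY. As a minimal sketch of the symmetric scheme this implies (illustrative only; the actual `pipelines_version_insertion.py` script may handle this differently), the encrypted `endpoint` and `api_key` values could be produced like so:
+
+```python
+from cryptography.fernet import Fernet
+
+# Hypothetical example: key generation and storage may differ in practice.
+key = Fernet.generate_key()  # saved as NACHET_BLOB_PIPELINE_DECRYPTION_KEY
+cipher_suite = Fernet(key)
+
+# Encrypt the sensitive fields before writing the pipeline JSON to the blob...
+encrypted_endpoint = cipher_suite.encrypt(b"https://model.endpoint.example/score").decode()
+
+# ...and get_pipelines() reverses this at startup:
+endpoint = cipher_suite.decrypt(encrypted_endpoint.encode()).decode()
+```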
+ """ + if mock: + with open("mock_pipeline_json.json", "r+") as f: + result_json = json.load(f) + else: + result_json = await azure_storage_api.get_pipeline_info(connection_string, PIPELINE_BLOB_NAME, PIPELINE_VERSION) + cipher_suite = Fernet(FERNET_KEY) + # Get all the api_call function and map them in a dictionary + api_call_function = {func.split("from_")[1]: getattr(model_module, func) for func in dir(model_module) if "inference" in func.split("_")} + # Get all the inference functions and map them in a dictionary + inference_functions = {func: getattr(inference, func) for func in dir(inference) if "process" in func.split("_")} + + models = () + for model in result_json.get("models"): + m = Model( + api_call_function.get(model.get("api_call_function")), + model.get("model_name"), + cipher_suite.decrypt(model.get("endpoint").encode()).decode(), + cipher_suite.decrypt(model.get("api_key").encode()).decode(), + inference_functions.get(model.get("inference_function")), + model.get("content-type"), + model.get("deployment_platform") + ) + models += (m,) + + # Build the pipeline to call the models in order in the inference request + for pipeline in result_json.get("pipelines"): + CACHE["pipelines"][pipeline.get("pipeline_name")] = tuple([m for m in models if m.name in pipeline.get("models")]) + + return result_json.get("pipelines") + +``` + +### Available Version of the JSON file: +|Version|Creation Date| Pipelines +--|--|-- +0.1.0 | 2024-02-26 | Swin Transformer and 6 Seeds Detector diff --git a/model_inference/model_module.py b/model_inference/model_module.py index 1f57cf6e..ad52a4be 100644 --- a/model_inference/model_module.py +++ b/model_inference/model_module.py @@ -89,7 +89,7 @@ async def request_inference_from_nachet_6seeds(model: namedtuple, previous_resul result = response.read() result_json = json.loads(result.decode("utf8")) - return result_json[0], + return result_json except Exception as e: raise InferenceRequestError(f"An error occurred while processing the request:\n {str(e)}")