#fixes #51: update documentation on pipeline

Maxence Guindon committed Feb 26, 2024
1 parent 1d3514e commit 4f3166c
Showing 4 changed files with 102 additions and 24 deletions.
14 changes: 10 additions & 4 deletions .env.template
@@ -1,12 +1,18 @@
 NACHET_AZURE_STORAGE_CONNECTION_STRING=
+
 NACHET_MODEL_ENDPOINT_REST_URL=
 NACHET_MODEL_ENDPOINT_ACCESS_KEY=
+NACHET_SWIN_ENDPOINT=
+NACHET_SWIN_ACCESS_KEY=
+NACHET_SEED_DETECTOR_ENDPOINT=
+NACHET_SEED_DETECTOR_ACCESS_KEY=
+
 NACHET_DATA=
 NACHET_SUBSCRIPTION_ID=
 NACHET_RESOURCE_GROUP=
 NACHET_WORKSPACE=
 NACHET_MODEL=
-NACHET_SWIN_ENDPOINT=
-NACHET_SWIN_ACCESS_KEY=
-NACHET_SEED_DETECTOR_ENDPOINT=
-NACHET_SEED_DETECTOR_ACCESS_KEY=
+
+NACHET_BLOB_PIPELINE_NAME=
+NACHET_BLOB_PIPELINE_VERSION=
+NACHET_BLOB_PIPELINE_DECRYPTION_KEY=
21 changes: 15 additions & 6 deletions app.py
@@ -331,15 +331,25 @@ async def fetch_json(repo_URL, key, file_path, mock=False):
     except Exception as e:
         raise ValueError(str(e))
 
-async def get_pipeline(mock:bool = False):
+async def get_pipelines(mock:bool = False):
+    """
+    Retrieves the pipelines from the Azure storage API.
+    Parameters:
+    - mock (bool): If True, retrieves the pipelines from a mock JSON file. If False, retrieves the pipelines from the Azure storage API.
+    Returns:
+    - list: A list of dictionaries representing the pipelines.
+    """
     if mock:
         with open("mock_pipeline_json.json", "r+") as f:
             result_json = json.load(f)
     else:
         result_json = await azure_storage_api.get_pipeline_info(connection_string, PIPELINE_BLOB_NAME, PIPELINE_VERSION)
     cipher_suite = Fernet(FERNET_KEY)
+    # Get all the API call functions and map them in a dictionary
+    api_call_function = {func.split("from_")[1]: getattr(model_module, func) for func in dir(model_module) if "inference" in func.split("_")}
     # Get all the inference functions and map them in a dictionary
     inference_functions = {func: getattr(inference, func) for func in dir(inference) if "process" in func.split("_")}
 
     models = ()
@@ -354,11 +364,11 @@ async def get_pipeline(mock:bool = False):
             model.get("deployment_platform")
         )
         models += (m,)
 
     # Build the pipeline to call the models in order in the inference request
     for pipeline in result_json.get("pipelines"):
         CACHE["pipelines"][pipeline.get("pipeline_name")] = tuple([m for m in models if m.name in pipeline.get("models")])
 
     return result_json.get("pipelines")

async def data_factory(**kwargs):
return {
@@ -368,10 +378,9 @@ async def data_factory(**kwargs):
 @app.before_serving
 async def before_serving():
     try:
-        # Get all the inference functions from the model_module and map them in a dictionary
         CACHE["seeds"] = await fetch_json(NACHET_DATA, "seeds", "seeds/all.json")
-        # CACHE["endpoints"] = await fetch_json(NACHET_MODEL, "endpoints", "model_endpoints_metadata.json")
-        CACHE["endpoints"] = await get_pipeline() # mock=True
+        CACHE["endpoints"] = await get_pipelines() # mock=True
     except Exception as e:
         print(e)
         raise ServerError("Failed to retrieve data from the repository")
89 changes: 76 additions & 13 deletions docs/nachet-inference-documentation.md
@@ -151,33 +151,96 @@
Box | 1 | Contains all the information of one seed in the image
totalBoxes | 1 | Total number of boxes
label | 2 | Contains the top label for the seed
score | 2 | Contains the top score for the seed
topN | 2 | Contains the top N scores for the seed
overlapping | 2 | Contains a boolean indicating whether the box overlaps with another one
overlappingIndices | 2 | Contains the indices of the overlapping boxes

**topN** contains the top 5 predictions of the model
*(for more information, see the [nachet-model-documentation](https://github.com/ai-cfia/nachet-backend/blob/51-implementing-2-models/docs/nachet-model-documentation.md))*:
```json
"topN": [
    {
        "label": "seed_name",
        "score": 0.75
    },
    {
        "label": "seed_name",
        "score": 0.18
    },
    {
        "label": "seed_name",
        "score": 0.05
    },
    {
        "label": "seed_name",
        "score": 0.019
    },
    {
        "label": "seed_name",
        "score": 0.001
    }
]
```
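
Putting the table together, a single entry in `boxes` could look like the following sketch. This is an illustration only: the values are placeholders, and the exact envelope around the boxes may differ.

```json
{
    "boxes": [
        {
            "label": "seed_name",
            "score": 0.75,
            "topN": [
                {"label": "seed_name", "score": 0.75}
            ],
            "overlapping": false,
            "overlappingIndices": 0
        }
    ],
    "totalBoxes": 1
}
```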

### Blob storage and Pipeline versioning

To keep track of the various pipeline iterations and versions, JSON files are stored in the blob storage. Users can add the JSON files to the blob storage with the `pipelines_version_insertion.py` script. This allows for easy management of the model and pipeline history.

To use the script, three new environment variables are needed (a sketch of how the decryption key is used follows the list):

* NACHET_BLOB_PIPELINE_NAME
  * Contains the name of the blob where the pipelines are stored
* NACHET_BLOB_PIPELINE_VERSION
  * Contains the version the user wants to select
* NACHET_BLOB_PIPELINE_DECRYPTION_KEY
  * The key to decrypt sensitive data such as a model's API key and endpoint
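
The backend reads the decryption key into a Fernet cipher (see `Fernet(FERNET_KEY)` in the code below). As a minimal sketch of how such a key is generated and used, assuming the `cryptography` package and a placeholder endpoint value:

```python
from cryptography.fernet import Fernet

# Generate a key once; this value goes in NACHET_BLOB_PIPELINE_DECRYPTION_KEY
key = Fernet.generate_key()
cipher_suite = Fernet(key)

# Sensitive values are stored encrypted in the pipeline JSON file...
token = cipher_suite.encrypt(b"https://example-endpoint.azureml.net/score")

# ...and decrypted again when the backend loads the pipelines
endpoint = cipher_suite.decrypt(token).decode()
```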

#### In the code

In the backend, the pipelines are retrieved with the `get_pipelines` function. This function fetches the data from the blob storage and stores the pipelines in the `CACHE["endpoints"]` variable, which is the variable that feeds the `models` information and metadata to the frontend.

```python
async def get_pipelines(mock: bool = False):
    """
    Retrieves the pipelines from the Azure storage API.
    Parameters:
    - mock (bool): If True, retrieves the pipelines from a mock JSON file.
      If False, retrieves the pipelines from the Azure storage API.
    Returns:
    - list: A list of dictionaries representing the pipelines.
    """
    if mock:
        with open("mock_pipeline_json.json", "r+") as f:
            result_json = json.load(f)
    else:
        result_json = await azure_storage_api.get_pipeline_info(
            connection_string, PIPELINE_BLOB_NAME, PIPELINE_VERSION)
    cipher_suite = Fernet(FERNET_KEY)

    # Get all the API call functions and map them in a dictionary
    api_call_function = {
        func.split("from_")[1]: getattr(model_module, func)
        for func in dir(model_module) if "inference" in func.split("_")
    }
    # Get all the inference functions and map them in a dictionary
    inference_functions = {
        func: getattr(inference, func)
        for func in dir(inference) if "process" in func.split("_")
    }

    models = ()
    for model in result_json.get("models"):
        m = Model(
            api_call_function.get(model.get("api_call_function")),
            model.get("model_name"),
            cipher_suite.decrypt(model.get("endpoint").encode()).decode(),
            cipher_suite.decrypt(model.get("api_key").encode()).decode(),
            inference_functions.get(model.get("inference_function")),
            model.get("content-type"),
            model.get("deployment_platform")
        )
        models += (m,)

    # Build the pipeline to call the models in order in the inference request
    for pipeline in result_json.get("pipelines"):
        CACHE["pipelines"][pipeline.get("pipeline_name")] = tuple(
            m for m in models if m.name in pipeline.get("models"))

    return result_json.get("pipelines")
```
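
From the keys accessed in `get_pipelines`, the versioned JSON file stored in the blob would have roughly the following shape. This is an inferred sketch: names and values are placeholders, the `gAAAAA...` strings stand for Fernet-encrypted tokens, and keys the function does not read may differ.

```json
{
    "models": [
        {
            "model_name": "seed_detector",
            "endpoint": "gAAAAA...",
            "api_key": "gAAAAA...",
            "api_call_function": "nachet_6seeds",
            "inference_function": "process_inference_results",
            "content-type": "application/json",
            "deployment_platform": "azure"
        }
    ],
    "pipelines": [
        {
            "pipeline_name": "Swin Transformer and 6 Seeds Detector",
            "models": ["seed_detector", "swin"]
        }
    ]
}
```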

### Available Versions of the JSON file

| Version | Creation Date | Pipelines |
|---------|---------------|-----------|
| 0.1.0   | 2024-02-26    | Swin Transformer and 6 Seeds Detector |
2 changes: 1 addition & 1 deletion model_inference/model_module.py
@@ -89,7 +89,7 @@ async def request_inference_from_nachet_6seeds(model: namedtuple, previous_result: str):
         result = response.read()
         result_json = json.loads(result.decode("utf8"))
 
-        return result_json[0],
+        return result_json
 
     except Exception as e:
         raise InferenceRequestError(f"An error occurred while processing the request:\n {str(e)}")