Skip to content

Commit

Permalink
Merge pull request #90 from ai-cfia/sylvanie85/issue85
Browse files Browse the repository at this point in the history
Sylvanie85/issue85
  • Loading branch information
sylvanie85 authored Jun 6, 2024
2 parents 4d641f9 + 692e614 commit ed6ca08
Show file tree
Hide file tree
Showing 14 changed files with 469 additions and 110 deletions.
5 changes: 5 additions & 0 deletions .env.template
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
NACHET_AZURE_STORAGE_CONNECTION_STRING=
NACHET_STORAGE_URL=
NACHET_DB_URL=
NACHET_SCHEMA=
NACHET_DATA=
NACHET_BLOB_PIPELINE_NAME=
NACHET_BLOB_PIPELINE_VERSION=
NACHET_BLOB_PIPELINE_DECRYPTION_KEY=
NACHET_BLOB_ACCOUNT=
NACHET_BLOB_KEY=
NACHET_MAX_CONTENT_LENGTH=
NACHET_VALID_EXTENSION=
NACHET_VALID_DIMENSION=
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ If you want to run the program as a Docker container (e.g., for production), use

```bash
docker build -t nachet-backend .
docker run -p 8080:8080 -v $(pwd):/app nachet-backend
docker run -p 8080:8080 -e PORT=8080 -v $(pwd):/app nachet-backend
```

### TESTING NACHET-BACKEND
Expand Down
197 changes: 150 additions & 47 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
from collections import namedtuple
from cryptography.fernet import Fernet

import azure_storage.azure_storage_api as azure_storage_api
from azure.core.exceptions import ResourceNotFoundError, ServiceResponseError
import model.inference as inference
from model import request_function
load_dotenv() # noqa: E402

import model.inference as inference # noqa: E402
import storage.datastore_storage_api as datastore # noqa: E402
from azure.core.exceptions import ResourceNotFoundError, ServiceResponseError # noqa: E402
from model import request_function # noqa: E402
from datastore import azure_storage # noqa: E402


class APIErrors(Exception):
pass
Expand Down Expand Up @@ -49,6 +53,18 @@ class ImageValidationError(APIErrors):
pass


class ValidateEnvVariablesError(APIErrors):
pass


class EmailNotSendError(APIErrors):
pass


class EmptyPictureSetError(APIErrors):
pass


class APIWarnings(UserWarning):
pass

Expand All @@ -60,7 +76,6 @@ class ImageWarning(APIWarnings):
class MaxContentLengthWarning(APIWarnings):
pass

load_dotenv()

connection_string_regex = r"^DefaultEndpointsProtocol=https?;.*;FileEndpoint=https://[a-zA-Z0-9]+\.file\.core\.windows\.net/;$"
pipeline_version_regex = r"\d.\d.\d"
Expand Down Expand Up @@ -149,12 +164,10 @@ async def before_serving():
if not bool(re.match(pipeline_version_regex, PIPELINE_VERSION)):
raise ServerError("Incorrect environment variable: PIPELINE_VERSION")

CACHE["seeds"] = await fetch_json(NACHET_DATA, "seeds", "seeds/all.json")
CACHE["endpoints"] = await get_pipelines(
CONNECTION_STRING, PIPELINE_BLOB_NAME,
PIPELINE_VERSION, Fernet(FERNET_KEY)
)

# Store the seeds names and ml structure in CACHE
CACHE["seeds"] = datastore.get_all_seeds_names()
CACHE["endpoints"] = await get_pipelines()

print(
f"""Server start with current configuration:\n
date: {date.today()}
Expand All @@ -168,6 +181,23 @@ async def before_serving():
raise


@app.get("/get-user-id")
async def get_user_id() :
"""
Returns the user id
"""
try:
data = await request.get_json()
email = data["email"]

user_id = datastore.get_user_id(email)

return jsonify(user_id), 200
except (KeyError, TypeError, ValueError, datastore.DatastoreError) as error:
print(error)
return jsonify([f"GetUserIdError: {str(error)}"]), 400


@app.post("/del")
async def delete_directory():
"""
Expand All @@ -178,11 +208,11 @@ async def delete_directory():
container_name = data["container_name"]
folder_name = data["folder_name"]
if container_name and folder_name:
container_client = await azure_storage_api.mount_container(
app.config["BLOB_CLIENT"], container_name, create_container=False
container_client = await azure_storage.mount_container(
CONNECTION_STRING, container_name, create_container=True
)
if container_client:
folder_uuid = await azure_storage_api.get_folder_uuid(
folder_uuid = await azure_storage.get_folder_uuid(
container_client, folder_name
)
if folder_uuid:
Expand All @@ -198,7 +228,7 @@ async def delete_directory():
else:
raise DeleteDirectoryRequestError("missing container or directory name")

except (KeyError, TypeError, azure_storage_api.MountContainerError, ResourceNotFoundError, DeleteDirectoryRequestError, ServiceResponseError) as error:
except (KeyError, TypeError, azure_storage.MountContainerError, ResourceNotFoundError, DeleteDirectoryRequestError, ServiceResponseError) as error:
print(error)
return jsonify([f"DeleteDirectoryRequestError: {str(error)}"]), 400

Expand All @@ -212,15 +242,15 @@ async def list_directories():
data = await request.get_json()
container_name = data["container_name"]
if container_name:
container_client = await azure_storage_api.mount_container(
app.config["BLOB_CLIENT"], container_name, create_container=True
container_client = await azure_storage.mount_container(
CONNECTION_STRING, container_name, create_container=True
)
response = await azure_storage_api.get_directories(container_client)
response = await azure_storage.get_directories(container_client)
return jsonify(response), 200
else:
raise ListDirectoriesRequestError("Missing container name")

except (KeyError, TypeError, ListDirectoriesRequestError, azure_storage_api.MountContainerError) as error:
except (KeyError, TypeError, ListDirectoriesRequestError, azure_storage.MountContainerError) as error:
print(error)
return jsonify([f"ListDirectoriesRequestError: {str(error)}"]), 400

Expand All @@ -235,10 +265,10 @@ async def create_directory():
container_name = data["container_name"]
folder_name = data["folder_name"]
if container_name and folder_name:
container_client = await azure_storage_api.mount_container(
app.config["BLOB_CLIENT"], container_name, create_container=False
container_client = await azure_storage.mount_container(
CONNECTION_STRING, container_name, create_container=True
)
response = await azure_storage_api.create_folder(
response = await azure_storage.create_folder(
container_client, folder_name
)
if response:
Expand All @@ -248,7 +278,7 @@ async def create_directory():
else:
raise CreateDirectoryRequestError("missing container or directory name")

except (KeyError, TypeError, CreateDirectoryRequestError, azure_storage_api.MountContainerError) as error:
except (KeyError, TypeError, CreateDirectoryRequestError, azure_storage.MountContainerError) as error:
print(error)
return jsonify([f"CreateDirectoryRequestError: {str(error)}"]), 400

Expand Down Expand Up @@ -298,7 +328,7 @@ async def image_validation():
if header.lower() != expected_header:
raise ImageValidationError(f"invalid file header: {header}")

validator = await azure_storage_api.generate_hash(image_bytes)
validator = await azure_storage.generate_hash(image_bytes)
CACHE['validators'].append(validator)

return jsonify([validator]), 200
Expand All @@ -325,13 +355,13 @@ async def inference_request():
container_name = data["container_name"]
imageDims = data["imageDims"]
image_base64 = data["image"]

user_id = data["userId"]

area_ratio = data.get("area_ratio", 0.5)
color_format = data.get("color_format", "hex")

print(f"Requested by user: {container_name}") # TODO: Transform into logging
pipelines_endpoints = CACHE.get("pipelines")
blob_service_client = app.config.get("BLOB_CLIENT")
validators = CACHE.get("validators")

if not (folder_name and container_name and imageDims and image_base64):
Expand All @@ -352,14 +382,21 @@ async def inference_request():
cache_json_result = [encoded_data]
image_bytes = base64.b64decode(encoded_data)

container_client = await azure_storage_api.mount_container(
blob_service_client, container_name, create_container=True
container_client = await azure_storage.mount_container(
CONNECTION_STRING, container_name, create_container=True
)
hash_value = await azure_storage_api.generate_hash(image_bytes)
await azure_storage_api.upload_image(
container_client, folder_name, image_bytes, hash_value

# Open db connection
connection = datastore.get_connection()
cursor = datastore.get_cursor(connection)

image_hash_value = await azure_storage.generate_hash(image_bytes)
picture_id = await datastore.get_picture_id(
cursor, user_id, image_hash_value, container_client
)

# Close connection
datastore.end_query(connection, cursor)

pipeline = pipelines_endpoints.get(pipeline_name)

for idx, model in enumerate(pipeline):
Expand All @@ -378,21 +415,30 @@ async def inference_request():

# upload the inference results to the user's container as async task
app.add_background_task(
azure_storage_api.upload_inference_result,
azure_storage.upload_inference_result,
container_client,
folder_name,
result_json_string,
hash_value,
image_hash_value,
)

# Open db connection
connection = datastore.get_connection()
cursor = datastore.get_cursor(connection)

saved_result_json = await datastore.save_inference_result(cursor, user_id, processed_result_json[0], picture_id, pipeline_name, 1)

# Close connection
datastore.end_query(connection, cursor)

# return the inference results to the client
print(f"Took: {'{:10.4f}'.format(time.perf_counter() - seconds)} seconds") # TODO: Transform into logging
return jsonify(processed_result_json), 200
return jsonify(saved_result_json), 200

except (inference.ModelAPIErrors, KeyError, TypeError, ValueError, InferenceRequestError, azure_storage_api.MountContainerError) as error:
except (inference.ModelAPIErrors, KeyError, TypeError, ValueError, InferenceRequestError, azure_storage.MountContainerError) as error:
print(error)
return jsonify(["InferenceRequestError: " + error.args[0]]), 400


@app.get("/seed-data/<seed_name>")
async def get_seed_data(seed_name):
"""
Expand Down Expand Up @@ -428,6 +474,69 @@ async def get_model_endpoints_metadata():
return jsonify("Error retrieving model endpoints metadata.", 404)


@app.get("/seeds")
async def get_seeds():
"""
Returns JSON containing the model seeds metadata
"""
seeds = await datastore.get_all_seeds()
if seeds :
return jsonify(seeds), 200
else:
return jsonify("Error retrieving seeds", 404)


@app.post("/feedback-positive")
async def feedback_positive():
"""
Receives inference feedback from the user and stores it in the database.
--> Perfect Inference Feedback :
- send the user_id and the inference_id to the datastore so the inference will be verified and not modified
Params :
- user_id : the user id that send the feedback
- inference_id : the inference id that the user want to modify
- boxes_id : the boxes id that the user want to modify
"""
try:
data = await request.get_json()
user_id = data["userId"]
inference_id = data["inferenceId"]
boxes_id = data["boxes"][0]
if inference_id and user_id and boxes_id:
await datastore.save_perfect_feedback(inference_id, user_id, boxes_id)
return jsonify([True]), 200
else:
raise APIErrors("missing argument(s)")
except (KeyError, TypeError, APIErrors) as error:
return jsonify([f"APIErrors while sending the inference feedback: {str(error)}"]), 400

@app.post("/feedback-negative")
async def feedback_negative():
"""
Receives inference feedback from the user and stores it in the database.
--> Annoted Inference Feedback :
- send the user_id and the inference_id to the datastore so the inference will be verified
- also send the feedback to the datastore to modified the inference
Params :
- inference_feedback : correction of the inference from the user if not a perfect inference
- user_id : the user id that send the feedback
- inference_id : the inference id that the user want to modify
- boxes_id : the boxes id that the user want to modify
"""
try:
data = await request.get_json()
inference_feedback = data["inferenceFeedback"]
user_id = data["userId"]
inference_id = data["inferenceId"]
boxes_id = data["boxes"][0]
if inference_id and user_id and boxes_id and inference_feedback :
await datastore.save_annoted_feedback(inference_id, user_id, boxes_id, inference_feedback)
else:
raise APIErrors("missing argument(s)")
except (KeyError, TypeError, APIErrors) as error:
return jsonify([f"APIErrors while sending the inference feedback: {str(error)}"]), 400

@app.get("/health")
async def health():
return "ok", 200
Expand Down Expand Up @@ -457,13 +566,12 @@ async def test():

return CACHE["endpoints"], 200


async def record_model(pipeline: namedtuple, result: list):
new_entry = [{"name": model.name, "version": model.version} for model in pipeline]
result[0]["models"] = new_entry
return json.dumps(result, indent=4)


async def fetch_json(repo_URL, key, file_path):
"""
Fetches JSON document from a GitHub repository.
Expand All @@ -484,24 +592,19 @@ async def fetch_json(repo_URL, key, file_path):
return result_json


async def get_pipelines(connection_string, pipeline_blob_name, pipeline_version, cipher_suite):
async def get_pipelines(cipher_suite=Fernet(FERNET_KEY)):
"""
Retrieves the pipelines from the Azure storage API.
Returns:
- list: A list of dictionaries representing the pipelines.
"""
try:
app.config["BLOB_CLIENT"] = await azure_storage_api.get_blob_client(connection_string)
result_json = await azure_storage_api.get_pipeline_info(app.config["BLOB_CLIENT"], pipeline_blob_name, pipeline_version)
except (azure_storage_api.AzureAPIErrors) as error:
print(error)
raise ServerError("server errror: could not retrieve the pipelines") from error
result_json = await datastore.get_pipelines()

models = ()
for model in result_json.get("models"):
m = Model(
request_function.get(model.get("endpoint_name")),
request_function.get(model.get("model_name")),
model.get("model_name"),
model.get("version"),
# To protect sensible data (API key and model endpoint), we encrypt it when
Expand Down
Loading

0 comments on commit ed6ca08

Please sign in to comment.