Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload ODK submission user photos to S3 for easy access #1744

Merged
merged 11 commits into from
Aug 14, 2024
Merged
12 changes: 12 additions & 0 deletions src/backend/app/db/db_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,3 +551,15 @@ class DbTilesPath(Base):
tile_source = cast(str, Column(String))
background_task_id = cast(str, Column(String))
created_at = cast(datetime, Column(DateTime, default=timestamp))


class DbSubmissionPhotos(Base):
"""Keeping track of submission photos for a project."""

__tablename__ = "submission_photos"

id = cast(int, Column(Integer, primary_key=True))
project_id = cast(int, Column(Integer))
task_id = cast(int, Column(Integer))
submission_id = cast(str, Column(String))
s3_path = cast(str, Column(String))
29 changes: 29 additions & 0 deletions src/backend/app/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
from io import BytesIO
from typing import Any

from fastapi import HTTPException
from loguru import logger as log
from minio import Minio
from minio.commonconfig import CopySource
from minio.deleteobjects import DeleteObject
from minio.error import S3Error

from app.config import settings
from app.models.enums import HTTPStatus


def s3_client():
Expand Down Expand Up @@ -38,6 +41,32 @@ def s3_client():
# print(creds.secret_key)


def object_exists(bucket_name: str, s3_path: str) -> bool:
"""Check if an object exists in an S3 bucket using stat_object.

Args:
bucket_name (str): The name of the S3 bucket.
s3_path (str): The path of the object in the S3 bucket.

Returns:
bool: True if the object exists, False otherwise.
"""
client = s3_client()

try:
# stat_object will return metadata if the object exists
client.stat_object(bucket_name, s3_path)
return True
except S3Error as e:
if e.code == "NoSuchKey":
return False
else:
# Handle other exceptions
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST, detail=str(e)
) from e


def add_file_to_bucket(bucket_name: str, file_path: str, s3_path: str):
"""Upload a file from the filesystem to an S3 bucket.

Expand Down
117 changes: 117 additions & 0 deletions src/backend/app/submissions/submission_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from loguru import logger as log

# from osm_fieldwork.json2osm import json2osm
from sqlalchemy import text
from sqlalchemy.orm import Session

from app.central.central_crud import (
Expand Down Expand Up @@ -565,3 +566,119 @@ async def get_submission_detail(
# all_features.append(feature)

# return geojson.FeatureCollection(features=all_features)


async def upload_attachment_to_s3(
project_id: int, instance_ids: list, background_task_id: uuid.UUID, db: Session
):
"""Uploads attachments to S3 for a given project and instance IDs.

Args:
project_id (int): The ID of the project.
instance_ids (list): List of instance IDs.
background_task_id (uuid.UUID): The ID of the background task.
db (Session): The database session.

Returns:
bool: True if the upload is successful.

Raises:
Exception: If an error occurs during the upload process.
"""
try:
project = await project_deps.get_project_by_id(db, project_id)
db_xform = await project_deps.get_project_xform(db, project_id)
odk_central = await project_deps.get_odk_credentials(db, project_id)
xform = get_odk_form(odk_central)
s3_bucket = settings.S3_BUCKET_NAME
s3_base_path = f"{project.organisation_id}/{project_id}"

# Fetch existing photos from the database
existing_photos = db.execute(
text("""
SELECT submission_id, s3_path
FROM submission_photos
WHERE project_id = :project_id
"""),
{"project_id": project_id},
).fetchall()

existing_photos_dict = {}
for submission_id, s3_path in existing_photos:
existing_photos_dict[submission_id] = s3_path

batch_insert_data = []
for instance_id in instance_ids:
submission_detail = await get_submission_detail(instance_id, project, db)
attachments = submission_detail["verification"]["image"]

if not isinstance(attachments, list):
attachments = [attachments]

for idx, filename in enumerate(attachments):
s3_key = f"{s3_base_path}/{instance_id}/{idx + 1}.jpeg"
img_url = f"{settings.S3_DOWNLOAD_ROOT}/{s3_bucket}/{s3_key}"

# Skip if the img_url already exists in the database
if img_url in existing_photos_dict.get(instance_id, []):
log.warning(
f"Image {img_url} for instance {instance_id} "
"already exists in DB. Skipping upload."
)
continue

try:
attachment = xform.getSubmissionPhoto(
project.odkid,
str(instance_id),
db_xform.odk_form_id,
str(filename),
)
if attachment:
image_stream = io.BytesIO(attachment)

# Upload the attachment to S3
add_obj_to_bucket(
s3_bucket, image_stream, s3_key, content_type="image/jpeg"
)

# Collect the data for batch insert
batch_insert_data.append(
{
"project_id": project_id,
"task_id": submission_detail["task_id"],
"submission_id": instance_id,
"s3_path": img_url,
}
)

except Exception as e:
log.warning(
f"Failed to process {filename} for instance {instance_id}: {e}"
)
continue

# Perform batch insert if there are new records to insert
if batch_insert_data:
sql = text("""
INSERT INTO submission_photos (
project_id,
task_id,
submission_id,
s3_path
)
VALUES (:project_id, :task_id, :submission_id, :s3_path)
""")
db.execute(sql, batch_insert_data)

db.commit()
return True

except Exception as e:
log.warning(str(e))
# Update background task status to FAILED
update_bg_task_sync = async_to_sync(
project_crud.update_background_task_status_in_database
)
update_bg_task_sync(db, background_task_id, 2, str(e))
return False
23 changes: 22 additions & 1 deletion src/backend/app/submissions/submission_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
"""Routes associated with data submission to and from ODK Central."""

import json
import uuid
from io import BytesIO
from typing import Annotated, Optional

import geojson
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import FileResponse, JSONResponse, Response
from loguru import logger as log
from sqlalchemy.orm import Session

from app.auth.auth_schemas import AuthUser, ProjectUserDict
Expand Down Expand Up @@ -333,8 +335,10 @@ async def get_submission_form_fields(

@router.get("/submission_table")
async def submission_table(
background_tasks: BackgroundTasks,
page: int = Query(1, ge=1),
results_per_page: int = Query(13, le=100),
background_task_id: Optional[uuid.UUID] = None,
task_id: Optional[int] = None,
submitted_by: Optional[str] = None,
review_state: Optional[str] = None,
Expand Down Expand Up @@ -379,6 +383,23 @@ async def submission_table(
data = await submission_crud.get_submission_by_project(project, filters, db)
count = data.get("@odata.count", 0)
submissions = data.get("value", [])
instance_ids = []
for submission in submissions:
if submission["__system"]["attachmentsPresent"] != 0:
instance_ids.append(submission["__id"])

if instance_ids:
background_task_id = await project_crud.insert_background_task_into_database(
db, "upload_submission_photos", project.id
)
log.info("uploading submission photos to s3")
background_tasks.add_task(
submission_crud.upload_attachment_to_s3,
project.id,
instance_ids,
background_task_id,
db,
)

if task_id:
submissions = [sub for sub in submissions if sub.get("task_id") == str(task_id)]
Expand Down
17 changes: 17 additions & 0 deletions src/backend/migrations/006-create-submission-photos-table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- ## Migration to:
-- * Create submission_photos table

-- Start a transaction
BEGIN;

CREATE TABLE IF NOT EXISTS submission_photos (
id SERIAL PRIMARY KEY,
project_id INTEGER NOT NULL,
task_id INTEGER NOT NULL,
submission_id VARCHAR NOT NULL,
s3_path VARCHAR NOT NULL
);
ALTER TABLE public.submission_photos OWNER TO fmtm;

-- Commit the transaction
COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- ## Revert Migration to:
-- * Drop submission_photos table

-- Start a transaction
BEGIN;

DROP TABLE IF EXISTS public.submission_photos;

-- Commit the transaction
COMMIT;