diff --git a/src/backend/app/projects/image_processing.py b/src/backend/app/projects/image_processing.py index 5c63eeb5..e5fc7be0 100644 --- a/src/backend/app/projects/image_processing.py +++ b/src/backend/app/projects/image_processing.py @@ -11,6 +11,7 @@ from concurrent.futures import ThreadPoolExecutor from psycopg import Connection from asgiref.sync import async_to_sync +from app.config import settings class DroneImageProcessor: @@ -149,7 +150,9 @@ def process_images_from_s3(self, bucket_name, name=None, options=[], webhook=Non images_list = self.list_images(temp_dir) # Start a new processing task - task = self.process_new_task(images_list, name=name, options=options) + task = self.process_new_task( + images_list, name=name, options=options, webhook=webhook + ) # If webhook is passed, webhook does this job. if not webhook: @@ -186,3 +189,46 @@ def process_images_from_s3(self, bucket_name, name=None, options=[], webhook=Non # Clean up temporary directory shutil.rmtree(temp_dir) pass + + +def download_and_upload_assets_from_odm_to_s3( + node_odm_url: str, task_id: str, dtm_project_id: uuid.UUID, dtm_task_id: uuid.UUID +): + """ + Downloads results from ODM and uploads them to S3 (Minio). + + :param task_id: UUID of the ODM task. + :param dtm_project_id: UUID of the project. + :param dtm_task_id: UUID of the task. + """ + log.info(f"Starting download for task {task_id}") + + # Replace with actual ODM node details and URL + node = Node.from_url(node_odm_url) + + try: + # Get the task object using the task_id + task = node.get_task(task_id) + + # Create a temporary directory to store the results + output_file_path = f"/tmp/{dtm_project_id}" + + log.info(f"Downloading results for task {task_id} to {output_file_path}") + + # Download results as a zip file + assets_path = task.download_zip(output_file_path) + + # Upload the results into S3 (Minio) + s3_path = f"projects/{dtm_project_id}/{dtm_task_id}/assets.zip" + log.info(f"Uploading {output_file_path} to S3 path: {s3_path}") + add_file_to_bucket(settings.S3_BUCKET_NAME, assets_path, s3_path) + + log.info(f"Assets for task {task_id} successfully uploaded to S3.") + + except Exception as e: + log.error(f"Error downloading or uploading assets for task {task_id}: {e}") + + finally: + # Clean up the temporary directory + shutil.rmtree(output_file_path) + log.info(f"Temporary directory {output_file_path} cleaned up.") diff --git a/src/backend/app/projects/project_logic.py b/src/backend/app/projects/project_logic.py index 7742a635..a750c2ca 100644 --- a/src/backend/app/projects/project_logic.py +++ b/src/backend/app/projects/project_logic.py @@ -180,7 +180,7 @@ def process_drone_images( ] webhook_url = ( - f"{settings.BACKEND_URL}/api/projects/odm/webhook/{project_id}/{task_id}" + f"{settings.BACKEND_URL}/api/projects/odm/webhook/{project_id}/{task_id}/" ) processor.process_images_from_s3( diff --git a/src/backend/app/projects/project_routes.py b/src/backend/app/projects/project_routes.py index 30c70dbd..3027e484 100644 --- a/src/backend/app/projects/project_routes.py +++ b/src/backend/app/projects/project_routes.py @@ -17,13 +17,14 @@ Form, Response, BackgroundTasks, + Request, ) from geojson_pydantic import FeatureCollection from loguru import logger as log from psycopg import Connection from shapely.geometry import shape, mapping from shapely.ops import unary_union -from app.projects import project_schemas, project_deps, project_logic +from app.projects import project_schemas, project_deps, project_logic, image_processing from app.db import database from app.models.enums import HTTPStatus, State from app.s3 import s3_client @@ -182,9 +183,11 @@ async def create_project( if dem else None ) - await project_logic.upload_file_to_s3( - project_id, image, "map_screenshot.png" - ) if image else None + ( + await project_logic.upload_file_to_s3(project_id, image, "map_screenshot.png") + if image + else None + ) # Update DEM and Image URLs in the database await project_logic.update_url(db, project_id, dem_url) @@ -421,22 +424,46 @@ async def get_assets_info( return project_logic.get_project_info_from_s3(project.id, task_id) -@router.get("/odm/webhook/{project_id}/{task_id}", tags=["Image Processing"]) -async def odm_webhook(project_id: uuid.UUID, task_id: uuid.UUID): +@router.post( + "/odm/webhook/{dtm_project_id}/{dtm_task_id}/", + tags=["Image Processing"], +) +async def odm_webhook( + request: Request, + dtm_project_id: uuid.UUID, + dtm_task_id: uuid.UUID, + background_tasks: BackgroundTasks, +): """ Webhook to receive notifications from ODM processing tasks. """ + # Try to parse the JSON body try: - log.info("Received ODM webhook") - - # Log the received task ID and status - log.info(f"Task ID: {task_id}, Status: .......") - - # You can add logic here to update your system based on task status - # For example: updating a database, sending a notification, etc. - - return {"message": "Webhook received", "task_id": task_id, "status": "status"} - + payload = await request.json() except Exception as e: - log.error(f"Error processing webhook: {e}") - raise HTTPException(status_code=500, detail="Internal server error") + log.error(f"Error parsing JSON: {e}") + raise HTTPException(status_code=400, detail="Invalid JSON body") + + task_id = payload.get("uuid") + status = payload.get("status") + + if not task_id or not status: + raise HTTPException(status_code=400, detail="Invalid webhook payload") + + log.info(f"Task ID: {task_id}, Status: {status}") + + # If status is 'success', download and upload assets to S3. + # 40 is the status code for success in odm + if status["code"] == 40: + # Call function to download assets from ODM and upload to S3 + background_tasks.add_task( + image_processing.download_and_upload_assets_from_odm_to_s3, + settings.NODE_ODM_URL, + task_id, + dtm_project_id, + dtm_task_id, + ) + elif status["code"] == 30: + # failed task + log.error(f'ODM task {task_id} failed: {status["errorMessage"]}') + return {"message": "Webhook received", "task_id": task_id}