diff --git a/.env.example b/.env.example
index cc8f03f8..0ded43ef 100644
--- a/.env.example
+++ b/.env.example
@@ -22,8 +22,12 @@ GOOGLE_CLIENT_SECRET=${GOOGLE_CLIENT_SECRET:-GOOGLE_CLIENT_SECRET_GOES_HERE}
 GOOGLE_LOGIN_REDIRECT_URI=${GOOGLE_LOGIN_REDIRECT_URI:-http://localhost:3040}
 SECRET_KEY=${SECRET_KEY:-SUPERSECRETKEY__xxxxxxxyyyyyyyyyzzzzzzz}
 EXTRA_CORS_ORIGINS=${EXTRA_CORS_ORIGINS:-["http://localhost:3040"]}
+BACKEND_URL=${BACKEND_URL:-http://localhost:8000}
 FRONTEND_URL=${FRONTEND_URL:-http://localhost:3040}
 DEBUG=${DEBUG:-True}
+
+
+
 ## SMTP CONFIG ##
 SMTP_TLS=${SMTP_TLS:-True}
 SMTP_SSL=${SMTP_SSL:-False}
diff --git a/src/backend/app/config.py b/src/backend/app/config.py
index e795971b..2b5ce89c 100644
--- a/src/backend/app/config.py
+++ b/src/backend/app/config.py
@@ -81,6 +81,8 @@ def assemble_db_connection(cls, v: Optional[str], info: ValidationInfo) -> Any:
         return pg_url

     FRONTEND_URL: str = "http://localhost:3040"
+    BACKEND_URL: str = "http://localhost:8000"
+
     S3_ENDPOINT: str = "http://s3:9000"
     S3_ACCESS_KEY: Optional[str] = ""
     S3_SECRET_KEY: Optional[str] = ""
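Note on the config change: `BACKEND_URL` is wired through twice, once as an env-var default in `.env.example` and once as a `Settings` field in `config.py`. A minimal sketch of how the two interact, assuming `app.config.Settings` is a standard pydantic-settings class (the real class has many more fields):

```python
import os

from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    """Trimmed, hypothetical stand-in for app.config.Settings."""

    FRONTEND_URL: str = "http://localhost:3040"
    BACKEND_URL: str = "http://localhost:8000"  # class default, used if no env var is set


# A value from the environment (e.g. sourced from .env) overrides the class default.
os.environ["BACKEND_URL"] = "https://api.dronetm.example.org"
print(Settings().BACKEND_URL)  # -> https://api.dronetm.example.org
```

Since `BACKEND_URL` becomes the base of the ODM webhook URL built in `project_logic.py` below, it has to be reachable from the NodeODM service, not just from the developer's browser.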
diff --git a/src/backend/app/projects/image_processing.py b/src/backend/app/projects/image_processing.py
index 543df504..e5fc7be0 100644
--- a/src/backend/app/projects/image_processing.py
+++ b/src/backend/app/projects/image_processing.py
@@ -11,6 +11,7 @@
 from concurrent.futures import ThreadPoolExecutor
 from psycopg import Connection
 from asgiref.sync import async_to_sync
+from app.config import settings


 class DroneImageProcessor:
@@ -85,7 +86,9 @@ def list_images(self, directory):
             images.append(str(file))
         return images

-    def process_new_task(self, images, name=None, options=[], progress_callback=None):
+    def process_new_task(
+        self, images, name=None, options=[], progress_callback=None, webhook=None
+    ):
         """
         Sends a set of images via the API to start processing.

@@ -100,7 +103,9 @@ def process_new_task(self, images, name=None, options=[], progress_callback=None):
         # FIXME: take this from the function above
         opts = {"dsm": True}

-        task = self.node.create_task(images, opts, name, progress_callback)
+        task = self.node.create_task(
+            images, opts, name, progress_callback, webhook=webhook
+        )
         return task

     def monitor_task(self, task):
@@ -126,7 +131,7 @@ def download_results(self, task, output_path):
         log.info("Download completed.")
         return path

-    def process_images_from_s3(self, bucket_name, name=None, options=[]):
+    def process_images_from_s3(self, bucket_name, name=None, options=[], webhook=None):
         """
         Processes images from MinIO storage.

@@ -145,35 +150,86 @@ def process_images_from_s3(self, bucket_name, name=None, options=[]):
             images_list = self.list_images(temp_dir)

             # Start a new processing task
-            task = self.process_new_task(images_list, name=name, options=options)
-            # Monitor task progress
-            self.monitor_task(task)
-
-            # Optionally, download results
-            output_file_path = f"/tmp/{self.project_id}"
-            path_to_download = self.download_results(task, output_path=output_file_path)
-
-            # Upload the results into s3
-            s3_path = f"projects/{self.project_id}/{self.task_id}/assets.zip"
-            add_file_to_bucket(bucket_name, path_to_download, s3_path)
-            # now update the task as completed in Db.
-            # Call the async function using asyncio
-
-            # Update background task status to COMPLETED
-            update_task_status_sync = async_to_sync(task_logic.update_task_state)
-            update_task_status_sync(
-                self.db,
-                self.project_id,
-                self.task_id,
-                self.user_id,
-                "Task completed.",
-                State.IMAGE_UPLOADED,
-                State.IMAGE_PROCESSED,
-                timestamp(),
+            task = self.process_new_task(
+                images_list, name=name, options=options, webhook=webhook
             )
+
+            # If a webhook was passed, ODM notifies it when processing ends,
+            # and the webhook handler takes over monitoring and result upload.
+            if not webhook:
+                # Monitor task progress
+                self.monitor_task(task)
+
+                # Optionally, download results
+                output_file_path = f"/tmp/{self.project_id}"
+                path_to_download = self.download_results(
+                    task, output_path=output_file_path
+                )
+
+                # Upload the results into S3
+                s3_path = f"projects/{self.project_id}/{self.task_id}/assets.zip"
+                add_file_to_bucket(bucket_name, path_to_download, s3_path)
+
+                # Mark the task as completed in the DB, calling the async
+                # state update via async_to_sync
+                update_task_status_sync = async_to_sync(task_logic.update_task_state)
+                update_task_status_sync(
+                    self.db,
+                    self.project_id,
+                    self.task_id,
+                    self.user_id,
+                    "Task completed.",
+                    State.IMAGE_UPLOADED,
+                    State.IMAGE_PROCESSED,
+                    timestamp(),
+                )
             return task

         finally:
             # Clean up temporary directory
             shutil.rmtree(temp_dir)
             pass
+
+
+def download_and_upload_assets_from_odm_to_s3(
+    node_odm_url: str, task_id: str, dtm_project_id: uuid.UUID, dtm_task_id: uuid.UUID
+):
+    """
+    Downloads results from ODM and uploads them to S3 (MinIO).
+
+    :param node_odm_url: URL of the ODM processing node.
+    :param task_id: UUID of the ODM task.
+    :param dtm_project_id: UUID of the project.
+    :param dtm_task_id: UUID of the task.
+    """
+    log.info(f"Starting download for task {task_id}")
+
+    # Connect to the ODM node
+    node = Node.from_url(node_odm_url)
+
+    # Temporary path for the results (defined before try so finally can clean it up)
+    output_file_path = f"/tmp/{dtm_project_id}"
+
+    try:
+        # Get the task object using the task_id
+        task = node.get_task(task_id)
+
+        log.info(f"Downloading results for task {task_id} to {output_file_path}")
+
+        # Download results as a zip file
+        assets_path = task.download_zip(output_file_path)
+
+        # Upload the results into S3 (MinIO)
+        s3_path = f"projects/{dtm_project_id}/{dtm_task_id}/assets.zip"
+        log.info(f"Uploading {output_file_path} to S3 path: {s3_path}")
+        add_file_to_bucket(settings.S3_BUCKET_NAME, assets_path, s3_path)
+
+        log.info(f"Assets for task {task_id} successfully uploaded to S3.")
+
+    except Exception as e:
+        log.error(f"Error downloading or uploading assets for task {task_id}: {e}")
+
+    finally:
+        # Clean up the temporary directory; it may not exist if the download failed
+        shutil.rmtree(output_file_path, ignore_errors=True)
+        log.info(f"Temporary directory {output_file_path} cleaned up.")
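The refactor above leans on pyodm's webhook support: when `create_task` is given a `webhook` URL, the ODM node POSTs the final task status there, so the caller no longer has to block in `monitor_task`. A standalone sketch of the pattern, with a placeholder node URL, image list, and webhook address rather than the project's actual values:

```python
from pyodm import Node

# Hypothetical NodeODM instance and inputs, for illustration only
node = Node.from_url("http://localhost:3000")

task = node.create_task(
    ["images/DJI_0001.JPG", "images/DJI_0002.JPG"],
    options={"dsm": True},
    name="DTM-Task-example",
    # NodeODM POSTs the task's final state (uuid, status, ...) to this URL
    # when processing finishes, instead of the client polling for it
    webhook="http://backend:8000/api/projects/odm/webhook/<project_id>/<task_id>/",
)
print(f"Submitted ODM task {task.uuid}; results will be fetched by the webhook")
```

When the callback arrives, `download_and_upload_assets_from_odm_to_s3` performs the download/upload work that `monitor_task` and `download_results` previously did inline.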
diff --git a/src/backend/app/projects/project_logic.py b/src/backend/app/projects/project_logic.py
index d5a5dfc8..a750c2ca 100644
--- a/src/backend/app/projects/project_logic.py
+++ b/src/backend/app/projects/project_logic.py
@@ -179,8 +179,15 @@ def process_drone_images(
         {"name": "orthophoto-resolution", "value": 5},
     ]

+    webhook_url = (
+        f"{settings.BACKEND_URL}/api/projects/odm/webhook/{project_id}/{task_id}/"
+    )
+
     processor.process_images_from_s3(
-        settings.S3_BUCKET_NAME, name=f"DTM-Task-{task_id}", options=options
+        settings.S3_BUCKET_NAME,
+        name=f"DTM-Task-{task_id}",
+        options=options,
+        webhook=webhook_url,
     )

diff --git a/src/backend/app/projects/project_routes.py b/src/backend/app/projects/project_routes.py
index 14ecc367..3027e484 100644
--- a/src/backend/app/projects/project_routes.py
+++ b/src/backend/app/projects/project_routes.py
@@ -17,13 +17,14 @@
     Form,
     Response,
     BackgroundTasks,
+    Request,
 )
 from geojson_pydantic import FeatureCollection
 from loguru import logger as log
 from psycopg import Connection
 from shapely.geometry import shape, mapping
 from shapely.ops import unary_union
-from app.projects import project_schemas, project_deps, project_logic
+from app.projects import project_schemas, project_deps, project_logic, image_processing
 from app.db import database
 from app.models.enums import HTTPStatus, State
 from app.s3 import s3_client
@@ -182,9 +183,11 @@ async def create_project(
         if dem
         else None
     )
-    await project_logic.upload_file_to_s3(
-        project_id, image, "map_screenshot.png"
-    ) if image else None
+    (
+        await project_logic.upload_file_to_s3(project_id, image, "map_screenshot.png")
+        if image
+        else None
+    )

     # Update DEM and Image URLs in the database
     await project_logic.update_url(db, project_id, dem_url)
@@ -419,3 +422,48 @@ async def get_assets_info(
         return results
     else:
         return project_logic.get_project_info_from_s3(project.id, task_id)
+
+
+@router.post(
+    "/odm/webhook/{dtm_project_id}/{dtm_task_id}/",
+    tags=["Image Processing"],
+)
+async def odm_webhook(
+    request: Request,
+    dtm_project_id: uuid.UUID,
+    dtm_task_id: uuid.UUID,
+    background_tasks: BackgroundTasks,
+):
+    """
+    Webhook to receive notifications from ODM processing tasks.
+    """
+    # Try to parse the JSON body
+    try:
+        payload = await request.json()
+    except Exception as e:
+        log.error(f"Error parsing JSON: {e}")
+        raise HTTPException(status_code=400, detail="Invalid JSON body")
+
+    task_id = payload.get("uuid")
+    status = payload.get("status")
+
+    if not task_id or not status:
+        raise HTTPException(status_code=400, detail="Invalid webhook payload")
+
+    log.info(f"Task ID: {task_id}, Status: {status}")
+
+    # If the status is success, download the assets and upload them to S3.
+    # 40 is ODM's status code for a successful task.
+    if status["code"] == 40:
+        # Download assets from ODM and upload them to S3 in the background
+        background_tasks.add_task(
+            image_processing.download_and_upload_assets_from_odm_to_s3,
+            settings.NODE_ODM_URL,
+            task_id,
+            dtm_project_id,
+            dtm_task_id,
+        )
+    elif status["code"] == 30:
+        # 30 is ODM's status code for a failed task
+        log.error(f"ODM task {task_id} failed: {status.get('errorMessage')}")
+    return {"message": "Webhook received", "task_id": task_id}
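For local testing, the webhook endpoint can be exercised without a real ODM run by posting a payload shaped like the handler expects: a task `uuid` plus a `status` object, where NodeODM uses code 40 for a completed task and 30 for a failed one. The URL and IDs below are placeholders:

```python
import requests

# Placeholder project/task IDs; in practice these are baked into the webhook
# URL that project_logic.process_drone_images registers with NodeODM.
url = (
    "http://localhost:8000/api/projects/odm/webhook/"
    "11111111-1111-1111-1111-111111111111/"
    "22222222-2222-2222-2222-222222222222/"
)

payload = {
    "uuid": "33333333-3333-3333-3333-333333333333",  # ODM task id
    "status": {"code": 40},  # 40 = COMPLETED; use 30 plus "errorMessage" for failure
}

resp = requests.post(url, json=payload)
print(resp.status_code, resp.json())
# Expected: 200 {'message': 'Webhook received', 'task_id': '33333333-...'}
```

Note that a success code only queues the S3 transfer as a FastAPI background task; the endpoint returns immediately and the download/upload happens asynchronously.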