Skip to content

Commit

Permalink
Merge pull request #294 from hotosm/upload_to_oam
Browse files Browse the repository at this point in the history
Uses ODM Webhook for Post Processing tasks in Image Processing
  • Loading branch information
nrjadkry authored Oct 15, 2024
2 parents 156e190 + a8aa5c4 commit 3a2799d
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 20 deletions.
48 changes: 47 additions & 1 deletion src/backend/app/projects/image_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from concurrent.futures import ThreadPoolExecutor
from psycopg import Connection
from asgiref.sync import async_to_sync
from app.config import settings


class DroneImageProcessor:
Expand Down Expand Up @@ -149,7 +150,9 @@ def process_images_from_s3(self, bucket_name, name=None, options=[], webhook=Non
images_list = self.list_images(temp_dir)

# Start a new processing task
task = self.process_new_task(images_list, name=name, options=options)
task = self.process_new_task(
images_list, name=name, options=options, webhook=webhook
)

# If webhook is passed, webhook does this job.
if not webhook:
Expand Down Expand Up @@ -186,3 +189,46 @@ def process_images_from_s3(self, bucket_name, name=None, options=[], webhook=Non
# Clean up temporary directory
shutil.rmtree(temp_dir)
pass


def download_and_upload_assets_from_odm_to_s3(
    node_odm_url: str, task_id: str, dtm_project_id: uuid.UUID, dtm_task_id: uuid.UUID
):
    """
    Downloads results from ODM and uploads them to S3 (Minio).

    Intended to run as a background task; errors are logged rather than
    raised so the scheduling webhook is not affected.

    :param node_odm_url: Base URL of the ODM processing node.
    :param task_id: UUID of the ODM task.
    :param dtm_project_id: UUID of the project (used in the S3 key).
    :param dtm_task_id: UUID of the task (used in the S3 key).
    """
    log.info(f"Starting download for task {task_id}")

    # Connect to the ODM node that processed the imagery.
    node = Node.from_url(node_odm_url)

    # Assign BEFORE the try block: the finally clause references this name,
    # and the original code raised NameError there whenever
    # node.get_task() failed before the assignment was reached.
    output_file_path = f"/tmp/{dtm_project_id}"

    try:
        # Get the task object using the task_id
        task = node.get_task(task_id)

        log.info(f"Downloading results for task {task_id} to {output_file_path}")

        # Download results as a zip file; download_zip returns the path
        # of the downloaded archive.
        assets_path = task.download_zip(output_file_path)

        # Upload the results into S3 (Minio)
        s3_path = f"projects/{dtm_project_id}/{dtm_task_id}/assets.zip"
        log.info(f"Uploading {output_file_path} to S3 path: {s3_path}")
        add_file_to_bucket(settings.S3_BUCKET_NAME, assets_path, s3_path)

        log.info(f"Assets for task {task_id} successfully uploaded to S3.")

    except Exception as e:
        # Best-effort background job: log and swallow, matching the
        # original error-handling contract.
        log.error(f"Error downloading or uploading assets for task {task_id}: {e}")

    finally:
        # Clean up the temporary directory; it may not exist if the
        # download never started, so ignore missing-path errors.
        shutil.rmtree(output_file_path, ignore_errors=True)
        log.info(f"Temporary directory {output_file_path} cleaned up.")
2 changes: 1 addition & 1 deletion src/backend/app/projects/project_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def process_drone_images(
]

webhook_url = (
f"{settings.BACKEND_URL}/api/projects/odm/webhook/{project_id}/{task_id}"
f"{settings.BACKEND_URL}/api/projects/odm/webhook/{project_id}/{task_id}/"
)

processor.process_images_from_s3(
Expand Down
63 changes: 45 additions & 18 deletions src/backend/app/projects/project_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
Form,
Response,
BackgroundTasks,
Request,
)
from geojson_pydantic import FeatureCollection
from loguru import logger as log
from psycopg import Connection
from shapely.geometry import shape, mapping
from shapely.ops import unary_union
from app.projects import project_schemas, project_deps, project_logic
from app.projects import project_schemas, project_deps, project_logic, image_processing
from app.db import database
from app.models.enums import HTTPStatus, State
from app.s3 import s3_client
Expand Down Expand Up @@ -182,9 +183,11 @@ async def create_project(
if dem
else None
)
await project_logic.upload_file_to_s3(
project_id, image, "map_screenshot.png"
) if image else None
(
await project_logic.upload_file_to_s3(project_id, image, "map_screenshot.png")
if image
else None
)

# Update DEM and Image URLs in the database
await project_logic.update_url(db, project_id, dem_url)
Expand Down Expand Up @@ -421,22 +424,46 @@ async def get_assets_info(
return project_logic.get_project_info_from_s3(project.id, task_id)


@router.get("/odm/webhook/{project_id}/{task_id}", tags=["Image Processing"])
async def odm_webhook(project_id: uuid.UUID, task_id: uuid.UUID):
@router.post(
    "/odm/webhook/{dtm_project_id}/{dtm_task_id}/",
    tags=["Image Processing"],
)
async def odm_webhook(
    request: Request,
    dtm_project_id: uuid.UUID,
    dtm_task_id: uuid.UUID,
    background_tasks: BackgroundTasks,
):
    """
    Webhook to receive notifications from ODM processing tasks.

    Expects a JSON body with at least:
      - "uuid": the ODM task id
      - "status": a dict containing a numeric "code"
        (40 = success, 30 = failed per ODM conventions)

    On success, schedules a background task that downloads the ODM
    assets and uploads them to S3.

    :raises HTTPException: 400 for unparseable or malformed payloads.
    """
    # Try to parse the JSON body
    try:
        payload = await request.json()
    except Exception as e:
        log.error(f"Error parsing JSON: {e}")
        raise HTTPException(status_code=400, detail="Invalid JSON body")

    task_id = payload.get("uuid")
    status = payload.get("status")

    # "status" must be a dict carrying a "code" key: the original code
    # indexed status["code"] unguarded, so a truthy-but-malformed value
    # (string, number, dict without "code") crashed with an unhandled
    # 500 instead of the intended 400.
    if not task_id or not isinstance(status, dict) or "code" not in status:
        raise HTTPException(status_code=400, detail="Invalid webhook payload")

    log.info(f"Task ID: {task_id}, Status: {status}")

    # If status is 'success', download and upload assets to S3.
    # 40 is the status code for success in odm
    if status["code"] == 40:
        # Run the slow download/upload outside the request cycle so the
        # webhook can return immediately.
        background_tasks.add_task(
            image_processing.download_and_upload_assets_from_odm_to_s3,
            settings.NODE_ODM_URL,
            task_id,
            dtm_project_id,
            dtm_task_id,
        )
    elif status["code"] == 30:
        # Failed task: .get() keeps us robust when ODM omits the
        # errorMessage field.
        log.error(f'ODM task {task_id} failed: {status.get("errorMessage")}')
    return {"message": "Webhook received", "task_id": task_id}

0 comments on commit 3a2799d

Please sign in to comment.