Skip to content

Commit

Permalink
Merge pull request #295 from hotosm/develop
Browse files Browse the repository at this point in the history
Uses ODM Webhook for post image processing tasks
  • Loading branch information
nrjadkry authored Oct 15, 2024
2 parents 07f8ebf + ac122b4 commit 33e5ac8
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 33 deletions.
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ GOOGLE_CLIENT_SECRET=${GOOGLE_CLIENT_SECRET:-GOOGLE_CLIENT_SECRET_GOES_HERE}
GOOGLE_LOGIN_REDIRECT_URI=${GOOGLE_LOGIN_REDIRECT_URI:-http://localhost:3040}
SECRET_KEY=${SECRET_KEY:-SUPERSECRETKEY__xxxxxxxyyyyyyyyyzzzzzzz}
EXTRA_CORS_ORIGINS=${EXTRA_CORS_ORIGINS:-["http://localhost:3040"]}
BACKEND_URL=${BACKEND_URL:-http://localhost:8000}
FRONTEND_URL=${FRONTEND_URL:-http://localhost:3040}
DEBUG=${DEBUG:-True}



## SMTP CONFIG ##
SMTP_TLS=${SMTP_TLS:-True}
SMTP_SSL=${SMTP_SSL:-False}
Expand Down
2 changes: 2 additions & 0 deletions src/backend/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ def assemble_db_connection(cls, v: Optional[str], info: ValidationInfo) -> Any:
return pg_url

FRONTEND_URL: str = "http://localhost:3040"
BACKEND_URL: str = "http://localhost:8000"

S3_ENDPOINT: str = "http://s3:9000"
S3_ACCESS_KEY: Optional[str] = ""
S3_SECRET_KEY: Optional[str] = ""
Expand Down
111 changes: 83 additions & 28 deletions src/backend/app/projects/image_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from concurrent.futures import ThreadPoolExecutor
from psycopg import Connection
from asgiref.sync import async_to_sync
from app.config import settings


class DroneImageProcessor:
Expand Down Expand Up @@ -85,7 +86,9 @@ def list_images(self, directory):
images.append(str(file))
return images

def process_new_task(self, images, name=None, options=[], progress_callback=None):
def process_new_task(
self, images, name=None, options=[], progress_callback=None, webhook=None
):
"""
Sends a set of images via the API to start processing.
Expand All @@ -100,7 +103,9 @@ def process_new_task(self, images, name=None, options=[], progress_callback=None
# FIXME: take this from the function above
opts = {"dsm": True}

task = self.node.create_task(images, opts, name, progress_callback)
task = self.node.create_task(
images, opts, name, progress_callback, webhook=webhook
)
return task

def monitor_task(self, task):
Expand All @@ -126,7 +131,7 @@ def download_results(self, task, output_path):
log.info("Download completed.")
return path

def process_images_from_s3(self, bucket_name, name=None, options=[]):
def process_images_from_s3(self, bucket_name, name=None, options=[], webhook=None):
"""
Processes images from MinIO storage.
Expand All @@ -145,35 +150,85 @@ def process_images_from_s3(self, bucket_name, name=None, options=[]):
images_list = self.list_images(temp_dir)

# Start a new processing task
task = self.process_new_task(images_list, name=name, options=options)
# Monitor task progress
self.monitor_task(task)

# Optionally, download results
output_file_path = f"/tmp/{self.project_id}"
path_to_download = self.download_results(task, output_path=output_file_path)

# Upload the results into s3
s3_path = f"projects/{self.project_id}/{self.task_id}/assets.zip"
add_file_to_bucket(bucket_name, path_to_download, s3_path)
# now update the task as completed in Db.
# Call the async function using asyncio

# Update background task status to COMPLETED
update_task_status_sync = async_to_sync(task_logic.update_task_state)
update_task_status_sync(
self.db,
self.project_id,
self.task_id,
self.user_id,
"Task completed.",
State.IMAGE_UPLOADED,
State.IMAGE_PROCESSED,
timestamp(),
task = self.process_new_task(
images_list, name=name, options=options, webhook=webhook
)

# If webhook is passed, webhook does this job.
if not webhook:
# Monitor task progress
self.monitor_task(task)

# Optionally, download results
output_file_path = f"/tmp/{self.project_id}"
path_to_download = self.download_results(
task, output_path=output_file_path
)

# Upload the results into s3
s3_path = f"projects/{self.project_id}/{self.task_id}/assets.zip"
add_file_to_bucket(bucket_name, path_to_download, s3_path)
# now update the task as completed in Db.
# Call the async function using asyncio

# Update background task status to COMPLETED
update_task_status_sync = async_to_sync(task_logic.update_task_state)
update_task_status_sync(
self.db,
self.project_id,
self.task_id,
self.user_id,
"Task completed.",
State.IMAGE_UPLOADED,
State.IMAGE_PROCESSED,
timestamp(),
)
return task

finally:
# Clean up temporary directory
shutil.rmtree(temp_dir)
pass


def download_and_upload_assets_from_odm_to_s3(
node_odm_url: str, task_id: str, dtm_project_id: uuid.UUID, dtm_task_id: uuid.UUID
):
"""
Downloads results from ODM and uploads them to S3 (Minio).
:param task_id: UUID of the ODM task.
:param dtm_project_id: UUID of the project.
:param dtm_task_id: UUID of the task.
"""
log.info(f"Starting download for task {task_id}")

# Replace with actual ODM node details and URL
node = Node.from_url(node_odm_url)

try:
# Get the task object using the task_id
task = node.get_task(task_id)

# Create a temporary directory to store the results
output_file_path = f"/tmp/{dtm_project_id}"

log.info(f"Downloading results for task {task_id} to {output_file_path}")

# Download results as a zip file
assets_path = task.download_zip(output_file_path)

# Upload the results into S3 (Minio)
s3_path = f"projects/{dtm_project_id}/{dtm_task_id}/assets.zip"
log.info(f"Uploading {output_file_path} to S3 path: {s3_path}")
add_file_to_bucket(settings.S3_BUCKET_NAME, assets_path, s3_path)

log.info(f"Assets for task {task_id} successfully uploaded to S3.")

except Exception as e:
log.error(f"Error downloading or uploading assets for task {task_id}: {e}")

finally:
# Clean up the temporary directory
shutil.rmtree(output_file_path)
log.info(f"Temporary directory {output_file_path} cleaned up.")
9 changes: 8 additions & 1 deletion src/backend/app/projects/project_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,15 @@ def process_drone_images(
{"name": "orthophoto-resolution", "value": 5},
]

webhook_url = (
f"{settings.BACKEND_URL}/api/projects/odm/webhook/{project_id}/{task_id}/"
)

processor.process_images_from_s3(
settings.S3_BUCKET_NAME, name=f"DTM-Task-{task_id}", options=options
settings.S3_BUCKET_NAME,
name=f"DTM-Task-{task_id}",
options=options,
webhook=webhook_url,
)


Expand Down
56 changes: 52 additions & 4 deletions src/backend/app/projects/project_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
Form,
Response,
BackgroundTasks,
Request,
)
from geojson_pydantic import FeatureCollection
from loguru import logger as log
from psycopg import Connection
from shapely.geometry import shape, mapping
from shapely.ops import unary_union
from app.projects import project_schemas, project_deps, project_logic
from app.projects import project_schemas, project_deps, project_logic, image_processing
from app.db import database
from app.models.enums import HTTPStatus, State
from app.s3 import s3_client
Expand Down Expand Up @@ -182,9 +183,11 @@ async def create_project(
if dem
else None
)
await project_logic.upload_file_to_s3(
project_id, image, "map_screenshot.png"
) if image else None
(
await project_logic.upload_file_to_s3(project_id, image, "map_screenshot.png")
if image
else None
)

# Update DEM and Image URLs in the database
await project_logic.update_url(db, project_id, dem_url)
Expand Down Expand Up @@ -419,3 +422,48 @@ async def get_assets_info(
return results
else:
return project_logic.get_project_info_from_s3(project.id, task_id)


@router.post(
"/odm/webhook/{dtm_project_id}/{dtm_task_id}/",
tags=["Image Processing"],
)
async def odm_webhook(
request: Request,
dtm_project_id: uuid.UUID,
dtm_task_id: uuid.UUID,
background_tasks: BackgroundTasks,
):
"""
Webhook to receive notifications from ODM processing tasks.
"""
# Try to parse the JSON body
try:
payload = await request.json()
except Exception as e:
log.error(f"Error parsing JSON: {e}")
raise HTTPException(status_code=400, detail="Invalid JSON body")

task_id = payload.get("uuid")
status = payload.get("status")

if not task_id or not status:
raise HTTPException(status_code=400, detail="Invalid webhook payload")

log.info(f"Task ID: {task_id}, Status: {status}")

# If status is 'success', download and upload assets to S3.
# 40 is the status code for success in odm
if status["code"] == 40:
# Call function to download assets from ODM and upload to S3
background_tasks.add_task(
image_processing.download_and_upload_assets_from_odm_to_s3,
settings.NODE_ODM_URL,
task_id,
dtm_project_id,
dtm_task_id,
)
elif status["code"] == 30:
# failed task
log.error(f'ODM task {task_id} failed: {status["errorMessage"]}')
return {"message": "Webhook received", "task_id": task_id}

0 comments on commit 33e5ac8

Please sign in to comment.