Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Updated the task as completed after process from S3 #270

Merged
merged 5 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion src/backend/app/projects/image_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
import tempfile
import shutil
from pathlib import Path
from app.tasks import task_logic
from app.models.enums import State
from app.utils import timestamp
from pyodm import Node
from app.s3 import get_file_from_bucket, list_objects_from_bucket, add_file_to_bucket
from loguru import logger as log
from concurrent.futures import ThreadPoolExecutor
from psycopg import Connection


class DroneImageProcessor:
Expand All @@ -14,6 +18,8 @@ def __init__(
node_odm_url: str,
project_id: uuid.UUID,
task_id: uuid.UUID,
user_id: str,
db: Connection,
):
"""
Initializes the connection to the ODM node.
Expand All @@ -22,6 +28,8 @@ def __init__(
self.node = Node.from_url(node_odm_url)
self.project_id = project_id
self.task_id = task_id
self.user_id = user_id
self.db = db

def options_list_to_dict(self, options=[]):
"""
Expand Down Expand Up @@ -117,7 +125,7 @@ def download_results(self, task, output_path):
log.info("Download completed.")
return path

def process_images_from_s3(self, bucket_name, name=None, options=[]):
async def process_images_from_s3(self, bucket_name, name=None, options=[]):
"""
Processes images from MinIO storage.

Expand Down Expand Up @@ -147,6 +155,17 @@ def process_images_from_s3(self, bucket_name, name=None, options=[]):
# Upload the results into s3
s3_path = f"projects/{self.project_id}/{self.task_id}/assets.zip"
add_file_to_bucket(bucket_name, path_to_download, s3_path)
# now update the task as completed in Db.
await task_logic.update_task_state(
self.db,
self.project.id,
self.task_id,
self.user_id,
"Task completed.",
State.IMAGE_UPLOADED,
State.IMAGE_PROCESSED,
timestamp(),
)
return task

finally:
Expand Down
8 changes: 6 additions & 2 deletions src/backend/app/projects/project_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,13 @@ async def preview_split_by_square(boundary: str, meters: int):
)


def process_drone_images(project_id: uuid.UUID, task_id: uuid.UUID):
def process_drone_images(
project_id: uuid.UUID, task_id: uuid.UUID, user_id: str, db: Connection
):
# Initialize the processor
processor = DroneImageProcessor(settings.NODE_ODM_URL, project_id, task_id)
processor = DroneImageProcessor(
settings.NODE_ODM_URL, project_id, task_id, user_id, db
)

# Define processing options
options = [
Expand Down
22 changes: 19 additions & 3 deletions src/backend/app/projects/project_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import uuid
from typing import Annotated, Optional
from uuid import UUID
from app.tasks import task_logic
import geojson
from datetime import timedelta
from fastapi import (
Expand All @@ -24,13 +25,13 @@
from shapely.ops import unary_union
from app.projects import project_schemas, project_deps, project_logic
from app.db import database
from app.models.enums import HTTPStatus
from app.models.enums import HTTPStatus, State
from app.s3 import s3_client
from app.config import settings
from app.users.user_deps import login_required
from app.users.user_schemas import AuthUser
from app.tasks import task_schemas
from app.utils import geojson_to_kml
from app.utils import geojson_to_kml, timestamp
from app.users import user_schemas


Expand Down Expand Up @@ -349,8 +350,23 @@ async def process_imagery(
],
user_data: Annotated[AuthUser, Depends(login_required)],
background_tasks: BackgroundTasks,
db: Annotated[Connection, Depends(database.get_db)],
):
background_tasks.add_task(project_logic.process_drone_images, project.id, task_id)
user_id = user_data.id
# TODO: Update task state to reflect completion of image uploads.
await task_logic.update_task_state(
db,
project.id,
task_id,
user_id,
"Task images upload completed.",
State.LOCKED_FOR_MAPPING,
State.IMAGE_UPLOADED,
timestamp(),
)
background_tasks.add_task(
project_logic.process_drone_images, project.id, task_id, user_id, db
)
return {"message": "Processing started"}


Expand Down
2 changes: 1 addition & 1 deletion src/backend/app/projects/project_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ async def one(db: Connection, project_id: uuid.UUID):
CASE
WHEN te.state = 'REQUEST_FOR_MAPPING' THEN 'request logs'
WHEN te.state = 'LOCKED_FOR_MAPPING' THEN 'ongoing'
WHEN te.state = 'UNLOCKED_DONE' THEN 'completed'
WHEN te.state = 'IMAGE_PROCESSED' THEN 'completed'
WHEN te.state = 'UNFLYABLE_TASK' THEN 'unflyable task'
ELSE 'UNLOCKED_TO_MAP'
END AS calculated_state
Expand Down
2 changes: 1 addition & 1 deletion src/backend/app/tasks/task_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async def get_task_stats(
SELECT
COUNT(CASE WHEN te.state = 'REQUEST_FOR_MAPPING' THEN 1 END) AS request_logs,
COUNT(CASE WHEN te.state = 'LOCKED_FOR_MAPPING' THEN 1 END) AS ongoing_tasks,
COUNT(CASE WHEN te.state = 'UNLOCKED_DONE' THEN 1 END) AS completed_tasks,
COUNT(CASE WHEN te.state = 'IMAGE_PROCESSED' THEN 1 END) AS completed_tasks,
COUNT(CASE WHEN te.state = 'UNFLYABLE_TASK' THEN 1 END) AS unflyable_tasks
FROM (
SELECT DISTINCT ON (te.task_id)
Expand Down
2 changes: 1 addition & 1 deletion src/backend/app/tasks/task_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ async def get_tasks_by_user(
CASE
WHEN task_events.state = 'REQUEST_FOR_MAPPING' THEN 'request logs'
WHEN task_events.state = 'LOCKED_FOR_MAPPING' THEN 'ongoing'
WHEN task_events.state = 'UNLOCKED_DONE' THEN 'completed'
WHEN task_events.state = 'IMAGE_PROCESSED' THEN 'completed'
WHEN task_events.state = 'UNFLYABLE_TASK' THEN 'unflyable task'
ELSE 'UNLOCKED_TO_MAP'
END AS state
Expand Down