Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add acquisition use case #112

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions edge_orchestrator/config/data_gathering_with_2_fake_cam.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"station_name": "data_gathering_with_2_fake_cam",
"camera_configs": {
"camera_1": {
"camera_id": "camera_1",
"camera_type": "fake",
"source_directory": "fake_images/marker_images",
"position": "front"
},
"camera_2": {
"camera_id": "camera_2",
"camera_type": "fake",
"source_directory": "fake_images/marker_images",
"position": "front"
}
},
"binary_storage_config": {
"storage_type": "gcp"
},
"metadata_storage_config": {
"storage_type": "gcp"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import logging

from edge_orchestrator.domain.models.item import Item
from edge_orchestrator.domain.models.station_config import StationConfig
from edge_orchestrator.domain.ports.binary_storage.i_binary_storage_factory import (
IBinaryStorageFactory,
)
from edge_orchestrator.domain.ports.binary_storage.i_binary_storage_manager import (
IBinaryStorageManager,
)
from edge_orchestrator.domain.ports.camera.i_camera_manager import ICameraManager
from edge_orchestrator.domain.ports.metadata_storage.i_metadata_storage_factory import (
IMetadataStorageFactory,
)
from edge_orchestrator.domain.ports.metadata_storage.i_metadata_storage_manager import (
IMetadataStorageManager,
)
from edge_orchestrator.utils.singleton import SingletonMeta


class DataGathering(metaclass=SingletonMeta):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

je réfléchissais l'autre jour à cette histoire de singleton, je ne sais pas si c'est le mieux mais bon ça marche à condition que tu mettes une méthode de reset en cas de changement de config > regarde là

def __init__(
self,
metadata_storage_manager: IMetadataStorageManager,
binary_storage_manager: IBinaryStorageManager,
camera_manager: ICameraManager,
):
self._logger = logging.getLogger(__name__)
self._metadata_storage_manager = metadata_storage_manager
self._binary_storage_manager = binary_storage_manager
self._camera_manager = camera_manager

async def acquire(self, item: Item, station_config: StationConfig):
self._camera_manager.create_cameras(station_config)
self._camera_manager.take_pictures(item)

self._binary_storage_manager.get_binary_storage(station_config).save_item_binaries(item)

self._metadata_storage_manager.get_metadata_storage(station_config).save_item_metadata(item)

def reset_managers(
self, binary_storage_factory: IBinaryStorageFactory, metadata_storage_factory: IMetadataStorageFactory
):
self._logger.info("Resetting all managers after configuration update...")
self._metadata_storage_manager.reset(metadata_storage_factory)
self._binary_storage_manager.reset(binary_storage_factory)
self._camera_manager.reset()
self._logger.info("Managers reset done!")
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict
from typing import Dict, Optional

from pydantic import BaseModel, ConfigDict

Expand All @@ -14,4 +14,4 @@ class StationConfig(BaseModel):
camera_configs: Dict[str, CameraConfig]
binary_storage_config: StorageConfig
metadata_storage_config: StorageConfig
item_rule_config: ItemRuleConfig
item_rule_config: Optional[ItemRuleConfig] = None
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class StorageConfig(BaseModel):
storage_type: StorageType = StorageType.FILESYSTEM
bucket_name: Optional[str] = os.getenv("BUCKET_NAME", None)
target_directory: Path = Path(os.getenv("EDGE_NAME", "data_storage"))
class_directory: Optional[str] = None

@model_validator(mode="after")
def check_params(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pathlib import Path
from typing import Optional
from uuid import UUID

from edge_orchestrator.domain.models.storage.storage_config import StorageConfig
Expand All @@ -12,5 +13,18 @@ def __init__(self, storage_config: StorageConfig, station_name: str):
def get_storing_prefix_path(self) -> Path:
return self._storage_config.target_directory / self.station_name

def get_storing_path(self, item_id: UUID) -> Path:
return self.get_storing_prefix_path() / str(item_id)
def get_storing_path(self) -> Path:
class_directory = self._storage_config.class_directory
prefix_path = self.get_storing_prefix_path()
return prefix_path / class_directory if class_directory else prefix_path

def get_file_path(self, item_id: UUID, extension: str, camera_id: Optional[str] = None) -> Path:
storing_path = self.get_storing_path()
return (
storing_path / f"{item_id}_{camera_id}.{extension}"
if camera_id
else storing_path / f"{item_id}.{extension}"
)

def set(self, storage_config: StorageConfig, station_name: str):
self._storage_config = storage_config
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ def __init__(self, station_config: StationConfig, storing_path_manager: StoringP

def save_item_binaries(self, item: Item):
self._logger.info(f"Saving item binaries for item {item.id}")
path = self._storing_path_manager.get_storing_path(item.id)
path = self._storing_path_manager.get_storing_path()
path.mkdir(parents=True, exist_ok=True)
for camera_id, image in item.binaries.items():
if image.image_bytes is None:
continue
filepath = path / f"{camera_id}.jpg"
filepath = self._storing_path_manager.get_file_path(item.id, "jpg", camera_id)
item.binaries[camera_id].storing_path = filepath
with filepath.open("wb") as f:
f.write(image.image_bytes)
Expand All @@ -36,7 +36,7 @@ def save_item_binaries(self, item: Item):
item.state = ItemState.SAVE_BINARIES

def get_item_binaries(self, item_id: UUID) -> Dict[str, bytes]:
path = self._storing_path_manager.get_storing_path(item_id)
path = self._storing_path_manager.get_storing_path()
item_binaries = {}
for extension in ImageExtension:
for binary_path in path.glob(f"*.{extension.value}"):
Expand All @@ -45,15 +45,15 @@ def get_item_binaries(self, item_id: UUID) -> Dict[str, bytes]:
return item_binaries

def get_item_binary_names(self, item_id: UUID) -> List[str]:
path = self._storing_path_manager.get_storing_path(item_id)
path = self._storing_path_manager.get_storing_path()
item_binaries = []
for extension in ImageExtension:
for binary_path in path.glob(f"*.{extension.value}"):
item_binaries.append(binary_path.name)
return item_binaries

def get_item_binary(self, item_id: UUID, camera_id: str) -> bytes:
filepath = self._storing_path_manager.get_storing_path(item_id) / f"{camera_id}.jpg"
filepath = self._storing_path_manager.get_file_path(item_id, "jpg", camera_id)
# TODO: test with non existing item binary
if not filepath.exists():
raise HTTPException(status_code=400, detail=f"The item {item_id} has no binary for {camera_id}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ def __init__(self, station_config: StationConfig, storing_path_manager: StoringP
def save_item_binaries(self, item: Item):
self._logger.info(f"Saving item binaries for item {item.id}")
for camera_id, image in item.binaries.items():
blob = self._bucket.blob(
(self._storing_path_manager.get_storing_path(item.id) / f"{camera_id}.jpg").as_posix()
)
blob = self._bucket.blob(self._storing_path_manager.get_file_path(item.id, "jpg", camera_id).as_posix())
if blob is None:
raise Exception("An image should be upload")
blob.upload_from_string(image.image_bytes, content_type="image/jpg")
Expand All @@ -38,22 +36,22 @@ def save_item_binaries(self, item: Item):

def get_item_binary_names(self, item_id: UUID) -> List[str]:
binaries = []
for blob in self._bucket.list_blobs(self._storing_path_manager.get_storing_path(item_id)):
for blob in self._bucket.list_blobs(self._storing_path_manager.get_storing_path()):
if item_id in blob.name:
binaries.append(blob.name)
return binaries

def get_item_binaries(self, item_id: UUID) -> Dict[str, bytes]:
binaries = {}
for blob in self._bucket.list_blobs(prefix=self._storing_path_manager.get_storing_path(item_id).as_posix()):
for blob in self._bucket.list_blobs(prefix=self._storing_path_manager.get_storing_path().as_posix()):
if item_id in blob.name:
binary = blob.download_as_bytes()
camera_id = Path(blob.name).stem
binaries[camera_id] = binary
return binaries

def get_item_binary(self, item_id: UUID, camera_id: str) -> bytes:
filename = (self._storing_path_manager.get_storing_path(item_id) / f"{camera_id}.jpg").as_posix()
filename = (self._storing_path_manager.get_file_path(item_id, "jpg", camera_id)).as_posix()
blob = self._bucket.get_blob(filename)
if blob is None:
raise HTTPException(status_code=400, detail=f"The item {item_id} has no binary for {camera_id}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ def __init__(self, station_config: StationConfig, storing_path_manager: StoringP
def save_item_metadata(self, item: Item):
self._logger.info(f"Saving item metadata for item {item.id}")
item.state = ItemState.DONE
filepath = self._storing_path_manager.get_storing_path(item.id) / "metadata.json"
filepath = self._storing_path_manager.get_file_path(item.id, "json")

filepath.parent.mkdir(parents=True, exist_ok=True)
with filepath.open("w") as f:
f.write(item.model_dump_json(exclude_none=True))
self._logger.info(f"Item metadata for item {item.id} saved as {filepath.as_posix()}")

def get_item_metadata(self, item_id: UUID) -> Item:
filepath = self._storing_path_manager.get_storing_path(item_id) / "metadata.json"
filepath = self._storing_path_manager.get_file_path(item_id, "json")
# TODO: test with non existing item metadata
if not filepath.exists():
raise HTTPException(status_code=400, detail=f"The item {item_id} has no metadata")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ def save_item_metadata(self, item: Item):
self._logger.info(f"Saving item metadata for item {item.id}")
item.state = ItemState.DONE
item_metadata = item.model_dump_json(exclude_none=True)
blob = self._bucket.blob((self._storing_path_manager.get_storing_path(item.id) / "metadata.json").as_posix())
blob = self._bucket.blob(self._storing_path_manager.get_file_path(item.id, "json").as_posix())
blob.upload_from_string(item_metadata, content_type="application/json")
self._logger.info(f"Item metadata for item {item.id} saved as {blob.name}")

def get_item_metadata(self, item_id: UUID) -> Item:
filename = (self._storing_path_manager.get_storing_path(item_id) / "metadata.json").as_posix()
filename = self._storing_path_manager.get_file_path(item_id, "json").as_posix()
blob = self._bucket.get_blob(filename)
if blob is None:
raise HTTPException(status_code=400, detail=f"The item {item_id} has no metadata")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from fastapi import Depends, HTTPException

from edge_orchestrator.application.config.config_manager import ConfigManager
from edge_orchestrator.application.use_cases.data_gathering import DataGathering
from edge_orchestrator.application.use_cases.supervisor import Supervisor
from edge_orchestrator.domain.models.station_config import StationConfig
from edge_orchestrator.domain.ports.binary_storage.i_binary_storage_factory import (
Expand Down Expand Up @@ -157,3 +158,21 @@ def get_supervisor(
)
manager.config_updated = False
return supervisor


def get_data_gathering(
metadata_storage_manager: IMetadataStorageManager = Depends(get_metadata_storage_manager),
binary_storage_manager: IBinaryStorageManager = Depends(get_binary_storage_manager),
camera_manager: ICameraManager = Depends(get_camera_manager),
binary_storage_factory: IBinaryStorageFactory = Depends(get_binary_storage_factory),
metadata_storage_factory: IMetadataStorageFactory = Depends(get_metadata_storage_factory),
) -> DataGathering:
data_gathering = DataGathering(metadata_storage_manager, binary_storage_manager, camera_manager)
manager = ConfigManager()
if manager.config_updated:
data_gathering.reset_managers(
binary_storage_factory,
metadata_storage_factory,
)
manager.config_updated = False
return data_gathering
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
)

from edge_orchestrator.application.config.config_manager import ConfigManager
from edge_orchestrator.application.use_cases.data_gathering import DataGathering
from edge_orchestrator.application.use_cases.supervisor import Supervisor
from edge_orchestrator.domain.models.camera.camera_config import CameraConfig
from edge_orchestrator.domain.models.image import Image
Expand All @@ -26,6 +27,7 @@
from edge_orchestrator.interface.api.dependency_injection import (
get_binary_storage_manager,
get_config,
get_data_gathering,
get_metadata_storage_manager,
get_supervisor,
)
Expand Down Expand Up @@ -118,7 +120,7 @@ async def trigger_job(
):
input_binaries = {}
for binary in binaries:
input_binaries[binary.filename] = Image(image_bytes=binary.read())
input_binaries[binary.filename] = Image(image_bytes=await binary.read())
item = Item(
cameras_metadata=cameras_metadata,
binaries=input_binaries,
Expand All @@ -127,6 +129,29 @@ async def trigger_job(
return {"item_id": item.id}


async def data_gathering_job(
class_name: str = None,
binaries: List[UploadFile] = [],
cameras_metadata: Dict[str, CameraConfig] = {},
data_gathering: DataGathering = Depends(get_data_gathering),
station_config: StationConfig = Depends(get_config),
background_tasks: BackgroundTasks = None,
):
# Set the class name on the config
station_config.binary_storage_config.class_directory = class_name
station_config.metadata_storage_config.class_directory = class_name

input_binaries = {}
for binary in binaries:
input_binaries[binary.filename] = Image(image_bytes=await binary.read())
item = Item(
cameras_metadata=cameras_metadata,
binaries=input_binaries,
)
background_tasks.add_task(data_gathering.acquire, item, station_config)
return {"item_id": item.id}


router = APIRouter(prefix="/api/v1")
router.add_api_route("/", home, methods=["GET"])
router.add_api_route("/health/live", get_health, methods=["GET"])
Expand All @@ -140,3 +165,4 @@ async def trigger_job(
router.add_api_route("/configs/active", set_config, methods=["POST"], response_model_exclude_none=True)

router.add_api_route("/trigger", trigger_job, methods=["POST"])
router.add_api_route("/data_gathering", data_gathering_job, methods=["POST"])
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ def check_item_binaries_are_stored(context: Context):

response_1_content = response_1.json()
for row in context.table:
image_name = f'{row["binary_name"]}.{row["binary_extension"]}'
image_name = f'{context.item_id}_{row["binary_name"]}.{row["binary_extension"]}'
assert image_name in response_1_content

path_to_image = context.test_directory / f"data_storage/{context.item_id}" / image_name
path_to_image = context.test_directory / "data_storage" / image_name
assert path_to_image.exists()
with path_to_image.open("rb") as f:
image_binary = f.read()
Expand Down
Loading
Loading