From f800e37488173fdef7e632e9c37545dc2a63d843 Mon Sep 17 00:00:00 2001 From: "gireg.roussel" Date: Tue, 4 Mar 2025 10:24:58 +0100 Subject: [PATCH 1/8] make item_rule_config optional --- .../src/edge_orchestrator/domain/models/station_config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/edge_orchestrator/src/edge_orchestrator/domain/models/station_config.py b/edge_orchestrator/src/edge_orchestrator/domain/models/station_config.py index 03d8cdd5..fddc2de3 100644 --- a/edge_orchestrator/src/edge_orchestrator/domain/models/station_config.py +++ b/edge_orchestrator/src/edge_orchestrator/domain/models/station_config.py @@ -1,4 +1,4 @@ -from typing import Dict +from typing import Dict, Optional from pydantic import BaseModel, ConfigDict @@ -14,4 +14,4 @@ class StationConfig(BaseModel): camera_configs: Dict[str, CameraConfig] binary_storage_config: StorageConfig metadata_storage_config: StorageConfig - item_rule_config: ItemRuleConfig + item_rule_config: Optional[ItemRuleConfig] = None From 1541e9016b537f84769bb6b6ce081051fcc0802e Mon Sep 17 00:00:00 2001 From: "gireg.roussel" Date: Tue, 4 Mar 2025 10:25:50 +0100 Subject: [PATCH 2/8] add data gathering use case --- .../data_gathering_with_2_fake_cam.json | 27 ++++++++++++ .../application/use_cases/data_gathering.py | 41 +++++++++++++++++++ .../interface/api/dependency_injection.py | 11 +++++ .../interface/api/routers/v1/router.py | 27 ++++++++++++ 4 files changed, 106 insertions(+) create mode 100644 edge_orchestrator/config/data_gathering_with_2_fake_cam.json create mode 100644 edge_orchestrator/src/edge_orchestrator/application/use_cases/data_gathering.py diff --git a/edge_orchestrator/config/data_gathering_with_2_fake_cam.json b/edge_orchestrator/config/data_gathering_with_2_fake_cam.json new file mode 100644 index 00000000..bb231240 --- /dev/null +++ b/edge_orchestrator/config/data_gathering_with_2_fake_cam.json @@ -0,0 +1,27 @@ +{ + "station_name": "data_gathering_with_2_fake_cam", + "camera_configs": { + "camera_1": { + "camera_id": "camera_1", + "camera_type": "fake", + "source_directory": "fake_images/marker_images", + "position": "front", + "recreate_me": true + }, + "camera_2": { + "camera_id": "camera_2", + "camera_type": "fake", + "source_directory": "fake_images/marker_images", + "position": "front", + "recreate_me": true + } + }, + "binary_storage_config": { + "storage_type": "gcp", + "recreate_me": false + }, + "metadata_storage_config": { + "storage_type": "gcp", + "recreate_me": false + } +} \ No newline at end of file diff --git a/edge_orchestrator/src/edge_orchestrator/application/use_cases/data_gathering.py b/edge_orchestrator/src/edge_orchestrator/application/use_cases/data_gathering.py new file mode 100644 index 00000000..d4e77fee --- /dev/null +++ b/edge_orchestrator/src/edge_orchestrator/application/use_cases/data_gathering.py @@ -0,0 +1,41 @@ +import logging + +from edge_orchestrator.domain.models.item import Item +from edge_orchestrator.domain.models.station_config import StationConfig +from edge_orchestrator.domain.ports.binary_storage.i_binary_storage_manager import ( + IBinaryStorageManager, +) +from edge_orchestrator.domain.ports.camera.i_camera_manager import ICameraManager +from edge_orchestrator.domain.ports.camera_rule.i_camera_rule_manager import ( + ICameraRuleManager, +) +from edge_orchestrator.domain.ports.item_rule.i_item_rule_manager import ( + IItemRuleManager, +) +from edge_orchestrator.domain.ports.metadata_storage.i_metadata_storage_manager import ( + IMetadataStorageManager, +) +from edge_orchestrator.domain.ports.model_forwarder.i_model_forwarder_manager import ( + IModelForwarderManager, +) +from edge_orchestrator.utils.singleton import SingletonMeta + + +class DataGathering(metaclass=SingletonMeta): + def __init__( + self, + metadata_storage_manager: IMetadataStorageManager, + binary_storage_manager: IBinaryStorageManager, + camera_manager: ICameraManager, + ): + self._logger = logging.getLogger(__name__) + self._metadata_storage_manager = metadata_storage_manager + self._binary_storage_manager = binary_storage_manager + self._camera_manager = camera_manager + + async def acquire(self, item: Item, station_config: StationConfig): + self._camera_manager.take_pictures(item) + + self._binary_storage_manager.get_binary_storage(station_config).save_item_binaries(item) + + self._metadata_storage_manager.get_metadata_storage(station_config).save_item_metadata(item) diff --git a/edge_orchestrator/src/edge_orchestrator/interface/api/dependency_injection.py b/edge_orchestrator/src/edge_orchestrator/interface/api/dependency_injection.py index 729f1e92..5cf9d671 100644 --- a/edge_orchestrator/src/edge_orchestrator/interface/api/dependency_injection.py +++ b/edge_orchestrator/src/edge_orchestrator/interface/api/dependency_injection.py @@ -1,3 +1,4 @@ +from edge_orchestrator.application.use_cases.data_gathering import DataGathering from fastapi import Depends, HTTPException from edge_orchestrator.application.config.config_manager import ConfigManager @@ -157,3 +158,13 @@ def get_supervisor( ) manager.config_updated = False return supervisor + +def get_data_gathering( + metadata_storage_manager: IMetadataStorageManager = Depends(get_metadata_storage_manager), + binary_storage_manager: IBinaryStorageManager = Depends(get_binary_storage_manager), + camera_manager: ICameraManager = Depends(get_camera_manager), + station_config: StationConfig = Depends(get_config), +) -> DataGathering: + data_gathering = DataGathering(metadata_storage_manager, binary_storage_manager, camera_manager) + data_gathering._camera_manager.create_cameras(station_config) + return data_gathering \ No newline at end of file diff --git a/edge_orchestrator/src/edge_orchestrator/interface/api/routers/v1/router.py b/edge_orchestrator/src/edge_orchestrator/interface/api/routers/v1/router.py index 9c0e5730..de79d5a5 100644 --- a/edge_orchestrator/src/edge_orchestrator/interface/api/routers/v1/router.py +++ b/edge_orchestrator/src/edge_orchestrator/interface/api/routers/v1/router.py @@ -1,6 +1,7 @@ from typing import Dict, List, Optional from uuid import UUID +from edge_orchestrator.application.use_cases.data_gathering import DataGathering from fastapi import ( APIRouter, BackgroundTasks, @@ -28,6 +29,7 @@ get_config, get_metadata_storage_manager, get_supervisor, + get_data_gathering ) @@ -127,6 +129,30 @@ async def trigger_job( return {"item_id": item.id} +async def data_gathering_job( + class_name: str = None, + binaries: List[UploadFile] = [], + cameras_metadata: Dict[str, CameraConfig] = {}, + data_gathering: DataGathering = Depends(get_data_gathering), + station_config: StationConfig = Depends(get_config), + background_tasks: BackgroundTasks = None, +): + print("class_name", class_name) + # Set the class name on the config + station_config.binary_storage_config.class_directory = class_name + station_config.metadata_storage_config.class_directory = class_name + + input_binaries = {} + for binary in binaries: + input_binaries[binary.filename] = Image(image_bytes=binary.read()) + item = Item( + cameras_metadata=cameras_metadata, + binaries=input_binaries, + ) + background_tasks.add_task(data_gathering.acquire, item, station_config) + return {"item_id": item.id} + + router = APIRouter(prefix="/api/v1") router.add_api_route("/", home, methods=["GET"]) router.add_api_route("/health/live", get_health, methods=["GET"]) @@ -140,3 +166,4 @@ async def trigger_job( router.add_api_route("/configs/active", set_config, methods=["POST"], response_model_exclude_none=True) router.add_api_route("/trigger", trigger_job, methods=["POST"]) +router.add_api_route("/data_gathering", data_gathering_job, methods=["POST"]) From b5ab52958a6088522b1debda4f973b63a613fb64 Mon Sep 17 00:00:00 2001 From: "gireg.roussel" Date: Tue, 4 Mar 2025 11:41:01 +0100 Subject: [PATCH 3/8] change file organization --- .../domain/models/storage/storage_config.py | 1 + .../domain/ports/storing_path_manager.py | 15 ++++++++-- .../filesystem_binary_storage.py | 10 +++---- .../binary_storage/gcp_binary_storage.py | 8 +++--- .../filesystem_metadata_storage.py | 5 ++-- .../metadata_storage/gcp_metadata_storage.py | 4 +-- .../test_filesystem_binary_storage.py | 28 +++++++++---------- .../test_filesystem_metadata_storage.py | 10 +++---- 8 files changed, 47 insertions(+), 34 deletions(-) diff --git a/edge_orchestrator/src/edge_orchestrator/domain/models/storage/storage_config.py b/edge_orchestrator/src/edge_orchestrator/domain/models/storage/storage_config.py index 62b81b5e..d01cf41d 100644 --- a/edge_orchestrator/src/edge_orchestrator/domain/models/storage/storage_config.py +++ b/edge_orchestrator/src/edge_orchestrator/domain/models/storage/storage_config.py @@ -11,6 +11,7 @@ class StorageConfig(BaseModel): storage_type: StorageType = StorageType.FILESYSTEM bucket_name: Optional[str] = os.getenv("BUCKET_NAME", None) target_directory: Path = Path(os.getenv("EDGE_NAME", "data_storage")) + class_directory: Optional[str] = None @model_validator(mode="after") def check_params(self): diff --git a/edge_orchestrator/src/edge_orchestrator/domain/ports/storing_path_manager.py b/edge_orchestrator/src/edge_orchestrator/domain/ports/storing_path_manager.py index 96415cf6..7ed0585f 100644 --- a/edge_orchestrator/src/edge_orchestrator/domain/ports/storing_path_manager.py +++ b/edge_orchestrator/src/edge_orchestrator/domain/ports/storing_path_manager.py @@ -1,4 +1,5 @@ from pathlib import Path +from typing import Optional from uuid import UUID from edge_orchestrator.domain.models.storage.storage_config import StorageConfig @@ -12,5 +13,15 @@ def __init__(self, storage_config: StorageConfig, station_name: str): def get_storing_prefix_path(self) -> Path: return self._storage_config.target_directory / self.station_name - def get_storing_path(self, item_id: UUID) -> Path: - return self.get_storing_prefix_path() / str(item_id) + def get_storing_path(self) -> Path: + class_directory = self._storage_config.class_directory + prefix_path = self.get_storing_prefix_path() + return prefix_path / class_directory if class_directory else prefix_path + + def get_file_path(self, item_id: UUID, extension: str, camera_id: Optional[str]=None) -> Path: + storing_path = self.get_storing_path() + return storing_path / f"{item_id}_{camera_id}.{extension}" if camera_id else storing_path / f"{item_id}.{extension}" + + def set(self, storage_config: StorageConfig, station_name: str): + self._storage_config = storage_config + self.station_name = station_name diff --git a/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/binary_storage/filesystem_binary_storage.py b/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/binary_storage/filesystem_binary_storage.py index eeea9a86..fc1c42b7 100644 --- a/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/binary_storage/filesystem_binary_storage.py +++ b/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/binary_storage/filesystem_binary_storage.py @@ -22,12 +22,12 @@ def __init__(self, station_config: StationConfig, storing_path_manager: StoringP def save_item_binaries(self, item: Item): self._logger.info(f"Saving item binaries for item {item.id}") - path = self._storing_path_manager.get_storing_path(item.id) + path = self._storing_path_manager.get_storing_path() path.mkdir(parents=True, exist_ok=True) for camera_id, image in item.binaries.items(): if image.image_bytes is None: continue - filepath = path / f"{camera_id}.jpg" + filepath = self._storing_path_manager.get_file_path(item.id, "jpg", camera_id) item.binaries[camera_id].storing_path = filepath with filepath.open("wb") as f: f.write(image.image_bytes) @@ -36,7 +36,7 @@ def save_item_binaries(self, item: Item): item.state = ItemState.SAVE_BINARIES def get_item_binaries(self, item_id: UUID) -> Dict[str, bytes]: - path = self._storing_path_manager.get_storing_path(item_id) + path = self._storing_path_manager.get_storing_path() item_binaries = {} for extension in ImageExtension: for binary_path in path.glob(f"*.{extension.value}"): @@ -45,7 +45,7 @@ def get_item_binaries(self, item_id: UUID) -> Dict[str, bytes]: return item_binaries def get_item_binary_names(self, item_id: UUID) -> List[str]: - path = self._storing_path_manager.get_storing_path(item_id) + path = self._storing_path_manager.get_storing_path() item_binaries = [] for extension in ImageExtension: for binary_path in path.glob(f"*.{extension.value}"): @@ -53,7 +53,7 @@ def get_item_binary_names(self, item_id: UUID) -> List[str]: return item_binaries def get_item_binary(self, item_id: UUID, camera_id: str) -> bytes: - filepath = self._storing_path_manager.get_storing_path(item_id) / f"{camera_id}.jpg" + filepath = self._storing_path_manager.get_file_path(item_id, "jpg", camera_id) # TODO: test with non existing item binary if not filepath.exists(): raise HTTPException(status_code=400, detail=f"The item {item_id} has no binary for {camera_id}") diff --git a/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/binary_storage/gcp_binary_storage.py b/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/binary_storage/gcp_binary_storage.py index fbd253ee..1349ac2f 100644 --- a/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/binary_storage/gcp_binary_storage.py +++ b/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/binary_storage/gcp_binary_storage.py @@ -27,7 +27,7 @@ def save_item_binaries(self, item: Item): self._logger.info(f"Saving item binaries for item {item.id}") for camera_id, image in item.binaries.items(): blob = self._bucket.blob( - (self._storing_path_manager.get_storing_path(item.id) / f"{camera_id}.jpg").as_posix() + self._storing_path_manager.get_file_path(item.id, "jpg", camera_id).as_posix() ) if blob is None: raise Exception("An image should be upload") @@ -38,14 +38,14 @@ def save_item_binaries(self, item: Item): def get_item_binary_names(self, item_id: UUID) -> List[str]: binaries = [] - for blob in self._bucket.list_blobs(self._storing_path_manager.get_storing_path(item_id)): + for blob in self._bucket.list_blobs(self._storing_path_manager.get_storing_path()): if item_id in blob.name: binaries.append(blob.name) return binaries def get_item_binaries(self, item_id: UUID) -> Dict[str, bytes]: binaries = {} - for blob in self._bucket.list_blobs(prefix=self._storing_path_manager.get_storing_path(item_id).as_posix()): + for blob in self._bucket.list_blobs(prefix=self._storing_path_manager.get_storing_path().as_posix()): if item_id in blob.name: binary = blob.download_as_bytes() camera_id = Path(blob.name).stem @@ -53,7 +53,7 @@ def get_item_binaries(self, item_id: UUID) -> Dict[str, bytes]: return binaries def get_item_binary(self, item_id: UUID, camera_id: str) -> bytes: - filename = (self._storing_path_manager.get_storing_path(item_id) / f"{camera_id}.jpg").as_posix() + filename = (self._storing_path_manager.get_file_path(item_id, "jpg", camera_id)).as_posix() blob = self._bucket.get_blob(filename) if blob is None: raise HTTPException(status_code=400, detail=f"The item {item_id} has no binary for {camera_id}") diff --git a/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/metadata_storage/filesystem_metadata_storage.py b/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/metadata_storage/filesystem_metadata_storage.py index 421db7ad..95311086 100644 --- a/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/metadata_storage/filesystem_metadata_storage.py +++ b/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/metadata_storage/filesystem_metadata_storage.py @@ -26,14 +26,15 @@ def __init__(self, station_config: StationConfig, storing_path_manager: StoringP def save_item_metadata(self, item: Item): self._logger.info(f"Saving item metadata for item {item.id}") item.state = ItemState.DONE - filepath = self._storing_path_manager.get_storing_path(item.id) / "metadata.json" + filepath = self._storing_path_manager.get_file_path(item.id, "json") + filepath.parent.mkdir(parents=True, exist_ok=True) with filepath.open("w") as f: f.write(item.model_dump_json(exclude_none=True)) self._logger.info(f"Item metadata for item {item.id} saved as {filepath.as_posix()}") def get_item_metadata(self, item_id: UUID) -> Item: - filepath = self._storing_path_manager.get_storing_path(item_id) / "metadata.json" + filepath = self._storing_path_manager.get_file_path(item_id, "json") # TODO: test with non existing item metadata if not filepath.exists(): raise HTTPException(status_code=400, detail=f"The item {item_id} has no metadata") diff --git a/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/metadata_storage/gcp_metadata_storage.py b/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/metadata_storage/gcp_metadata_storage.py index 395e901f..03571a0b 100644 --- a/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/metadata_storage/gcp_metadata_storage.py +++ b/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/metadata_storage/gcp_metadata_storage.py @@ -27,12 +27,12 @@ def save_item_metadata(self, item: Item): self._logger.info(f"Saving item metadata for item {item.id}") item.state = ItemState.DONE item_metadata = item.model_dump_json(exclude_none=True) - blob = self._bucket.blob((self._storing_path_manager.get_storing_path(item.id) / "metadata.json").as_posix()) + blob = self._bucket.blob(self._storing_path_manager.get_file_path(item.id, "json").as_posix()) blob.upload_from_string(item_metadata, content_type="application/json") self._logger.info(f"Item metadata for item {item.id} saved as {blob.name}") def get_item_metadata(self, item_id: UUID) -> Item: - filename = (self._storing_path_manager.get_storing_path(item_id) / "metadata.json").as_posix() + filename = self._storing_path_manager.get_file_path(item_id, "json").as_posix() blob = self._bucket.get_blob(filename) if blob is None: raise HTTPException(status_code=400, detail=f"The item {item_id} has no metadata") diff --git a/edge_orchestrator/tests/unit_tests/infrastructure/adapters/binary_storage/test_filesystem_binary_storage.py b/edge_orchestrator/tests/unit_tests/infrastructure/adapters/binary_storage/test_filesystem_binary_storage.py index 49845af7..ad6f7973 100644 --- a/edge_orchestrator/tests/unit_tests/infrastructure/adapters/binary_storage/test_filesystem_binary_storage.py +++ b/edge_orchestrator/tests/unit_tests/infrastructure/adapters/binary_storage/test_filesystem_binary_storage.py @@ -27,7 +27,7 @@ def test_save_item_binaries_should_do_nothing_without_binaries(self, tmp_path: P binary_storage.save_item_binaries(item) # Then - assert len(list((target_directory / station_config.station_name / str(item.id)).iterdir())) == 0 + assert len(list((target_directory / station_config.station_name).iterdir())) == 0 def test_save_item_binaries_should_write_images_on_filesystem(self, tmp_path: Path, station_config: StationConfig): # Given @@ -52,8 +52,8 @@ def test_save_item_binaries_should_write_images_on_filesystem(self, tmp_path: Pa # Then path_to_pictures = [ - target_directory / station_config.station_name / str(item.id) / "camera_#1.jpg", - target_directory / station_config.station_name / str(item.id) / "camera_#2.jpg", + target_directory / station_config.station_name / f"{str(item.id)}_camera_#1.jpg", + target_directory / station_config.station_name / f"{str(item.id)}_camera_#2.jpg", ] for path_to_picture in path_to_pictures: assert path_to_picture.is_file() @@ -81,7 +81,7 @@ def test_save_item_binaries_with_target_directory_should_write_images_on_filesys binary_storage.save_item_binaries(item) # Then - path_to_picture = target_directory / station_config.station_name / str(item.id) / "camera_#1.jpg" + path_to_picture = target_directory / station_config.station_name / f"{str(item.id)}_camera_#1.jpg" assert path_to_picture.is_file() actual_picture = path_to_picture.open("rb").read() assert actual_picture == expected_picture @@ -96,15 +96,15 @@ def test_get_item_binaries_should_return_item_binaries(self, tmp_path: Path, sta expected_picture = bytes([0, 1, 2, 3, 4]) item_id = UUID("00000000-0000-0000-0000-000000000001") - item_storage_folder = target_directory / station_config.station_name / str(item_id) + item_storage_folder = target_directory / station_config.station_name item_storage_folder.mkdir(parents=True) with ( - (item_storage_folder / "camera_#1.jpg").open("wb") as f1, - (item_storage_folder / "camera_#2.jpg").open("wb") as f2, + (item_storage_folder /f"{str(item_id)}_camera_#1.jpg").open("wb") as f1, + (item_storage_folder /f"{str(item_id)}_camera_#2.jpg").open("wb") as f2, ): f1.write(expected_picture) f2.write(expected_picture) - expected_camera_ids = ["camera_#1", "camera_#2"] + expected_camera_ids = [f"{str(item_id)}_camera_#1", f"{str(item_id)}_camera_#2"] # When actual_binaries = binary_storage.get_item_binaries(item_id) @@ -125,15 +125,15 @@ def test_get_item_binary_names_should_return_binary_names(self, tmp_path: Path, expected_picture = bytes([0, 1, 2, 3, 4]) item_id = UUID("00000000-0000-0000-0000-000000000001") - item_storage_folder = target_directory / station_config.station_name / str(item_id) + item_storage_folder = target_directory / station_config.station_name item_storage_folder.mkdir(parents=True) with ( - (item_storage_folder / "camera_#1.jpg").open("wb") as f1, - (item_storage_folder / "camera_#2.jpg").open("wb") as f2, + (item_storage_folder /f"{str(item_id)}_camera_#1.jpg").open("wb") as f1, + (item_storage_folder /f"{str(item_id)}_camera_#2.jpg").open("wb") as f2, ): f1.write(expected_picture) f2.write(expected_picture) - expected_binary_names = ["camera_#1.jpg", "camera_#2.jpg"] + expected_binary_names = [f"{str(item_id)}_camera_#1.jpg", f"{str(item_id)}_camera_#2.jpg"] # When actual_binary_names = binary_storage.get_item_binary_names(item_id) @@ -153,9 +153,9 @@ def test_get_item_binary_should_return_requested_binary(self, tmp_path: Path, st expected_picture = bytes([0, 1, 2, 3, 4]) item_id = UUID("00000000-0000-0000-0000-000000000001") - item_storage_folder = target_directory / station_config.station_name / str(item_id) + item_storage_folder = target_directory / station_config.station_name item_storage_folder.mkdir(parents=True) - with (item_storage_folder / "camera_#1.jpg").open("wb") as f: + with (item_storage_folder /f"{str(item_id)}_camera_#1.jpg").open("wb") as f: f.write(expected_picture) # When diff --git a/edge_orchestrator/tests/unit_tests/infrastructure/adapters/metadata_storage/test_filesystem_metadata_storage.py b/edge_orchestrator/tests/unit_tests/infrastructure/adapters/metadata_storage/test_filesystem_metadata_storage.py index 7d65664e..dc48b351 100644 --- a/edge_orchestrator/tests/unit_tests/infrastructure/adapters/metadata_storage/test_filesystem_metadata_storage.py +++ b/edge_orchestrator/tests/unit_tests/infrastructure/adapters/metadata_storage/test_filesystem_metadata_storage.py @@ -48,7 +48,7 @@ def test_save_item_metadata_with_empty_item_should_write_metadata_on_filesystem( metadata_storage.save_item_metadata(item) # Then - path_to_metadata = target_directory / station_config.station_name / str(item.id) / "metadata.json" + path_to_metadata = target_directory / station_config.station_name / f"{str(item.id)}.json" assert path_to_metadata.is_file() actual_metadata = json.load(path_to_metadata.open("r")) assert actual_metadata @@ -130,7 +130,7 @@ def test_save_item_metadata_should_write_metadata_on_filesystem( metadata_storage.save_item_metadata(item) # Then - path_to_metadata = target_directory / station_config.station_name / str(item.id) / "metadata.json" + path_to_metadata = target_directory / station_config.station_name / f"{str(item.id)}.json" assert path_to_metadata.is_file() actual_metadata = json.load(path_to_metadata.open("r")) assert actual_metadata == expected_metadata @@ -162,7 +162,7 @@ def test_save_item_metadata_with_target_directory_should_write_metadata_on_files metadata_storage.save_item_metadata(item) # Then - path_to_metadata = target_directory / station_config.station_name / str(item.id) / "metadata.json" + path_to_metadata = target_directory / station_config.station_name / f"{str(item.id)}.json" assert path_to_metadata.is_file() actual_metadata = json.load(path_to_metadata.open("r")) assert actual_metadata @@ -208,9 +208,9 @@ def test_get_item_metadata_should_return_requested_item_metadata( "state": "DONE", } - storing_path = target_directory / station_config.station_name / str(item_id) + storing_path = target_directory / station_config.station_name storing_path.mkdir(parents=True) - with (storing_path / "metadata.json").open("w") as f: + with (storing_path / f"{str(item_id)}.json").open("w") as f: json.dump(expected_metadata, f) # When From 590b2c545a743d24f1b310fa1d6b27e53af9084b Mon Sep 17 00:00:00 2001 From: "gireg.roussel" Date: Tue, 4 Mar 2025 11:41:51 +0100 Subject: [PATCH 4/8] lint --- .../application/use_cases/data_gathering.py | 9 --------- .../domain/ports/storing_path_manager.py | 10 +++++++--- .../adapters/binary_storage/gcp_binary_storage.py | 4 +--- .../interface/api/dependency_injection.py | 5 +++-- .../interface/api/routers/v1/router.py | 4 ++-- 5 files changed, 13 insertions(+), 19 deletions(-) diff --git a/edge_orchestrator/src/edge_orchestrator/application/use_cases/data_gathering.py b/edge_orchestrator/src/edge_orchestrator/application/use_cases/data_gathering.py index d4e77fee..5ad313e5 100644 --- a/edge_orchestrator/src/edge_orchestrator/application/use_cases/data_gathering.py +++ b/edge_orchestrator/src/edge_orchestrator/application/use_cases/data_gathering.py @@ -6,18 +6,9 @@ IBinaryStorageManager, ) from edge_orchestrator.domain.ports.camera.i_camera_manager import ICameraManager -from edge_orchestrator.domain.ports.camera_rule.i_camera_rule_manager import ( - ICameraRuleManager, -) -from edge_orchestrator.domain.ports.item_rule.i_item_rule_manager import ( - IItemRuleManager, -) from edge_orchestrator.domain.ports.metadata_storage.i_metadata_storage_manager import ( IMetadataStorageManager, ) -from edge_orchestrator.domain.ports.model_forwarder.i_model_forwarder_manager import ( - IModelForwarderManager, -) from edge_orchestrator.utils.singleton import SingletonMeta diff --git a/edge_orchestrator/src/edge_orchestrator/domain/ports/storing_path_manager.py b/edge_orchestrator/src/edge_orchestrator/domain/ports/storing_path_manager.py index 7ed0585f..4bb02a94 100644 --- a/edge_orchestrator/src/edge_orchestrator/domain/ports/storing_path_manager.py +++ b/edge_orchestrator/src/edge_orchestrator/domain/ports/storing_path_manager.py @@ -17,10 +17,14 @@ def get_storing_path(self) -> Path: class_directory = self._storage_config.class_directory prefix_path = self.get_storing_prefix_path() return prefix_path / class_directory if class_directory else prefix_path - - def get_file_path(self, item_id: UUID, extension: str, camera_id: Optional[str]=None) -> Path: + + def get_file_path(self, item_id: UUID, extension: str, camera_id: Optional[str] = None) -> Path: storing_path = self.get_storing_path() - return storing_path / f"{item_id}_{camera_id}.{extension}" if camera_id else storing_path / f"{item_id}.{extension}" + return ( + storing_path / f"{item_id}_{camera_id}.{extension}" + if camera_id + else storing_path / f"{item_id}.{extension}" + ) def set(self, storage_config: StorageConfig, station_name: str): self._storage_config = storage_config diff --git a/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/binary_storage/gcp_binary_storage.py b/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/binary_storage/gcp_binary_storage.py index 1349ac2f..53f7d7ab 100644 --- a/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/binary_storage/gcp_binary_storage.py +++ b/edge_orchestrator/src/edge_orchestrator/infrastructure/adapters/binary_storage/gcp_binary_storage.py @@ -26,9 +26,7 @@ def __init__(self, station_config: StationConfig, storing_path_manager: StoringP def save_item_binaries(self, item: Item): self._logger.info(f"Saving item binaries for item {item.id}") for camera_id, image in item.binaries.items(): - blob = self._bucket.blob( - self._storing_path_manager.get_file_path(item.id, "jpg", camera_id).as_posix() - ) + blob = self._bucket.blob(self._storing_path_manager.get_file_path(item.id, "jpg", camera_id).as_posix()) if blob is None: raise Exception("An image should be upload") blob.upload_from_string(image.image_bytes, content_type="image/jpg") diff --git a/edge_orchestrator/src/edge_orchestrator/interface/api/dependency_injection.py b/edge_orchestrator/src/edge_orchestrator/interface/api/dependency_injection.py index 5cf9d671..359e07ef 100644 --- a/edge_orchestrator/src/edge_orchestrator/interface/api/dependency_injection.py +++ b/edge_orchestrator/src/edge_orchestrator/interface/api/dependency_injection.py @@ -1,7 +1,7 @@ -from edge_orchestrator.application.use_cases.data_gathering import DataGathering from fastapi import Depends, HTTPException from edge_orchestrator.application.config.config_manager import ConfigManager +from edge_orchestrator.application.use_cases.data_gathering import DataGathering from edge_orchestrator.application.use_cases.supervisor import Supervisor from edge_orchestrator.domain.models.station_config import StationConfig from edge_orchestrator.domain.ports.binary_storage.i_binary_storage_factory import ( @@ -159,6 +159,7 @@ def get_supervisor( manager.config_updated = False return supervisor + def get_data_gathering( metadata_storage_manager: IMetadataStorageManager = Depends(get_metadata_storage_manager), binary_storage_manager: IBinaryStorageManager = Depends(get_binary_storage_manager), @@ -167,4 +168,4 @@ def get_data_gathering( ) -> DataGathering: data_gathering = DataGathering(metadata_storage_manager, binary_storage_manager, camera_manager) data_gathering._camera_manager.create_cameras(station_config) - return data_gathering \ No newline at end of file + return data_gathering diff --git a/edge_orchestrator/src/edge_orchestrator/interface/api/routers/v1/router.py b/edge_orchestrator/src/edge_orchestrator/interface/api/routers/v1/router.py index de79d5a5..dc0a4508 100644 --- a/edge_orchestrator/src/edge_orchestrator/interface/api/routers/v1/router.py +++ b/edge_orchestrator/src/edge_orchestrator/interface/api/routers/v1/router.py @@ -1,7 +1,6 @@ from typing import Dict, List, Optional from uuid import UUID -from edge_orchestrator.application.use_cases.data_gathering import DataGathering from fastapi import ( APIRouter, BackgroundTasks, @@ -12,6 +11,7 @@ ) from edge_orchestrator.application.config.config_manager import ConfigManager +from edge_orchestrator.application.use_cases.data_gathering import DataGathering from edge_orchestrator.application.use_cases.supervisor import Supervisor from edge_orchestrator.domain.models.camera.camera_config import CameraConfig from edge_orchestrator.domain.models.image import Image @@ -27,9 +27,9 @@ from edge_orchestrator.interface.api.dependency_injection import ( get_binary_storage_manager, get_config, + get_data_gathering, get_metadata_storage_manager, get_supervisor, - get_data_gathering ) From 5ff0755d2f7c35c5f8629f7209694a2835c13f2b Mon Sep 17 00:00:00 2001 From: "gireg.roussel" Date: Tue, 4 Mar 2025 15:41:20 +0100 Subject: [PATCH 5/8] fix tests --- .../binary_storage/test_filesystem_binary_storage.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/edge_orchestrator/tests/unit_tests/infrastructure/adapters/binary_storage/test_filesystem_binary_storage.py b/edge_orchestrator/tests/unit_tests/infrastructure/adapters/binary_storage/test_filesystem_binary_storage.py index ad6f7973..a5278a3f 100644 --- a/edge_orchestrator/tests/unit_tests/infrastructure/adapters/binary_storage/test_filesystem_binary_storage.py +++ b/edge_orchestrator/tests/unit_tests/infrastructure/adapters/binary_storage/test_filesystem_binary_storage.py @@ -99,8 +99,8 @@ def test_get_item_binaries_should_return_item_binaries(self, tmp_path: Path, sta item_storage_folder = target_directory / station_config.station_name item_storage_folder.mkdir(parents=True) with ( - (item_storage_folder /f"{str(item_id)}_camera_#1.jpg").open("wb") as f1, - (item_storage_folder /f"{str(item_id)}_camera_#2.jpg").open("wb") as f2, + (item_storage_folder / f"{str(item_id)}_camera_#1.jpg").open("wb") as f1, + (item_storage_folder / f"{str(item_id)}_camera_#2.jpg").open("wb") as f2, ): f1.write(expected_picture) f2.write(expected_picture) @@ -128,8 +128,8 @@ def test_get_item_binary_names_should_return_binary_names(self, tmp_path: Path, item_storage_folder = target_directory / station_config.station_name item_storage_folder.mkdir(parents=True) with ( - (item_storage_folder /f"{str(item_id)}_camera_#1.jpg").open("wb") as f1, - (item_storage_folder /f"{str(item_id)}_camera_#2.jpg").open("wb") as f2, + (item_storage_folder / f"{str(item_id)}_camera_#1.jpg").open("wb") as f1, + (item_storage_folder / f"{str(item_id)}_camera_#2.jpg").open("wb") as f2, ): f1.write(expected_picture) f2.write(expected_picture) @@ -155,7 +155,7 @@ def test_get_item_binary_should_return_requested_binary(self, tmp_path: Path, st item_id = UUID("00000000-0000-0000-0000-000000000001") item_storage_folder = target_directory / station_config.station_name item_storage_folder.mkdir(parents=True) - with (item_storage_folder /f"{str(item_id)}_camera_#1.jpg").open("wb") as f: + with (item_storage_folder / f"{str(item_id)}_camera_#1.jpg").open("wb") as f: f.write(expected_picture) # When From 49e4d358e4578de66e68bf4f46d15bb470c257b4 Mon Sep 17 00:00:00 2001 From: "gireg.roussel" Date: Tue, 4 Mar 2025 15:57:02 +0100 Subject: [PATCH 6/8] remove recreate_me on config --- .../config/data_gathering_with_2_fake_cam.json | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/edge_orchestrator/config/data_gathering_with_2_fake_cam.json b/edge_orchestrator/config/data_gathering_with_2_fake_cam.json index bb231240..642e6f93 100644 --- a/edge_orchestrator/config/data_gathering_with_2_fake_cam.json +++ b/edge_orchestrator/config/data_gathering_with_2_fake_cam.json @@ -5,23 +5,19 @@ "camera_id": "camera_1", "camera_type": "fake", "source_directory": "fake_images/marker_images", - "position": "front", - "recreate_me": true + "position": "front" }, "camera_2": { "camera_id": "camera_2", "camera_type": "fake", "source_directory": "fake_images/marker_images", - "position": "front", - "recreate_me": true + "position": "front" } }, "binary_storage_config": { - "storage_type": "gcp", - "recreate_me": false + "storage_type": "gcp" }, "metadata_storage_config": { - "storage_type": "gcp", - "recreate_me": false + "storage_type": "gcp" } } \ No newline at end of file From a503c25c24b61362de4d9f4d48541560cf898566 Mon Sep 17 00:00:00 2001 From: "gireg.roussel" Date: Tue, 4 Mar 2025 17:38:38 +0100 Subject: [PATCH 7/8] review --- .../application/use_cases/data_gathering.py | 16 ++++++++++++++++ .../domain/ports/storing_path_manager.py | 1 - .../interface/api/dependency_injection.py | 11 +++++++++-- .../interface/api/routers/v1/router.py | 5 ++--- 4 files changed, 27 insertions(+), 6 deletions(-) diff --git a/edge_orchestrator/src/edge_orchestrator/application/use_cases/data_gathering.py b/edge_orchestrator/src/edge_orchestrator/application/use_cases/data_gathering.py index 5ad313e5..aa213e73 100644 --- a/edge_orchestrator/src/edge_orchestrator/application/use_cases/data_gathering.py +++ b/edge_orchestrator/src/edge_orchestrator/application/use_cases/data_gathering.py @@ -2,10 +2,16 @@ from edge_orchestrator.domain.models.item import Item from edge_orchestrator.domain.models.station_config import StationConfig +from edge_orchestrator.domain.ports.binary_storage.i_binary_storage_factory import ( + IBinaryStorageFactory, +) from edge_orchestrator.domain.ports.binary_storage.i_binary_storage_manager import ( IBinaryStorageManager, ) from edge_orchestrator.domain.ports.camera.i_camera_manager import ICameraManager +from edge_orchestrator.domain.ports.metadata_storage.i_metadata_storage_factory import ( + IMetadataStorageFactory, +) from edge_orchestrator.domain.ports.metadata_storage.i_metadata_storage_manager import ( IMetadataStorageManager, ) @@ -25,8 +31,18 @@ def __init__( self._camera_manager = camera_manager async def acquire(self, item: Item, station_config: StationConfig): + self._camera_manager.create_cameras(station_config) self._camera_manager.take_pictures(item) self._binary_storage_manager.get_binary_storage(station_config).save_item_binaries(item) self._metadata_storage_manager.get_metadata_storage(station_config).save_item_metadata(item) + + def reset_managers( + self, binary_storage_factory: IBinaryStorageFactory, metadata_storage_factory: IMetadataStorageFactory + ): + self._logger.info("Resetting all managers after configuration update...") + self._metadata_storage_manager.reset(metadata_storage_factory) + self._binary_storage_manager.reset(binary_storage_factory) + self._camera_manager.reset() + self._logger.info("Managers reset done!") diff --git a/edge_orchestrator/src/edge_orchestrator/domain/ports/storing_path_manager.py b/edge_orchestrator/src/edge_orchestrator/domain/ports/storing_path_manager.py index 4bb02a94..d6f3d149 100644 --- a/edge_orchestrator/src/edge_orchestrator/domain/ports/storing_path_manager.py +++ b/edge_orchestrator/src/edge_orchestrator/domain/ports/storing_path_manager.py @@ -28,4 +28,3 @@ def get_file_path(self, item_id: UUID, extension: str, camera_id: Optional[str] def set(self, storage_config: StorageConfig, station_name: str): self._storage_config = storage_config - self.station_name = station_name diff --git a/edge_orchestrator/src/edge_orchestrator/interface/api/dependency_injection.py b/edge_orchestrator/src/edge_orchestrator/interface/api/dependency_injection.py index 359e07ef..f0a72776 100644 --- a/edge_orchestrator/src/edge_orchestrator/interface/api/dependency_injection.py +++ b/edge_orchestrator/src/edge_orchestrator/interface/api/dependency_injection.py @@ -164,8 +164,15 @@ def get_data_gathering( metadata_storage_manager: IMetadataStorageManager = Depends(get_metadata_storage_manager), binary_storage_manager: IBinaryStorageManager = Depends(get_binary_storage_manager), camera_manager: ICameraManager = Depends(get_camera_manager), - station_config: StationConfig = Depends(get_config), + binary_storage_factory: IBinaryStorageFactory = Depends(get_binary_storage_factory), + metadata_storage_factory: IMetadataStorageFactory = Depends(get_metadata_storage_factory), ) -> DataGathering: data_gathering = DataGathering(metadata_storage_manager, binary_storage_manager, camera_manager) - data_gathering._camera_manager.create_cameras(station_config) + manager = ConfigManager() + if manager.config_updated: + data_gathering.reset_managers( + binary_storage_factory, + metadata_storage_factory, + ) + manager.config_updated = False return data_gathering diff --git a/edge_orchestrator/src/edge_orchestrator/interface/api/routers/v1/router.py b/edge_orchestrator/src/edge_orchestrator/interface/api/routers/v1/router.py index dc0a4508..7738969b 100644 --- a/edge_orchestrator/src/edge_orchestrator/interface/api/routers/v1/router.py +++ b/edge_orchestrator/src/edge_orchestrator/interface/api/routers/v1/router.py @@ -120,7 +120,7 @@ async def trigger_job( ): input_binaries = {} for binary in binaries: - input_binaries[binary.filename] = Image(image_bytes=binary.read()) + input_binaries[binary.filename] = Image(image_bytes=await binary.read()) item = Item( cameras_metadata=cameras_metadata, binaries=input_binaries, @@ -137,14 +137,13 @@ async def data_gathering_job( station_config: StationConfig = Depends(get_config), background_tasks: BackgroundTasks = None, ): - print("class_name", class_name) # Set the class name on the config station_config.binary_storage_config.class_directory = class_name station_config.metadata_storage_config.class_directory = class_name input_binaries = {} for binary in binaries: - input_binaries[binary.filename] = Image(image_bytes=binary.read()) + input_binaries[binary.filename] = Image(image_bytes=await binary.read()) item = Item( cameras_metadata=cameras_metadata, binaries=input_binaries, From e2e5161d4dba2fc26a066ecd2a64c8e910026d73 Mon Sep 17 00:00:00 2001 From: "gireg.roussel" Date: Wed, 5 Mar 2025 19:11:04 +0100 Subject: [PATCH 8/8] fix functional tests --- .../tests/functional_tests/steps/supervisor_trigger_route.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/edge_orchestrator/tests/functional_tests/steps/supervisor_trigger_route.py b/edge_orchestrator/tests/functional_tests/steps/supervisor_trigger_route.py index 3080ea56..ab5fabe1 100644 --- a/edge_orchestrator/tests/functional_tests/steps/supervisor_trigger_route.py +++ b/edge_orchestrator/tests/functional_tests/steps/supervisor_trigger_route.py @@ -67,10 +67,10 @@ def check_item_binaries_are_stored(context: Context): response_1_content = response_1.json() for row in context.table: - image_name = f'{row["binary_name"]}.{row["binary_extension"]}' + image_name = f'{context.item_id}_{row["binary_name"]}.{row["binary_extension"]}' assert image_name in response_1_content - path_to_image = context.test_directory / f"data_storage/{context.item_id}" / image_name + path_to_image = context.test_directory / "data_storage" / image_name assert path_to_image.exists() with path_to_image.open("rb") as f: image_binary = f.read()