From 8f706968160f213f9801ed215c8f375e70860aa2 Mon Sep 17 00:00:00 2001 From: Baptiste O'Jeanson Date: Fri, 21 Jul 2023 14:21:37 +0200 Subject: [PATCH 1/2] refactor: centralize edge_orchestrator conf as JSON only --- edge_orchestrator/config/.active_config.json | 65 +++++++ edge_orchestrator/config/inventory.json | 30 ++- ...ker_classification_with_1_fake_camera.json | 77 +++++--- ...marker_classification_with_1_picamera.json | 18 +- ...arker_classification_with_1_usbcamera.json | 18 +- ...er_classification_with_2_fake_cameras.json | 51 ++++- .../pin_detection_with_1_usbcamera.json | 14 +- .../pin_detection_with_2_usbcamera.json | 12 +- .../edge_orchestrator/api_config.py | 178 +++++++++++------- .../application/api_routes.py | 73 ++++--- .../edge_orchestrator/application/config.py | 79 ++++++++ .../application/dto/binary_storage_config.py | 16 ++ .../application/dto/camera_config.py | 55 ++++++ .../application/dto/edge_station_config.py | 14 ++ .../dto/metadata_storage_config.py | 17 ++ .../application/dto/model_config.py | 81 ++++++++ .../application/dto/model_forward_config.py | 14 ++ .../application/dto/station_config.py | 58 ++++++ .../application/dto/telemetry_sink_config.py | 15 ++ .../no_active_configuration_exception.py | 19 ++ .../edge_orchestrator/application/server.py | 14 +- .../edge_orchestrator/application/settings.py | 2 - .../application/trigger_routes.py | 61 +++--- .../edge_orchestrator/constants.py | 5 + .../domain/models/edge_station.py | 23 ++- .../domain/models/model_infos.py | 36 ++-- .../domain/ports/inventory.py | 8 - .../domain/ports/station_config.py | 30 --- .../domain/use_cases/supervisor.py | 34 ++-- .../domain/use_cases/uploader.py | 12 +- .../edge_orchestrator/environment/__init__.py | 0 .../edge_orchestrator/environment/config.py | 42 ----- .../edge_orchestrator/environment/default.py | 37 ---- .../edge_orchestrator/environment/docker.py | 47 ----- .../edge_with_azure_container_storage.py | 41 ---- .../edge_with_filesystem_metadata_storage.py | 45 ----- .../edge_with_mongo_db_metadata_storage.py | 44 ----- .../edge_orchestrator/environment/test.py | 50 ----- .../environment/upload_with_gcp_bucket.py | 41 ---- .../azure_container_binary_storage.py | 21 ++- .../binary_storage/binary_storage_factory.py | 43 +++++ .../filesystem_binary_storage.py | 10 +- ...torage.py => gcp_bucket_binary_storage.py} | 16 +- ...storage.py => in_memory_binary_storage.py} | 8 +- .../infrastructure/camera/fake_camera.py | 3 +- .../infrastructure/filesystem_helpers.py | 7 + .../infrastructure/inventory/__init__.py | 0 .../inventory/json_inventory.py | 30 --- .../azure_container_metadata_storage.py | 21 ++- .../filesystem_metadata_storage.py | 10 +- ...rage.py => gcp_bucket_metadata_storage.py} | 16 +- ...orage.py => in_memory_metadata_storage.py} | 2 +- .../metadata_storage_factory.py | 48 +++++ ...torage.py => mongo_db_metadata_storage.py} | 8 +- .../model_forward/model_forward_factory.py | 30 +++ .../model_forward/tf_serving_wrapper.py | 12 +- .../station_config/json_station_config.py | 43 +++-- ...try_sink.py => postgres_telemetry_sink.py} | 11 +- .../telemetry_sink/telemetry_sink_factory.py | 36 ++++ edge_orchestrator/pyproject.toml | 1 + edge_orchestrator/tests/conftest.py | 4 +- .../fixtures/supervisor_and_collaborators.py | 89 +++++++++ .../test_json_station_config.py | 8 +- .../test_postgresql_telemetry_sink.py | 2 +- .../domain/models/test_edge_station.py | 6 +- .../unit_tests/domain/test_supervisor.py | 52 +++-- .../binary_storage/test_gcp_binary_storage.py | 12 +- .../test_memory_binary_storage.py | 10 +- .../test_memory_item_storage.py | 10 +- 69 files changed, 1286 insertions(+), 759 deletions(-) create mode 100644 edge_orchestrator/config/.active_config.json create mode 100644 edge_orchestrator/edge_orchestrator/application/config.py create mode 100644 edge_orchestrator/edge_orchestrator/application/dto/binary_storage_config.py create mode 100644 edge_orchestrator/edge_orchestrator/application/dto/camera_config.py create mode 100644 edge_orchestrator/edge_orchestrator/application/dto/edge_station_config.py create mode 100644 edge_orchestrator/edge_orchestrator/application/dto/metadata_storage_config.py create mode 100644 edge_orchestrator/edge_orchestrator/application/dto/model_config.py create mode 100644 edge_orchestrator/edge_orchestrator/application/dto/model_forward_config.py create mode 100644 edge_orchestrator/edge_orchestrator/application/dto/station_config.py create mode 100644 edge_orchestrator/edge_orchestrator/application/dto/telemetry_sink_config.py create mode 100644 edge_orchestrator/edge_orchestrator/application/no_active_configuration_exception.py delete mode 100644 edge_orchestrator/edge_orchestrator/application/settings.py create mode 100644 edge_orchestrator/edge_orchestrator/constants.py delete mode 100644 edge_orchestrator/edge_orchestrator/domain/ports/inventory.py delete mode 100644 edge_orchestrator/edge_orchestrator/environment/__init__.py delete mode 100644 edge_orchestrator/edge_orchestrator/environment/config.py delete mode 100644 edge_orchestrator/edge_orchestrator/environment/default.py delete mode 100644 edge_orchestrator/edge_orchestrator/environment/docker.py delete mode 100644 edge_orchestrator/edge_orchestrator/environment/edge_with_azure_container_storage.py delete mode 100644 edge_orchestrator/edge_orchestrator/environment/edge_with_filesystem_metadata_storage.py delete mode 100644 edge_orchestrator/edge_orchestrator/environment/edge_with_mongo_db_metadata_storage.py delete mode 100644 edge_orchestrator/edge_orchestrator/environment/test.py delete mode 100644 edge_orchestrator/edge_orchestrator/environment/upload_with_gcp_bucket.py create mode 100644 edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/binary_storage_factory.py rename edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/{gcp_binary_storage.py => gcp_bucket_binary_storage.py} (72%) rename edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/{memory_binary_storage.py => in_memory_binary_storage.py} (77%) create mode 100644 edge_orchestrator/edge_orchestrator/infrastructure/filesystem_helpers.py delete mode 100644 edge_orchestrator/edge_orchestrator/infrastructure/inventory/__init__.py delete mode 100644 edge_orchestrator/edge_orchestrator/infrastructure/inventory/json_inventory.py rename edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/{gcp_metadata_storage.py => gcp_bucket_metadata_storage.py} (73%) rename edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/{memory_metadata_storage.py => in_memory_metadata_storage.py} (94%) create mode 100644 edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/metadata_storage_factory.py rename edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/{mongodb_metadata_storage.py => mongo_db_metadata_storage.py} (82%) create mode 100644 edge_orchestrator/edge_orchestrator/infrastructure/model_forward/model_forward_factory.py rename edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/{postgresql_telemetry_sink.py => postgres_telemetry_sink.py} (92%) create mode 100644 edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/telemetry_sink_factory.py create mode 100644 edge_orchestrator/tests/fixtures/supervisor_and_collaborators.py diff --git a/edge_orchestrator/config/.active_config.json b/edge_orchestrator/config/.active_config.json new file mode 100644 index 00000000..d91b003a --- /dev/null +++ b/edge_orchestrator/config/.active_config.json @@ -0,0 +1,65 @@ +{ + "infra": { + "binary_storage": { + "type": "filesystem", + "params": { + "src_directory": "/tmp/vio/edge_orchestrator/data/storage" + } + }, + "metadata_storage": { + "type": "filesystem" + }, + "mode1_forward": { + "type": "fake" + }, + "telemetry_sink": { + "type": "fake" + }, + "cameras": [ + { + "name": "camera_#1", + "type": "fake", + "source": "marker_images", + "position": "back", + "exposition": 100 + } + ] + }, + "domain": { + "camera_rules": [ + { + "name": "camera_#1", + "models_graph": [ + { + "name": "marker_quality_control", + "category": "classification", + "version": "1", + "class_names": [ + "OK", + "KO" + ], + "image_resolution": [ + 224, + 224 + ], + "depends_on": [] + } + ], + "rule": { + "name": "expected_label_rule", + "parameters": { + "expected_label": [ + "OK" + ] + } + } + } + ], + "item_rule": { + "name": "min_threshold_KO_rule", + "params": { + "threshold": 1 + } + } + } +} diff --git a/edge_orchestrator/config/inventory.json b/edge_orchestrator/config/inventory.json index d4b497f5..67891442 100644 --- a/edge_orchestrator/config/inventory.json +++ b/edge_orchestrator/config/inventory.json @@ -1,13 +1,35 @@ { + "binary_storages": [ + "azure_container", + "filesystem", + "gcp_bucket", + "in_memory" + ], "cameras": [ "fake", "pi_camera", "usb_camera" ], + "metadata_storages": [ + "azure_container", + "filesystem", + "gcp_bucket", + "in_memory", + "mongo_db" + ], + "model_forwards": [ + "fake", + "tf_serving" + ], + "telemetry_sinks": [ + "azure_iot_hub", + "fake", + "postgresql" + ], "models": { "marker_quality_control": { "category": "classification", - "version": 1, + "version": "1", "class_names": [ "OK", "KO" @@ -19,7 +41,7 @@ }, "pin_detection": { "category": "classification", - "version": 1, + "version": "1", "class_names": [ "OK", "NOK" @@ -31,7 +53,7 @@ }, "mobilenet_ssd_v2_coco": { "category": "object_detection", - "version": 1, + "version": "1", "class_names_path": "coco_labels_originals.txt", "output": { "boxes_coordinates": "detection_boxes", @@ -46,7 +68,7 @@ }, "mobilenet_ssd_v2_face": { "category": "object_detection", - "version": 1, + "version": "1", "class_names_path": "face_labels.txt", "output": { "boxes_coordinates": "detection_boxes", diff --git a/edge_orchestrator/config/station_configs/marker_classification_with_1_fake_camera.json b/edge_orchestrator/config/station_configs/marker_classification_with_1_fake_camera.json index 9b55e155..fa8d2b44 100644 --- a/edge_orchestrator/config/station_configs/marker_classification_with_1_fake_camera.json +++ b/edge_orchestrator/config/station_configs/marker_classification_with_1_fake_camera.json @@ -1,28 +1,63 @@ { - "cameras": { - "camera_id4": { - "type": "fake", - "source": "marker_images", - "position": "back", - "exposition": 100, - "models_graph": { - "model_id4": { - "name": "marker_quality_control", - "depends_on": [] - } - }, - "camera_rule": { - "name": "expected_label_rule", - "parameters": { - "expected_label": ["OK"] - } + "infra": { + "binary_storage": { + "type": "filesystem", + "params": { + "src_directory": "/tmp/vio/edge_orchestrator/data/storage" + } + }, + "metadata_storage": { + "type": "filesystem" + }, + "model_forward": { + "type": "fake" + }, + "telemetry_sink": { + "type": "fake" + }, + "cameras": { + "camera_#1": { + "type": "fake", + "source": "marker_images", + "position": "back", + "exposition": 100 } } }, - "item_rule": { - "name": "min_threshold_KO_rule", - "parameters": { - "threshold": 1 + "domain": { + "cameras": { + "camera_#1": { + "models_graph": { + "model_#4": { + "name": "marker_quality_control", + "category": "classification", + "version": "1", + "class_names": [ + "OK", + "KO" + ], + "image_resolution": [ + 224, + 224 + ], + "depends_on": [] + } + }, + "camera_rule": { + "name": "expected_label_rule", + "parameters": { + "expected_label": [ + "OK" + ] + } + } + } + }, + "item_rule": { + "name": "min_threshold_KO_rule", + "parameters": { + "threshold": 1 + } } } } diff --git a/edge_orchestrator/config/station_configs/marker_classification_with_1_picamera.json b/edge_orchestrator/config/station_configs/marker_classification_with_1_picamera.json index f3e0fd2e..a00878cf 100644 --- a/edge_orchestrator/config/station_configs/marker_classification_with_1_picamera.json +++ b/edge_orchestrator/config/station_configs/marker_classification_with_1_picamera.json @@ -1,19 +1,31 @@ { "cameras": { - "camera_id3": { + "camera_#3": { "type": "pi_camera", "position": "front", "exposition": 100, "models_graph": { - "model_id4": { + "model_#4": { "name": "marker_quality_control", + "category": "classification", + "version": "1", + "class_names": [ + "OK", + "KO" + ], + "image_resolution": [ + 224, + 224 + ], "depends_on": [] } }, "camera_rule": { "name": "expected_label_rule", "parameters": { - "expected_label": ["OK"] + "expected_label": [ + "OK" + ] } } } diff --git a/edge_orchestrator/config/station_configs/marker_classification_with_1_usbcamera.json b/edge_orchestrator/config/station_configs/marker_classification_with_1_usbcamera.json index 61b2baf8..6204f221 100644 --- a/edge_orchestrator/config/station_configs/marker_classification_with_1_usbcamera.json +++ b/edge_orchestrator/config/station_configs/marker_classification_with_1_usbcamera.json @@ -1,20 +1,32 @@ { "cameras": { - "camera_id3": { + "camera_#3": { "type": "usb_camera", "position": "front", "exposition": 100, "source": "/dev/video0", "models_graph": { - "model_id4": { + "model_#4": { "name": "marker_quality_control", + "category": "classification", + "version": "1", + "class_names": [ + "OK", + "KO" + ], + "image_resolution": [ + 224, + 224 + ], "depends_on": [] } }, "camera_rule": { "name": "expected_label_rule", "parameters": { - "expected_label": ["OK"] + "expected_label": [ + "OK" + ] } } } diff --git a/edge_orchestrator/config/station_configs/marker_classification_with_2_fake_cameras.json b/edge_orchestrator/config/station_configs/marker_classification_with_2_fake_cameras.json index ca6bd87c..2acaf7ca 100644 --- a/edge_orchestrator/config/station_configs/marker_classification_with_2_fake_cameras.json +++ b/edge_orchestrator/config/station_configs/marker_classification_with_2_fake_cameras.json @@ -1,38 +1,77 @@ { + "binary_storage": { + "type": "filesystem", + "params": { + "src_directory": "/tmp/vio/edge_orchestrator/data/storage" + } + }, + "metadata_storage": { + "type": "filesystem" + }, + "model_forward": { + "type": "fake" + }, + "telemetry_sink": { + "type": "fake" + }, "cameras": { - "camera_id3": { + "camera_#1": { "type": "fake", "source": "marker_images", "position": "back", "exposition": 100, "models_graph": { - "model_id4": { + "model_#4": { "name": "marker_quality_control", + "category": "classification", + "version": "1", + "class_names": [ + "OK", + "KO" + ], + "image_resolution": [ + 224, + 224 + ], "depends_on": [] } }, "camera_rule": { "name": "expected_label_rule", "parameters": { - "expected_label": ["OK"] + "expected_label": [ + "OK" + ] } } }, - "camera_id4": { + "camera_#4": { "type": "fake", "source": "marker_images", "position": "back", "exposition": 100, "models_graph": { - "model_id4": { + "model_#4": { "name": "marker_quality_control", + "category": "classification", + "version": "1", + "class_names": [ + "OK", + "KO" + ], + "image_resolution": [ + 224, + 224 + ], "depends_on": [] } }, "camera_rule": { "name": "expected_label_rule", "parameters": { - "expected_label": ["OK"] + "expected_label": [ + "OK" + ] } } } diff --git a/edge_orchestrator/config/station_configs/pin_detection_with_1_usbcamera.json b/edge_orchestrator/config/station_configs/pin_detection_with_1_usbcamera.json index fdec5701..3f52889c 100644 --- a/edge_orchestrator/config/station_configs/pin_detection_with_1_usbcamera.json +++ b/edge_orchestrator/config/station_configs/pin_detection_with_1_usbcamera.json @@ -1,13 +1,23 @@ { "cameras": { - "camera_id3": { + "camera_#3": { "type": "usb_camera", "source": "/dev/video0", "position": "front", "exposition": 100, "models_graph": { - "model_id4": { + "model_#4": { "name": "pin_detection", + "category": "classification", + "version": "1", + "class_names": [ + "OK", + "NOK" + ], + "image_resolution": [ + 224, + 224 + ], "depends_on": [] } }, diff --git a/edge_orchestrator/config/station_configs/pin_detection_with_2_usbcamera.json b/edge_orchestrator/config/station_configs/pin_detection_with_2_usbcamera.json index 9f8d6a09..1a6ef441 100644 --- a/edge_orchestrator/config/station_configs/pin_detection_with_2_usbcamera.json +++ b/edge_orchestrator/config/station_configs/pin_detection_with_2_usbcamera.json @@ -6,8 +6,18 @@ "position": "front", "exposition": 100, "models_graph": { - "model_id4": { + "model_#4": { "name": "pin_detection", + "category": "classification", + "version": "1", + "class_names": [ + "OK", + "NOK" + ], + "image_resolution": [ + 224, + 224 + ], "depends_on": [] } }, diff --git a/edge_orchestrator/edge_orchestrator/api_config.py b/edge_orchestrator/edge_orchestrator/api_config.py index 7aa787ef..197d035e 100644 --- a/edge_orchestrator/edge_orchestrator/api_config.py +++ b/edge_orchestrator/edge_orchestrator/api_config.py @@ -1,85 +1,119 @@ -import os - -from edge_orchestrator import logger - - -def load_config(): - configuration = os.environ.get("API_CONFIG", "default") - logger.info(f"App running with configuration: {configuration}") - - available_configurations = [ - "test", - "docker", - "default", - "edge", - "edge-lite", - "upload-gcp", - ] - if configuration not in available_configurations: - raise ValueError( - f"Unknown configuration '{configuration}'. " - f"Valid configurations are {available_configurations}." +from functools import lru_cache +from pathlib import Path + +from application.config import get_settings +from application.dto.station_config import StationConfig +from edge_orchestrator.domain.models.edge_station import EdgeStation +from edge_orchestrator.domain.ports.binary_storage import BinaryStorage +from edge_orchestrator.domain.ports.metadata_storage import MetadataStorage +from edge_orchestrator.domain.ports.model_forward import ModelForward +from edge_orchestrator.domain.ports.telemetry_sink import TelemetrySink +from edge_orchestrator.domain.use_cases.supervisor import Supervisor +from edge_orchestrator.domain.use_cases.uploader import Uploader +from edge_orchestrator.infrastructure.binary_storage.binary_storage_factory import ( + BinaryStorageFactory, +) +from edge_orchestrator.infrastructure.metadata_storage.metadata_storage_factory import ( + MetadataStorageFactory, +) +from edge_orchestrator.infrastructure.model_forward.model_forward_factory import ( + ModelForwardFactory, +) +from edge_orchestrator.infrastructure.station_config.json_station_config import ( + JsonStationConfig, +) +from edge_orchestrator.infrastructure.telemetry_sink.telemetry_sink_factory import ( + TelemetrySinkFactory, +) + + +@lru_cache() +def get_supervisor() -> Supervisor: + return Supervisor( + get_binary_storage(), + get_edge_station(), + get_metadata_storage(), + get_model_forward(), + get_station_config(), + get_telemetry_sink(), + ) + + +@lru_cache() +def get_uploader() -> Uploader: + return Uploader( + get_metadata_storage(), + get_binary_storage(), + ) + + +def _get_active_config_name(active_config_path: Path) -> str: + if active_config_path.exists(): + with open(active_config_path, "r") as active_config: + return active_config.read().strip() + else: + return "no_active_configuration" + + +@lru_cache() +def get_station_config( + try_setting_station_config_from_file: bool = True, +) -> StationConfig: + settings = get_settings() + station_config = JsonStationConfig( + settings.station_configs_folder, settings.data_folder + ) + if try_setting_station_config_from_file: + station_config = set_station_config_from_file( + station_config, settings.active_config_path ) - elif configuration == "test": - from edge_orchestrator.environment.test import Test + return station_config - configuration_class = Test - elif configuration == "docker": - from edge_orchestrator.environment.docker import Docker - configuration_class = Docker - elif configuration == "default": - from edge_orchestrator.environment.default import Default - - configuration_class = Default - elif configuration == "edge": - from edge_orchestrator.environment.edge_with_mongo_db_metadata_storage import ( - EdgeWithMongoDbMetadataStorage, - ) - - configuration_class = EdgeWithMongoDbMetadataStorage - elif configuration == "edge-lite": - from edge_orchestrator.environment.edge_with_azure_container_storage import ( - EdgeWithAzureContainerStorage, - ) - - configuration_class = EdgeWithAzureContainerStorage - elif configuration == "upload-gcp": - from edge_orchestrator.environment.upload_with_gcp_bucket import ( - UploadWithGCPBucket, - ) - - configuration_class = UploadWithGCPBucket - - return configuration_class() - - -config = load_config() - - -def get_metadata_storage(): - return config.get_metadata_storage() - - -def get_binary_storage(): - return config.get_binary_storage() +def set_station_config_from_file( + station_config: StationConfig, active_config_path: Path +) -> StationConfig: + active_config_name = _get_active_config_name(active_config_path) + station_config.set_station_config(active_config_name) + return station_config -def get_model_forward(): - return config.get_model_forward() +@lru_cache() +def get_binary_storage() -> BinaryStorage: + station_config = get_station_config() + return BinaryStorageFactory.get_binary_storage( + station_config.active_config["binary_storage"].get("type"), + **station_config.active_config["binary_storage"].get("params", {}), + ) -def get_inventory(): - return config.get_inventory() +@lru_cache() +def get_metadata_storage() -> MetadataStorage: + station_config = get_station_config() + return MetadataStorageFactory.get_metadata_storage( + station_config.active_config["metadata_storage"]["type"], + **station_config.active_config["metadata_storage"].get("params", {}), + ) -def get_station_config(): - return config.get_station_config() +@lru_cache() +def get_edge_station() -> EdgeStation: + return EdgeStation(get_station_config()) -def get_edge_station(): - return config.get_edge_station() +@lru_cache() +def get_model_forward() -> ModelForward: + station_config = get_station_config() + return ModelForwardFactory.get_model_forward( + station_config.active_config["model_forward"]["type"], + **station_config.active_config["model_forward"].get("params", {}), + ) -def get_telemetry_sink(): - return config.get_telemetry_sink() +@lru_cache() +def get_telemetry_sink() -> TelemetrySink: + station_config = get_station_config() + return TelemetrySinkFactory.get_telemetry_sink( + station_config.active_config["telemetry_sink"]["type"], + **station_config.active_config["telemetry_sink"].get("params", {}), + ) diff --git a/edge_orchestrator/edge_orchestrator/application/api_routes.py b/edge_orchestrator/edge_orchestrator/application/api_routes.py index 842aba4d..8577d170 100644 --- a/edge_orchestrator/edge_orchestrator/application/api_routes.py +++ b/edge_orchestrator/edge_orchestrator/application/api_routes.py @@ -1,45 +1,58 @@ from http import HTTPStatus +from typing import List -from fastapi import APIRouter, Depends, Response -from fastapi.responses import JSONResponse +from fastapi import APIRouter, Depends +from starlette.responses import Response +from typing_extensions import Annotated +from edge_orchestrator import logger from edge_orchestrator.api_config import ( - get_binary_storage, - get_inventory, get_metadata_storage, get_station_config, + get_binary_storage, +) +from edge_orchestrator.application.config import ( + Settings, + get_settings, ) +from edge_orchestrator.application.dto.station_config import StationConfig from edge_orchestrator.application.dto.station_config_dto import StationConfigDto +from edge_orchestrator.application.no_active_configuration_exception import ( + NoActiveConfigurationException, +) from edge_orchestrator.domain.ports.binary_storage import BinaryStorage -from edge_orchestrator.domain.ports.inventory import Inventory from edge_orchestrator.domain.ports.metadata_storage import MetadataStorage -from edge_orchestrator.domain.ports.station_config import StationConfig api_router = APIRouter() @api_router.get("/") -def home(): - return "the edge orchestrator is up and running" +async def home(settings: Annotated[Settings, Depends(get_settings)]): + infos = {"status": "edge-orchestrator up and running"} + infos.update(settings.model_dump(mode="json")) + return infos @api_router.get("/items") -def read_all(metadata_storage: MetadataStorage = Depends(get_metadata_storage)): +def read_all( + metadata_storage: Annotated[MetadataStorage, Depends(get_metadata_storage)] +): return metadata_storage.get_all_items_metadata() @api_router.get("/items/{item_id}") def get_item( - item_id: str, metadata_storage: MetadataStorage = Depends(get_metadata_storage) + item_id: str, + metadata_storage: Annotated[MetadataStorage, Depends(get_metadata_storage)], ): return metadata_storage.get_item_metadata(item_id) @api_router.get("/items/{item_id}/binaries/{camera_id}") def get_item_binary( - item_id: str, - camera_id: str, - binary_storage: BinaryStorage = Depends(get_binary_storage), + item_id: str, + camera_id: str, + binary_storage: BinaryStorage = Depends(get_binary_storage), ): content_binary = binary_storage.get_item_binary(item_id, camera_id) return Response( @@ -47,45 +60,47 @@ def get_item_binary( ) +# @api_router.get("/items/{item_id}/binaries") def get_item_binaries( - item_id: str, binary_storage: BinaryStorage = Depends(get_binary_storage) + item_id: str, + binary_storage: BinaryStorage = Depends(get_binary_storage), ): return binary_storage.get_item_binaries(item_id) @api_router.get("/items/{item_id}/state") def get_item_state( - item_id: str, metadata_storage: MetadataStorage = Depends(get_metadata_storage) + item_id: str, + metadata_storage: MetadataStorage = Depends(get_metadata_storage), ): return metadata_storage.get_item_state(item_id) -@api_router.get("/inventory") -def get_inventory(inventory: Inventory = Depends(get_inventory)): - return inventory - - @api_router.get("/configs") -def get_all_configs(station_config: StationConfig = Depends(get_station_config)): +def get_all_configs(station_config: StationConfig = Depends(get_station_config)) -> List[StationConfig]: station_config.load() return station_config.all_configs @api_router.get("/configs/active") def get_active_config(station_config: StationConfig = Depends(get_station_config)): + if station_config.active_config is None: + raise NoActiveConfigurationException("no_active_configuration") return station_config.active_config @api_router.post("/configs/active") -def set_station_config( - station_config_dto: StationConfigDto, - station_config: StationConfig = Depends(get_station_config), +def set_active_config( + station_config_dto: StationConfigDto, + station_config: StationConfig = Depends(get_station_config), ): - if station_config_dto.config_name == "": - return JSONResponse( - status_code=403, - content={"message": "No configuration selected!"}, - ) station_config.set_station_config(station_config_dto.config_name) return station_config.active_config + + +@api_router.post("/config") +def set_config( + station_config_dto: StationConfig, +): + logger.info(f"set config {station_config_dto.to_model()} to") diff --git a/edge_orchestrator/edge_orchestrator/application/config.py b/edge_orchestrator/edge_orchestrator/application/config.py new file mode 100644 index 00000000..df830d01 --- /dev/null +++ b/edge_orchestrator/edge_orchestrator/application/config.py @@ -0,0 +1,79 @@ +import json +from abc import ABC, abstractmethod +from functools import lru_cache +from pathlib import Path +from typing import Dict, Any + +import yaml +from pydantic_settings import BaseSettings + +from application.dto.station_config import StationConfig +from constants import ACTIVE_CONFIG_FILE_PATH + + +class Settings(BaseSettings): + app_name: str = "edge-orchestrator API" + url_prefix: str = "/api/v1" + + # root_dir: Path = Path(__file__).resolve().parents[2] + # station_configs_folder: Path = root_dir / "config" / "station_configs" + # data_folder: Path = root_dir / "data" + # active_config_path: Path = root_dir / "config" / ".active_config" + # + # model_config = SettingsConfigDict(env_file=".env") + + +class _AbstractConfigReader(ABC): + def __init__(self, path: Path): + self._path = path + self.config = self.get_config() + + def get_config(self) -> StationConfig: + return self._read_file() + + @abstractmethod + def _read_file(self) -> StationConfig: + raise NotImplementedError("Not implemented") + + +class YamlConfigReader(_AbstractConfigReader): + def _read_file(self) -> StationConfig: + content = yaml.load(self._path.read_text(encoding="utf-8"), yaml.SafeLoader) + return StationConfig.from_payload(content) + + +class JsonConfigReader(_AbstractConfigReader): + def _read_file(self) -> StationConfig: + _content = json.loads(self._path.read_text(encoding="utf-8")) + return self.read_content(_content) + + @staticmethod + def read_content(content: Dict[str, Any]) -> StationConfig: + return StationConfig.from_payload(content) + + +class ConfigReader: + def __init__(self, path: Path): + self._path = path + self._reader = self._define_reader() + + def _define_reader(self) -> _AbstractConfigReader: + if self._path.suffixes[0] == ".json": + return JsonConfigReader(self._path) + elif self._path.suffixes[0] in [".yaml", ".yml"]: + return YamlConfigReader(self._path) + raise Exception( + f"Unexpected extension of the deployment file: {self._path}. " + f"Please check the documentation for supported extensions." + ) + + def get_config(self) -> StationConfig: + return self._reader.config + + +@lru_cache() +def get_settings() -> Settings: + settings = Settings() + config_reader = ConfigReader(ACTIVE_CONFIG_FILE_PATH) + config = config_reader.get_config() + return settings diff --git a/edge_orchestrator/edge_orchestrator/application/dto/binary_storage_config.py b/edge_orchestrator/edge_orchestrator/application/dto/binary_storage_config.py new file mode 100644 index 00000000..523f2a93 --- /dev/null +++ b/edge_orchestrator/edge_orchestrator/application/dto/binary_storage_config.py @@ -0,0 +1,16 @@ +from enum import Enum +from typing import Optional, Dict, Any + +from pydantic import BaseModel + + +class BinaryStorageTypeEnum(str, Enum): + azure_container = "azure_container" + filesystem = "filesystem" + gcp_bucket = "gcp_bucket" + in_memory = "in_memory" + + +class BinaryStorageConfig(BaseModel): + type: BinaryStorageTypeEnum = BinaryStorageTypeEnum.filesystem + params: Optional[Dict[str, Any]] = None diff --git a/edge_orchestrator/edge_orchestrator/application/dto/camera_config.py b/edge_orchestrator/edge_orchestrator/application/dto/camera_config.py new file mode 100644 index 00000000..ca3359b3 --- /dev/null +++ b/edge_orchestrator/edge_orchestrator/application/dto/camera_config.py @@ -0,0 +1,55 @@ +from enum import Enum +from typing import Dict, Any, List + +from pydantic import ( + BaseModel, + StringConstraints, PositiveInt, +) +from typing_extensions import Annotated + +from edge_orchestrator.application.dto.model_config import ModelConfig + +CameraName = Annotated[ + str, + StringConstraints( + strip_whitespace=True, to_lower=True, pattern=r"camera_#[0-9]+" + ), +] + + +class CameraTypeEnum(str, Enum): + fake = "fake" + pi_camera = "pi_camera" + usb_camera = "usb_camera" + + +class CameraPositionEnum(str, Enum): + front = "front" + back = "back" + left = "left" + right = "right" + + +class CameraRuleNameEnum(str, Enum): + expected_label_rule = "expected_label_rule" + max_nb_objects_rule = "max_nb_objects_rule" + min_nb_objects_rule = "min_nb_objects_rule" + + +class CameraRule(BaseModel): + name: CameraRuleNameEnum = CameraRuleNameEnum.expected_label_rule + params: Dict[str, Any] = None + + +class CameraConfig(BaseModel): + name: CameraName = "camera_#1" + type: CameraTypeEnum = CameraTypeEnum.fake + source: str + position: CameraPositionEnum = CameraPositionEnum.front + exposition: PositiveInt = 100 + + +class CameraLogic(BaseModel): + name: CameraName = "camera_#1" + models_graph: List[ModelConfig] + rule: CameraRule = CameraRule() diff --git a/edge_orchestrator/edge_orchestrator/application/dto/edge_station_config.py b/edge_orchestrator/edge_orchestrator/application/dto/edge_station_config.py new file mode 100644 index 00000000..049670f8 --- /dev/null +++ b/edge_orchestrator/edge_orchestrator/application/dto/edge_station_config.py @@ -0,0 +1,14 @@ +from enum import Enum +from typing import Optional, Dict, Any + +from pydantic import BaseModel + + +class ItemRuleNameEnum(str, Enum): + min_threshold_KO_rule = "min_threshold_KO_rule" + threshold_ratio_rule = "threshold_ratio_rule" + + +class ItemRule(BaseModel): + name: ItemRuleNameEnum = ItemRuleNameEnum.min_threshold_KO_rule + params: Optional[Dict[str, Any]] = None diff --git a/edge_orchestrator/edge_orchestrator/application/dto/metadata_storage_config.py b/edge_orchestrator/edge_orchestrator/application/dto/metadata_storage_config.py new file mode 100644 index 00000000..cc90fbf3 --- /dev/null +++ b/edge_orchestrator/edge_orchestrator/application/dto/metadata_storage_config.py @@ -0,0 +1,17 @@ +from enum import Enum +from typing import Optional, Dict, Any + +from pydantic import BaseModel + + +class MetadataStorageTypeEnum(str, Enum): + azure_container = "azure_container" + filesystem = "filesystem" + gcp_bucket = "gcp_bucket" + in_memory = "in_memory" + mongo_db = "mongo_db" + + +class MetadataStorageConfig(BaseModel): + type: MetadataStorageTypeEnum = MetadataStorageTypeEnum.filesystem + params: Optional[Dict[str, Any]] = None diff --git a/edge_orchestrator/edge_orchestrator/application/dto/model_config.py b/edge_orchestrator/edge_orchestrator/application/dto/model_config.py new file mode 100644 index 00000000..899a5c25 --- /dev/null +++ b/edge_orchestrator/edge_orchestrator/application/dto/model_config.py @@ -0,0 +1,81 @@ +from enum import Enum +from typing import Optional, List + +import annotated_types +from pydantic import ( + StringConstraints, + BaseModel, + FilePath, + PositiveInt, + Field, + model_validator, +) +from typing_extensions import Annotated + + +class ModelNameEnum(str, Enum): + inception = "inception" + mask_classification_model = "mask_classification_model" + marker_quality_control = "marker_quality_control" + mobilenet_v1_640x640 = "mobilenet_v1_640x640" + mobilenet_ssd_v2_coco = "mobilenet_ssd_v2_coco" + mobilenet_ssd_v2_face = "mobilenet_ssd_v2_face" + cellphone_connection_control = "cellphone_connection_control" + + +class ModelCategoryEnum(str, Enum): + classification = "classification" + object_detection = "object_detection" + + +Version = Annotated[ + str, + StringConstraints(pattern=r"^(0|[1-9]\d*)(\.(0|[1-9]\d*)){0,2}"), +] + + +class ModelOutput(BaseModel): + boxes_coordinates: str + objectness_scores: str + number_of_boxes: Optional[str] = None + detection_classes: str + + +class ModelConfig(BaseModel): + name: ModelNameEnum + category: ModelCategoryEnum + version: Version + class_names: Optional[List[str]] = None + class_names_path: Optional[FilePath] = None + output: Optional[ModelOutput] = None + image_resolution: List[PositiveInt] = Field(min_items=2, max_items=2) + objectness_threshold: Optional[ + Annotated[float, annotated_types.Interval(gt=0, le=1.0)] + ] = None + depends_on: Optional[List[str]] = None + class_to_detect: Optional[List[str]] = None + + @model_validator(mode="after") + def check_class_names_or_class_names_path(self) -> "ModelConfig": + if (not self.class_names and not self.class_names_path) or ( + self.class_names and self.class_names_path + ): + raise ValueError( + "Either class_names or class_names_path is required (not both)" + ) + return self + + @model_validator(mode="after") + def check_object_detection_model(self) -> "ModelConfig": + if self.category is ModelCategoryEnum.object_detection: + if not self.output: + raise ValueError("output is required with object_detection category") + if not self.objectness_threshold: + raise ValueError( + "objectness_threshold is required with object_detection category" + ) + if not self.class_to_detect: + raise ValueError( + "class_to_detect is required with object_detection category" + ) + return self diff --git a/edge_orchestrator/edge_orchestrator/application/dto/model_forward_config.py b/edge_orchestrator/edge_orchestrator/application/dto/model_forward_config.py new file mode 100644 index 00000000..8c2732f3 --- /dev/null +++ b/edge_orchestrator/edge_orchestrator/application/dto/model_forward_config.py @@ -0,0 +1,14 @@ +from enum import Enum +from typing import Optional, Dict, Any + +from pydantic import BaseModel + + +class ModelForwardTypeEnum(str, Enum): + fake = "fake" + tf_serving = "tf_serving" + + +class ModelForwardConfig(BaseModel): + type: ModelForwardTypeEnum + params: Optional[Dict[str, Any]] = None diff --git a/edge_orchestrator/edge_orchestrator/application/dto/station_config.py b/edge_orchestrator/edge_orchestrator/application/dto/station_config.py new file mode 100644 index 00000000..2dde348c --- /dev/null +++ b/edge_orchestrator/edge_orchestrator/application/dto/station_config.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +import logging +from typing import Dict, Any, Optional, List + +from pydantic import BaseModel + +from edge_orchestrator.application.dto.binary_storage_config import BinaryStorageConfig +from edge_orchestrator.application.dto.camera_config import CameraConfig, CameraLogic +from edge_orchestrator.application.dto.edge_station_config import ItemRule +from edge_orchestrator.application.dto.metadata_storage_config import MetadataStorageConfig +from edge_orchestrator.application.dto.model_forward_config import ModelForwardConfig +from edge_orchestrator.application.dto.telemetry_sink_config import TelemetrySinkDto + + +class InfrastructureConfig(BaseModel): + binary_storage: BinaryStorageConfig + metadata_storage: MetadataStorageConfig + mode1_forward: ModelForwardConfig + telemetry_sink: TelemetrySinkDto + cameras: List[CameraConfig] + + +class DomainRulesConfig(BaseModel): + camera_rules: List[CameraLogic] + item_rule: ItemRule = ItemRule() + + +class StationConfig(BaseModel): + infra_config: Optional[InfrastructureConfig] + domain_config: Optional[DomainRulesConfig] + + def to_model(self): + print(self) + return + + @staticmethod + def _prepare_infra(payload: Dict[str, Any]) -> InfrastructureConfig: + _infra_payload = payload.get("infra", {}) + if not _infra_payload: + logging.info("No infra logic defined in the config file. Default infra logic will be used.") + _infra_payload = {"binary_storage": BinaryStorageConfig(), "metadata_storage": MetadataStorageConfig(), + "mode1_forward": ModelForwardConfig(), "telemetry": TelemetrySinkDto(), + "cameras": [CameraConfig()]} + return InfrastructureConfig(**_infra_payload) + + @staticmethod + def _prepare_domain_rules(payload: Dict[str, Any]) -> DomainRulesConfig: + _domain_rules_payload = payload.get("domain", {}) + if not _domain_rules_payload: + logging.info("No domain logic defined in the config file. Default domain logic will be used.") + _domain_rules_payload = {"camera_rules": [CameraLogic()], "item_rule": ItemRule()} + return DomainRulesConfig(**_domain_rules_payload) + + @classmethod + def from_payload(cls, payload: Dict[str, Any]) -> StationConfig: + return StationConfig(infra_config=cls._prepare_infra(payload), + domain_config=cls._prepare_domain_rules(payload)) diff --git a/edge_orchestrator/edge_orchestrator/application/dto/telemetry_sink_config.py b/edge_orchestrator/edge_orchestrator/application/dto/telemetry_sink_config.py new file mode 100644 index 00000000..14880012 --- /dev/null +++ b/edge_orchestrator/edge_orchestrator/application/dto/telemetry_sink_config.py @@ -0,0 +1,15 @@ +from enum import Enum +from typing import Optional, Dict, Any + +from pydantic import BaseModel + + +class TelemetrySinkTypeEnum(str, Enum): + azure_iot_hub = "azure_iot_hub" + fake = "fake" + postgres = "postgres" + + +class TelemetrySinkDto(BaseModel): + type: TelemetrySinkTypeEnum = TelemetrySinkTypeEnum.fake + params: Optional[Dict[str, Any]] = None diff --git a/edge_orchestrator/edge_orchestrator/application/no_active_configuration_exception.py b/edge_orchestrator/edge_orchestrator/application/no_active_configuration_exception.py new file mode 100644 index 00000000..e80fd49c --- /dev/null +++ b/edge_orchestrator/edge_orchestrator/application/no_active_configuration_exception.py @@ -0,0 +1,19 @@ +from fastapi import Request +from fastapi.responses import JSONResponse + + +class NoActiveConfigurationException(Exception): + def __init__(self, name: str): + self.name = name + + +async def no_active_configuration_exception_handler( + request: Request, exc: NoActiveConfigurationException +): + return JSONResponse( + status_code=403, + content={ + "message": f"No active configuration selected: {exc.name}! " + "Set the active station configuration before triggering the inspection or the upload." + }, + ) diff --git a/edge_orchestrator/edge_orchestrator/application/server.py b/edge_orchestrator/edge_orchestrator/application/server.py index 73e54687..4df118dc 100644 --- a/edge_orchestrator/edge_orchestrator/application/server.py +++ b/edge_orchestrator/edge_orchestrator/application/server.py @@ -1,15 +1,21 @@ from fastapi import FastAPI from starlette.middleware.cors import CORSMiddleware -from edge_orchestrator.application import settings from edge_orchestrator.application.api_routes import api_router +from edge_orchestrator.application.no_active_configuration_exception import ( + NoActiveConfigurationException, + no_active_configuration_exception_handler, +) from edge_orchestrator.application.trigger_routes import trigger_router def server() -> FastAPI: - app = FastAPI(title=settings.app_name) - app.include_router(api_router, prefix=settings.url_prefix) - app.include_router(trigger_router, prefix=settings.url_prefix) + app = FastAPI() + app.include_router(api_router) + app.include_router(trigger_router) + app.add_exception_handler( + NoActiveConfigurationException, no_active_configuration_exception_handler + ) app.add_middleware( CORSMiddleware, allow_origins=["*"], diff --git a/edge_orchestrator/edge_orchestrator/application/settings.py b/edge_orchestrator/edge_orchestrator/application/settings.py deleted file mode 100644 index 5846f5ac..00000000 --- a/edge_orchestrator/edge_orchestrator/application/settings.py +++ /dev/null @@ -1,2 +0,0 @@ -app_name = "edge-orchestrator" -url_prefix = "/api/v1" diff --git a/edge_orchestrator/edge_orchestrator/application/trigger_routes.py b/edge_orchestrator/edge_orchestrator/application/trigger_routes.py index e59fc68d..5876193d 100644 --- a/edge_orchestrator/edge_orchestrator/application/trigger_routes.py +++ b/edge_orchestrator/edge_orchestrator/application/trigger_routes.py @@ -1,22 +1,34 @@ -from fastapi import APIRouter, BackgroundTasks, File, UploadFile +from fastapi import APIRouter, BackgroundTasks, File, UploadFile, Depends from fastapi.responses import JSONResponse +from typing_extensions import Annotated +from application.dto.station_config import StationConfig +from edge_orchestrator.api_config import ( + get_supervisor, + get_station_config, + get_uploader, +) from edge_orchestrator.domain.models.item import Item -from edge_orchestrator.domain.use_cases.supervisor import Supervisor -from edge_orchestrator.domain.use_cases.uploader import Uploader trigger_router = APIRouter() -supervisor = Supervisor() -uploader = Uploader() - @trigger_router.post("/trigger") async def trigger_job( - image: UploadFile = None, background_tasks: BackgroundTasks = None + station_config: Annotated[StationConfig, Depends(get_station_config)], + image: UploadFile = None, + background_tasks: BackgroundTasks = None, ): - item = Item.from_nothing() - if supervisor.station_config.active_config is None: + if station_config.active_config: + supervisor = get_supervisor() + item = Item.from_nothing() + if image: + contents = image.file.read() + camera_id = supervisor.station_config.get_cameras()[0] + item.binaries = {camera_id: contents} + background_tasks.add_task(supervisor.inspect, item) + return {"item_id": item.id} + else: return JSONResponse( status_code=403, content={ @@ -24,21 +36,26 @@ async def trigger_job( "Set the active station configuration before triggering the inspection." }, ) - else: - if image: - contents = image.file.read() - camera_id = supervisor.station_config.get_cameras()[0] - item.binaries = {camera_id: contents} - background_tasks.add_task(supervisor.inspect, item) - return {"item_id": item.id} @trigger_router.post("/upload") async def upload_job( - image: UploadFile = File(...), background_tasks: BackgroundTasks = None + station_config: Annotated[StationConfig, Depends(get_station_config)], + image: UploadFile = File(...), + background_tasks: BackgroundTasks = None, ): - item = Item.from_nothing() - contents = image.file.read() - item.binaries = {"0": contents} - background_tasks.add_task(uploader.upload, item) - return {"item_id": item.id} + if station_config.active_config: + uploader = get_uploader() + item = Item.from_nothing() + contents = image.file.read() + item.binaries = {"0": contents} + background_tasks.add_task(uploader.upload, item) + return {"item_id": item.id} + else: + return JSONResponse( + status_code=403, + content={ + "message": "No active configuration selected! " + "Set the active station configuration before uploading the image." + }, + ) diff --git a/edge_orchestrator/edge_orchestrator/constants.py b/edge_orchestrator/edge_orchestrator/constants.py new file mode 100644 index 00000000..29adc544 --- /dev/null +++ b/edge_orchestrator/edge_orchestrator/constants.py @@ -0,0 +1,5 @@ +from pathlib import Path + +ORCHESTRATOR_PATH = Path(".") +CONFIG_PATH = ORCHESTRATOR_PATH / "config" +ACTIVE_CONFIG_FILE_PATH = CONFIG_PATH / ".active_config.json" diff --git a/edge_orchestrator/edge_orchestrator/domain/models/edge_station.py b/edge_orchestrator/edge_orchestrator/domain/models/edge_station.py index b11dfe34..2b963aa4 100644 --- a/edge_orchestrator/edge_orchestrator/domain/models/edge_station.py +++ b/edge_orchestrator/edge_orchestrator/domain/models/edge_station.py @@ -1,25 +1,28 @@ -from typing import Dict, Tuple +from typing import Dict, Tuple, Any, List -from edge_orchestrator.domain.ports.station_config import StationConfig +from application.dto.station_config import StationConfig +from domain.models.camera import Camera class EdgeStation: def __init__(self, station_config: StationConfig): self.station_config = station_config + self.cameras = self.register_cameras() - def register_cameras(self, station_config: StationConfig): - self.cameras = [] - for camera_id in station_config.get_cameras(): - camera_type = station_config.get_camera_type(camera_id) - camera_settings = station_config.get_camera_settings(camera_id) + def register_cameras(self) -> List[Camera]: + cameras = [] + for camera_id in self.station_config.get_cameras(): + camera_type = self.station_config.get_camera_type(camera_id) + camera_settings = self.station_config.get_camera_settings(camera_id) camera = camera_type(id=camera_id, settings=camera_settings) - self.cameras.append(camera) + cameras.append(camera) + return cameras def capture(self) -> Tuple[Dict, Dict]: - binaries = {} + binaries: Dict[str, bytes] = {} for camera in self.cameras: binaries[camera.id] = camera.capture() - cameras_metadata = { + cameras_metadata: Dict[str, Any] = { camera.id: self.station_config.get_camera_settings(camera.id) for camera in self.cameras } diff --git a/edge_orchestrator/edge_orchestrator/domain/models/model_infos.py b/edge_orchestrator/edge_orchestrator/domain/models/model_infos.py index ae2d1b00..e86817ea 100644 --- a/edge_orchestrator/edge_orchestrator/domain/models/model_infos.py +++ b/edge_orchestrator/edge_orchestrator/domain/models/model_infos.py @@ -1,10 +1,7 @@ -import os from enum import Enum from pathlib import Path from typing import Dict, List, Optional -from edge_orchestrator.domain.ports.inventory import Inventory - class ModelInfos: def __init__( @@ -47,30 +44,21 @@ def from_model_graph_node( camera_id: str, model_id: str, model: Dict, - inventory: Inventory, data_folder: Path, ): model_name = model["name"] - class_names = inventory.models[model_name].get("class_names") + class_names = model.get("class_names") class_to_detect = model.get("class_to_detect") - class_names_path = inventory.models[model_name].get("class_names_path") - objectness_threshold = inventory.models[model_name].get("objectness_threshold") + class_names_path = model.get("class_names_path") + objectness_threshold = model.get("objectness_threshold") - if inventory.models[model_name].get("class_names_path") is not None: - class_names_path = os.path.join(data_folder, class_names_path) + if class_names_path is not None: + class_names_path = data_folder / class_names_path try: - boxes_coordinates = ( - inventory.models[model_name].get("output").get("boxes_coordinates") - ) - objectness_scores = ( - inventory.models[model_name].get("output").get("objectness_scores") - ) - number_of_boxes = ( - inventory.models[model_name].get("output").get("number_of_boxes") - ) - detection_classes = ( - inventory.models[model_name].get("output").get("detection_classes") - ) + boxes_coordinates = model.get("output").get("boxes_coordinates") + objectness_scores = model.get("output").get("objectness_scores") + number_of_boxes = model.get("output").get("number_of_boxes") + detection_classes = model.get("output").get("detection_classes") except AttributeError: boxes_coordinates = None objectness_scores = None @@ -80,8 +68,8 @@ def from_model_graph_node( return ModelInfos( id=model_id, name=model_name, - category=inventory.models[model_name]["category"], - version=str(inventory.models[model_name]["version"]), + category=model["category"], + version=model["version"], depends_on=model["depends_on"], camera_id=camera_id, class_names=class_names, @@ -90,7 +78,7 @@ def from_model_graph_node( objectness_scores=objectness_scores, number_of_boxes=number_of_boxes, detection_classes=detection_classes, - image_resolution=inventory.models[model_name].get("image_resolution"), + image_resolution=model.get("image_resolution"), class_to_detect=class_to_detect, objectness_threshold=objectness_threshold, ) diff --git a/edge_orchestrator/edge_orchestrator/domain/ports/inventory.py b/edge_orchestrator/edge_orchestrator/domain/ports/inventory.py deleted file mode 100644 index c5d0959a..00000000 --- a/edge_orchestrator/edge_orchestrator/domain/ports/inventory.py +++ /dev/null @@ -1,8 +0,0 @@ -from typing import Dict, List, Union - - -class Inventory: - cameras: List[str] - models: Dict[str, Dict[str, Union[str, int]]] - camera_rules: List[str] - item_rules: List[str] diff --git a/edge_orchestrator/edge_orchestrator/domain/ports/station_config.py b/edge_orchestrator/edge_orchestrator/domain/ports/station_config.py index 741c94d7..8b137891 100644 --- a/edge_orchestrator/edge_orchestrator/domain/ports/station_config.py +++ b/edge_orchestrator/edge_orchestrator/domain/ports/station_config.py @@ -1,31 +1 @@ -from abc import abstractmethod -from typing import Dict, List, Optional, Type, Union -from edge_orchestrator.domain.models.camera import Camera -from edge_orchestrator.domain.models.model_infos import ModelInfos - - -class StationConfig: - all_configs: dict - active_config_name: Optional[str] - active_config: Optional[dict] - - @abstractmethod - def get_model_pipeline_for_camera(self, camera_id: str) -> List[ModelInfos]: - pass - - @abstractmethod - def get_cameras(self) -> List[str]: - pass - - @abstractmethod - def get_camera_type(self, camera_id: str) -> Type[Camera]: - pass - - @abstractmethod - def get_camera_settings(self, camera_id: str) -> Dict[str, Union[str, int]]: - pass - - @abstractmethod - def set_station_config(self, config_name: str) -> None: - pass diff --git a/edge_orchestrator/edge_orchestrator/domain/use_cases/supervisor.py b/edge_orchestrator/edge_orchestrator/domain/use_cases/supervisor.py index cc312fa2..e5742ee1 100644 --- a/edge_orchestrator/edge_orchestrator/domain/use_cases/supervisor.py +++ b/edge_orchestrator/edge_orchestrator/domain/use_cases/supervisor.py @@ -6,24 +6,22 @@ from PIL import Image -from edge_orchestrator.api_config import ( - get_binary_storage, - get_edge_station, - get_metadata_storage, - get_model_forward, - get_station_config, - get_telemetry_sink, - logger, -) +from application.dto.station_config import StationConfig +from edge_orchestrator import logger from edge_orchestrator.domain.models.camera import ( get_camera_rule, get_last_inference_by_camera, ) from edge_orchestrator.domain.models.decision import Decision +from edge_orchestrator.domain.models.edge_station import EdgeStation from edge_orchestrator.domain.models.item import Item from edge_orchestrator.domain.models.item import get_item_rule from edge_orchestrator.domain.models.model_infos import ModelInfos from edge_orchestrator.domain.models.supervisor_state import SupervisorState +from edge_orchestrator.domain.ports.binary_storage import BinaryStorage +from edge_orchestrator.domain.ports.metadata_storage import MetadataStorage +from edge_orchestrator.domain.ports.model_forward import ModelForward +from edge_orchestrator.domain.ports.telemetry_sink import TelemetrySink def check_capture_according_to_config(item: Item, cameras: List[Dict]): @@ -40,18 +38,18 @@ def check_capture_according_to_config(item: Item, cameras: List[Dict]): class Supervisor: def __init__( self, - metadata_storage=get_metadata_storage(), - binary_storage=get_binary_storage(), - model_forward=get_model_forward(), - station_config=get_station_config(), - edge_station=get_edge_station(), - telemetry_sink=get_telemetry_sink(), + binary_storage: BinaryStorage, + edge_station: EdgeStation, + metadata_storage: MetadataStorage, + model_forward: ModelForward, + station_config: StationConfig, + telemetry_sink: TelemetrySink, ): - self.metadata_storage = metadata_storage self.binary_storage = binary_storage + self.edge_station = edge_station + self.metadata_storage = metadata_storage self.model_forward = model_forward self.station_config = station_config - self.edge_station = edge_station self.telemetry_sink = telemetry_sink def save_item_metadata(self, fct): @@ -65,8 +63,6 @@ async def wrapper(item: Item, *args): async def inspect(self, item: Item): item.station_config = self.station_config.active_config_name - if self.edge_station is not None: - self.edge_station.register_cameras(self.station_config) tasks = OrderedDict() diff --git a/edge_orchestrator/edge_orchestrator/domain/use_cases/uploader.py b/edge_orchestrator/edge_orchestrator/domain/use_cases/uploader.py index 648fb620..561d6ed4 100644 --- a/edge_orchestrator/edge_orchestrator/domain/use_cases/uploader.py +++ b/edge_orchestrator/edge_orchestrator/domain/use_cases/uploader.py @@ -2,11 +2,9 @@ from collections import OrderedDict from enum import Enum -from edge_orchestrator.api_config import ( - get_binary_storage, - get_metadata_storage, - logger, -) +from domain.ports.binary_storage import BinaryStorage +from domain.ports.metadata_storage import MetadataStorage +from edge_orchestrator import logger from edge_orchestrator.domain.models.item import Item @@ -18,8 +16,8 @@ class UploaderState(Enum): class Uploader: def __init__( self, - metadata_storage=get_metadata_storage(), - binary_storage=get_binary_storage(), + metadata_storage: MetadataStorage, + binary_storage: BinaryStorage, ): self.metadata_storage = metadata_storage self.binary_storage = binary_storage diff --git a/edge_orchestrator/edge_orchestrator/environment/__init__.py b/edge_orchestrator/edge_orchestrator/environment/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/edge_orchestrator/edge_orchestrator/environment/config.py b/edge_orchestrator/edge_orchestrator/environment/config.py deleted file mode 100644 index f9983c19..00000000 --- a/edge_orchestrator/edge_orchestrator/environment/config.py +++ /dev/null @@ -1,42 +0,0 @@ -from pathlib import Path - -from edge_orchestrator.domain.models.edge_station import EdgeStation -from edge_orchestrator.domain.ports.binary_storage import BinaryStorage -from edge_orchestrator.domain.ports.inventory import Inventory -from edge_orchestrator.domain.ports.metadata_storage import MetadataStorage -from edge_orchestrator.domain.ports.model_forward import ModelForward -from edge_orchestrator.domain.ports.station_config import StationConfig -from edge_orchestrator.domain.ports.telemetry_sink import TelemetrySink - - -class Config: - ROOT_PATH = Path(__file__).parents[2] - - metadata_storage: MetadataStorage = None - model_forward: ModelForward = None - binary_storage: BinaryStorage = None - inventory: Inventory = None - station_config: StationConfig = None - edge_station: EdgeStation = None - telemetry_sink: TelemetrySink = None - - def get_metadata_storage(self): - return self.metadata_storage - - def get_binary_storage(self): - return self.binary_storage - - def get_model_forward(self): - return self.model_forward - - def get_edge_station(self): - return self.edge_station - - def get_inventory(self): - return self.inventory - - def get_station_config(self): - return self.station_config - - def get_telemetry_sink(self): - return self.telemetry_sink diff --git a/edge_orchestrator/edge_orchestrator/environment/default.py b/edge_orchestrator/edge_orchestrator/environment/default.py deleted file mode 100644 index 2ad619ca..00000000 --- a/edge_orchestrator/edge_orchestrator/environment/default.py +++ /dev/null @@ -1,37 +0,0 @@ -from edge_orchestrator.domain.models.edge_station import EdgeStation -from edge_orchestrator.environment.config import Config - -from edge_orchestrator.infrastructure.binary_storage.filesystem_binary_storage import ( - FileSystemBinaryStorage, -) -from edge_orchestrator.infrastructure.inventory.json_inventory import JsonInventory -from edge_orchestrator.infrastructure.metadata_storage.memory_metadata_storage import ( - MemoryMetadataStorage, -) - -from edge_orchestrator.infrastructure.model_forward.fake_model_forward import ( - FakeModelForward, -) -from edge_orchestrator.infrastructure.station_config.json_station_config import ( - JsonStationConfig, -) -from edge_orchestrator.infrastructure.telemetry_sink.fake_telemetry_sink import ( - FakeTelemetrySink, -) - - -class Default(Config): - def __init__(self): - self.metadata_storage = MemoryMetadataStorage() - self.model_forward = FakeModelForward() - self.binary_storage = FileSystemBinaryStorage( - self.ROOT_PATH / "data" / "storage" - ) - self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json") - self.station_config = JsonStationConfig( - self.ROOT_PATH / "config" / "station_configs", - self.inventory, - self.ROOT_PATH / "data", - ) - self.edge_station = EdgeStation(self.station_config) - self.telemetry_sink = FakeTelemetrySink() diff --git a/edge_orchestrator/edge_orchestrator/environment/docker.py b/edge_orchestrator/edge_orchestrator/environment/docker.py deleted file mode 100644 index 5ee09c8a..00000000 --- a/edge_orchestrator/edge_orchestrator/environment/docker.py +++ /dev/null @@ -1,47 +0,0 @@ -import os - -from edge_orchestrator.domain.models.edge_station import EdgeStation -from edge_orchestrator.environment.config import Config -from edge_orchestrator.infrastructure.binary_storage.filesystem_binary_storage import ( - FileSystemBinaryStorage, -) -from edge_orchestrator.infrastructure.inventory.json_inventory import JsonInventory -from edge_orchestrator.infrastructure.metadata_storage.mongodb_metadata_storage import ( - MongoDbMetadataStorage, -) -from edge_orchestrator.infrastructure.model_forward.tf_serving_wrapper import ( - TFServingWrapper, -) -from edge_orchestrator.infrastructure.station_config.json_station_config import ( - JsonStationConfig, -) -from edge_orchestrator.infrastructure.telemetry_sink.postgresql_telemetry_sink import ( - PostgresTelemetrySink, -) - - -class Docker(Config): - MONGO_DB_URI = os.environ.get("MONGO_DB_URI", "mongodb://edge_db:27017/") - POSTGRES_DB_URI = os.environ.get( - "POSTGRES_DB_URI", "postgresql://vio:vio@hub_monitoring_db:5432/vio" - ) - SERVING_MODEL_URL = os.environ.get( - "SERVING_MODEL_URL", "http://edge_model_serving:8501" - ) - - def __init__(self): - self.metadata_storage = MongoDbMetadataStorage(self.MONGO_DB_URI) - self.binary_storage = FileSystemBinaryStorage( - self.ROOT_PATH / "data" / "storage" - ) - self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json") - self.station_config = JsonStationConfig( - station_configs_folder=self.ROOT_PATH / "config" / "station_configs", - inventory=self.inventory, - data_folder=self.ROOT_PATH / "data", - ) - self.edge_station = EdgeStation(self.station_config) - self.model_forward = TFServingWrapper( - self.SERVING_MODEL_URL, self.inventory, self.station_config - ) - self.telemetry_sink = PostgresTelemetrySink(self.POSTGRES_DB_URI) diff --git a/edge_orchestrator/edge_orchestrator/environment/edge_with_azure_container_storage.py b/edge_orchestrator/edge_orchestrator/environment/edge_with_azure_container_storage.py deleted file mode 100644 index d5d12c23..00000000 --- a/edge_orchestrator/edge_orchestrator/environment/edge_with_azure_container_storage.py +++ /dev/null @@ -1,41 +0,0 @@ -import os - -from edge_orchestrator.domain.models.edge_station import EdgeStation -from edge_orchestrator.environment.config import Config -from edge_orchestrator.infrastructure.binary_storage.azure_container_binary_storage import ( - AzureContainerBinaryStorage, -) -from edge_orchestrator.infrastructure.inventory.json_inventory import JsonInventory -from edge_orchestrator.infrastructure.metadata_storage.azure_container_metadata_storage import ( - AzureContainerMetadataStorage, -) -from edge_orchestrator.infrastructure.model_forward.tf_serving_wrapper import ( - TFServingWrapper, -) -from edge_orchestrator.infrastructure.station_config.json_station_config import ( - JsonStationConfig, -) -from edge_orchestrator.infrastructure.telemetry_sink.azure_iot_hub_telemetry_sink import ( - AzureIotHubTelemetrySink, -) - - -class EdgeWithAzureContainerStorage(Config): - SERVING_MODEL_URL = os.environ.get( - "SERVING_MODEL_URL", "http://edge_model_serving:8501" - ) - - def __init__(self): - self.metadata_storage = AzureContainerMetadataStorage() - self.binary_storage = AzureContainerBinaryStorage() - self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json") - self.station_config = JsonStationConfig( - self.ROOT_PATH / "config" / "station_configs", - self.inventory, - self.ROOT_PATH / "data", - ) - self.edge_station = EdgeStation(self.station_config) - self.model_forward = TFServingWrapper( - self.SERVING_MODEL_URL, self.inventory, self.station_config - ) - self.telemetry_sink = AzureIotHubTelemetrySink() diff --git a/edge_orchestrator/edge_orchestrator/environment/edge_with_filesystem_metadata_storage.py b/edge_orchestrator/edge_orchestrator/environment/edge_with_filesystem_metadata_storage.py deleted file mode 100644 index 1d765328..00000000 --- a/edge_orchestrator/edge_orchestrator/environment/edge_with_filesystem_metadata_storage.py +++ /dev/null @@ -1,45 +0,0 @@ -import os - -from edge_orchestrator.domain.models.edge_station import EdgeStation -from edge_orchestrator.environment.config import Config -from edge_orchestrator.infrastructure.binary_storage.filesystem_binary_storage import ( - FileSystemBinaryStorage, -) -from edge_orchestrator.infrastructure.inventory.json_inventory import JsonInventory -from edge_orchestrator.infrastructure.metadata_storage.filesystem_metadata_storage import ( - FileSystemMetadataStorage, -) -from edge_orchestrator.infrastructure.model_forward.tf_serving_wrapper import ( - TFServingWrapper, -) -from edge_orchestrator.infrastructure.station_config.json_station_config import ( - JsonStationConfig, -) -from edge_orchestrator.infrastructure.telemetry_sink.azure_iot_hub_telemetry_sink import ( - AzureIotHubTelemetrySink, -) - - -class EdgeWithFileSystemMetadataStorage(Config): - SERVING_MODEL_URL = os.environ.get( - "SERVING_MODEL_URL", "http://edge_model_serving:8501" - ) - - def __init__(self): - self.metadata_storage = FileSystemMetadataStorage( - self.ROOT_PATH / "data" / "storage" - ) - self.binary_storage = FileSystemBinaryStorage( - self.ROOT_PATH / "data" / "storage" - ) - self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json") - self.station_config = JsonStationConfig( - self.ROOT_PATH / "config" / "station_configs", - self.inventory, - self.ROOT_PATH / "data", - ) - self.edge_station = EdgeStation(self.station_config) - self.model_forward = TFServingWrapper( - self.SERVING_MODEL_URL, self.inventory, self.station_config - ) - self.telemetry_sink = AzureIotHubTelemetrySink() diff --git a/edge_orchestrator/edge_orchestrator/environment/edge_with_mongo_db_metadata_storage.py b/edge_orchestrator/edge_orchestrator/environment/edge_with_mongo_db_metadata_storage.py deleted file mode 100644 index 0a39276a..00000000 --- a/edge_orchestrator/edge_orchestrator/environment/edge_with_mongo_db_metadata_storage.py +++ /dev/null @@ -1,44 +0,0 @@ -import os - -from edge_orchestrator.domain.models.edge_station import EdgeStation -from edge_orchestrator.environment.config import Config -from edge_orchestrator.infrastructure.binary_storage.filesystem_binary_storage import ( - FileSystemBinaryStorage, -) -from edge_orchestrator.infrastructure.inventory.json_inventory import JsonInventory -from edge_orchestrator.infrastructure.metadata_storage.mongodb_metadata_storage import ( - MongoDbMetadataStorage, -) -from edge_orchestrator.infrastructure.model_forward.tf_serving_wrapper import ( - TFServingWrapper, -) -from edge_orchestrator.infrastructure.station_config.json_station_config import ( - JsonStationConfig, -) -from edge_orchestrator.infrastructure.telemetry_sink.azure_iot_hub_telemetry_sink import ( - AzureIotHubTelemetrySink, -) - - -class EdgeWithMongoDbMetadataStorage(Config): - MONGO_DB_URI = os.environ.get("MONGO_DB_URI", "mongodb://edge_db:27017/") - SERVING_MODEL_URL = os.environ.get( - "SERVING_MODEL_URL", "http://edge_model_serving:8501" - ) - - def __init__(self): - self.metadata_storage = MongoDbMetadataStorage(self.MONGO_DB_URI) - self.binary_storage = FileSystemBinaryStorage( - self.ROOT_PATH / "data" / "storage" - ) - self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json") - self.station_config = JsonStationConfig( - self.ROOT_PATH / "config" / "station_configs", - self.inventory, - self.ROOT_PATH / "data", - ) - self.edge_station = EdgeStation(self.station_config) - self.model_forward = TFServingWrapper( - self.SERVING_MODEL_URL, self.inventory, self.station_config - ) - self.telemetry_sink = AzureIotHubTelemetrySink() diff --git a/edge_orchestrator/edge_orchestrator/environment/test.py b/edge_orchestrator/edge_orchestrator/environment/test.py deleted file mode 100644 index 66b77ece..00000000 --- a/edge_orchestrator/edge_orchestrator/environment/test.py +++ /dev/null @@ -1,50 +0,0 @@ -import os -from pathlib import Path - -from edge_orchestrator.domain.models.edge_station import EdgeStation -from edge_orchestrator.environment.config import Config -from edge_orchestrator.infrastructure.binary_storage.filesystem_binary_storage import ( - FileSystemBinaryStorage, -) -from edge_orchestrator.infrastructure.inventory.json_inventory import JsonInventory -from edge_orchestrator.infrastructure.metadata_storage.mongodb_metadata_storage import ( - MongoDbMetadataStorage, -) -from edge_orchestrator.infrastructure.model_forward.tf_serving_wrapper import ( - TFServingWrapper, -) -from edge_orchestrator.infrastructure.station_config.json_station_config import ( - JsonStationConfig, -) -from edge_orchestrator.infrastructure.telemetry_sink.postgresql_telemetry_sink import ( - PostgresTelemetrySink, -) -from tests.conftest import ( - TEST_DATA_FOLDER_PATH, - TEST_INVENTORY_PATH, - TEST_STATION_CONFIGS_FOLDER_PATH, -) - - -class Test(Config): - ROOT_PATH = Path("/tests") - MONGO_DB_URI = os.environ.get("MONGO_DB_URI", "mongodb://edge_db:27017/") - POSTGRES_DB_URI = os.environ.get( - "POSTGRES_DB_URI", "postgresql://vio:vio@hub_monitoring_db:5432/vio" - ) - SERVING_MODEL_URL = os.environ.get( - "SERVING_MODEL_URL", "http://edge_model_serving:8501" - ) - - def __init__(self): - self.metadata_storage = MongoDbMetadataStorage(self.MONGO_DB_URI) - self.binary_storage = FileSystemBinaryStorage(TEST_DATA_FOLDER_PATH / "storage") - self.inventory = JsonInventory(TEST_INVENTORY_PATH) - self.station_config = JsonStationConfig( - TEST_STATION_CONFIGS_FOLDER_PATH, self.inventory, TEST_DATA_FOLDER_PATH - ) - self.edge_station = EdgeStation(self.station_config) - self.model_forward = TFServingWrapper( - self.SERVING_MODEL_URL, self.inventory, self.station_config - ) - self.telemetry_sink = PostgresTelemetrySink(self.POSTGRES_DB_URI) diff --git a/edge_orchestrator/edge_orchestrator/environment/upload_with_gcp_bucket.py b/edge_orchestrator/edge_orchestrator/environment/upload_with_gcp_bucket.py deleted file mode 100644 index 9dbc9760..00000000 --- a/edge_orchestrator/edge_orchestrator/environment/upload_with_gcp_bucket.py +++ /dev/null @@ -1,41 +0,0 @@ -import os - -from edge_orchestrator.domain.models.edge_station import EdgeStation -from edge_orchestrator.environment.config import Config -from edge_orchestrator.infrastructure.binary_storage.gcp_binary_storage import ( - GCPBinaryStorage, -) -from edge_orchestrator.infrastructure.inventory.json_inventory import JsonInventory -from edge_orchestrator.infrastructure.metadata_storage.gcp_metadata_storage import ( - GCPMetadataStorage, -) -from edge_orchestrator.infrastructure.model_forward.tf_serving_wrapper import ( - TFServingWrapper, -) -from edge_orchestrator.infrastructure.station_config.json_station_config import ( - JsonStationConfig, -) -from edge_orchestrator.infrastructure.telemetry_sink.fake_telemetry_sink import ( - FakeTelemetrySink, -) - - -class UploadWithGCPBucket(Config): - SERVING_MODEL_URL = os.environ.get( - "SERVING_MODEL_URL", "http://edge_model_serving:8501" - ) - - def __init__(self): - self.metadata_storage = GCPMetadataStorage() - self.binary_storage = GCPBinaryStorage() - self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json") - self.station_config = JsonStationConfig( - self.ROOT_PATH / "config" / "station_configs", - self.inventory, - self.ROOT_PATH / "data", - ) - self.edge_station = EdgeStation(self.station_config) - self.model_forward = TFServingWrapper( - self.SERVING_MODEL_URL, self.inventory, self.station_config - ) - self.telemetry_sink = FakeTelemetrySink() diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/azure_container_binary_storage.py b/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/azure_container_binary_storage.py index d50105e6..83219dd0 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/azure_container_binary_storage.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/azure_container_binary_storage.py @@ -1,5 +1,5 @@ import os -from typing import List +from typing import List, Optional from azure.core.exceptions import ResourceExistsError from azure.storage.blob import BlobServiceClient @@ -10,12 +10,23 @@ class AzureContainerBinaryStorage(BinaryStorage): - def __init__(self): - self.azure_container_name = os.getenv("AZURE_CONTAINER_NAME") - az_storage_connection_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING") + def __init__( + self, + azure_container_name: Optional[str] = None, + azure_storage_connection_string: Optional[str] = None, + ): + if azure_container_name is None: + self.azure_container_name = os.getenv( + "AZURE_CONTAINER_NAME", "blob-storage-raspberry-1" + ) + if azure_storage_connection_string is None: + azure_storage_connection_string = os.getenv( + "AZURE_STORAGE_CONNECTION_STRING" + ) self._blob_service_client = BlobServiceClient.from_connection_string( - az_storage_connection_str + azure_storage_connection_string ) + try: self._blob_service_client.create_container(self.azure_container_name) except ResourceExistsError: diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/binary_storage_factory.py b/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/binary_storage_factory.py new file mode 100644 index 00000000..cf90d5a1 --- /dev/null +++ b/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/binary_storage_factory.py @@ -0,0 +1,43 @@ +from functools import lru_cache +from typing import Optional, Dict, Type, Any + +from edge_orchestrator.domain.ports.binary_storage import BinaryStorage +from edge_orchestrator.infrastructure.binary_storage.azure_container_binary_storage import ( + AzureContainerBinaryStorage, +) +from edge_orchestrator.infrastructure.binary_storage.filesystem_binary_storage import ( + FileSystemBinaryStorage, +) +from edge_orchestrator.infrastructure.binary_storage.gcp_bucket_binary_storage import ( + GCPBucketBinaryStorage, +) +from edge_orchestrator.infrastructure.binary_storage.in_memory_binary_storage import ( + InMemoryBinaryStorage, +) +from edge_orchestrator.infrastructure.filesystem_helpers import get_tmp_path + +AVAILABLE_BINARY_STORAGES: Dict[str, Type[BinaryStorage]] = { + "azure_container": AzureContainerBinaryStorage, + "filesystem": FileSystemBinaryStorage, + "gcp_bucket": GCPBucketBinaryStorage, + "in_memory": InMemoryBinaryStorage, +} + + +class BinaryStorageFactory: + @staticmethod + @lru_cache() + def get_binary_storage( + binary_storage_type: Optional[str] = "filesystem", + **binary_storage_config: Optional[Dict[str, Any]], + ) -> BinaryStorage: + if not binary_storage_config: + binary_storage_config["src_directory"] = get_tmp_path() + try: + return AVAILABLE_BINARY_STORAGES[binary_storage_type]( + **binary_storage_config + ) + except KeyError as err: + raise ValueError( + f"Unknown binary storage type: {binary_storage_type}" + ) from err diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/filesystem_binary_storage.py b/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/filesystem_binary_storage.py index 47e335c1..0b0c02bf 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/filesystem_binary_storage.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/filesystem_binary_storage.py @@ -1,13 +1,17 @@ from pathlib import Path -from typing import List +from typing import List, Optional from edge_orchestrator.domain.models.item import Item from edge_orchestrator.domain.ports.binary_storage import BinaryStorage +from edge_orchestrator.infrastructure.filesystem_helpers import get_tmp_path class FileSystemBinaryStorage(BinaryStorage): - def __init__(self, src_directory_path: Path): - self.folder = src_directory_path + def __init__(self, src_directory: Optional[str] = None): + if src_directory is None: + self.folder = get_tmp_path() + else: + self.folder = Path(src_directory) def save_item_binaries(self, item: Item): path = self.folder / item.id diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/gcp_binary_storage.py b/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/gcp_bucket_binary_storage.py similarity index 72% rename from edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/gcp_binary_storage.py rename to edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/gcp_bucket_binary_storage.py index 23824f01..768ad715 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/gcp_binary_storage.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/gcp_bucket_binary_storage.py @@ -1,5 +1,6 @@ import os -from typing import List +import secrets +from typing import List, Optional from google.cloud import storage @@ -7,11 +8,16 @@ from edge_orchestrator.domain.ports.binary_storage import BinaryStorage -class GCPBinaryStorage(BinaryStorage): - def __init__(self): +class GCPBucketBinaryStorage(BinaryStorage): + def __init__(self, prefix: Optional[str] = None, bucket_name: Optional[str] = None): + if prefix is None: + prefix = os.environ.get("EDGE_NAME", f"edge#{secrets.token_hex(4)}") + if bucket_name is None: + bucket_name = os.getenv("GCP_BUCKET_NAME") + + self.prefix = prefix self.storage_client = storage.Client() - self.prefix = os.environ.get("EDGE_NAME", "") - self.bucket = self.storage_client.get_bucket(os.getenv("GCP_BUCKET_NAME")) + self.bucket = self.storage_client.get_bucket(bucket_name) def save_item_binaries(self, item: Item) -> None: for camera_id, binary in item.binaries.items(): diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/memory_binary_storage.py b/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/in_memory_binary_storage.py similarity index 77% rename from edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/memory_binary_storage.py rename to edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/in_memory_binary_storage.py index 0d7d653d..bec59ac5 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/memory_binary_storage.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/in_memory_binary_storage.py @@ -1,15 +1,15 @@ -from typing import List +from typing import Any, Dict, List from edge_orchestrator.domain.models.item import Item from edge_orchestrator.domain.ports.binary_storage import BinaryStorage -class MemoryBinaryStorage(BinaryStorage): +class InMemoryBinaryStorage(BinaryStorage): def __init__(self): - self.binaries = {} + self.binaries: Dict[str, Any] = {} def save_item_binaries(self, item: Item) -> None: - binaries_dict = {} + binaries_dict: Dict[str, bytes] = {} for camera_id, binary in item.binaries.items(): binaries_dict[camera_id] = binary self.binaries[item.id] = binaries_dict diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/camera/fake_camera.py b/edge_orchestrator/edge_orchestrator/infrastructure/camera/fake_camera.py index 6cd0f58b..643d5f4c 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/camera/fake_camera.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/camera/fake_camera.py @@ -4,14 +4,13 @@ from edge_orchestrator import logger from edge_orchestrator.domain.models.camera import Camera -from edge_orchestrator.environment.config import Config class FakeCamera(Camera): def __init__(self, id: str, settings: Dict[str, Union[str, Dict]]): self.id: str = id self.settings: Dict = settings - self.data_folder_path: Path = Config.ROOT_PATH / "data" + self.data_folder_path: Path = Path(__file__).parents[2] / "data" self.image_extensions: List = ["*.jpg", "*.png"] def capture(self) -> bytes: diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/filesystem_helpers.py b/edge_orchestrator/edge_orchestrator/infrastructure/filesystem_helpers.py new file mode 100644 index 00000000..b898a104 --- /dev/null +++ b/edge_orchestrator/edge_orchestrator/infrastructure/filesystem_helpers.py @@ -0,0 +1,7 @@ +from pathlib import Path + + +def get_tmp_path() -> Path: + tmp_path = Path("/tmp/vio/edge_orchestrator/data/storage") + tmp_path.mkdir(parents=True, exist_ok=True) + return tmp_path diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/inventory/__init__.py b/edge_orchestrator/edge_orchestrator/infrastructure/inventory/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/inventory/json_inventory.py b/edge_orchestrator/edge_orchestrator/infrastructure/inventory/json_inventory.py deleted file mode 100644 index 1f1435e9..00000000 --- a/edge_orchestrator/edge_orchestrator/infrastructure/inventory/json_inventory.py +++ /dev/null @@ -1,30 +0,0 @@ -import json -from pathlib import Path -from typing import Dict, List, Union - -from edge_orchestrator.domain.ports.inventory import Inventory - - -class JsonInventory(Inventory): - def __init__(self, inventory_path: Path): - if not inventory_path.exists(): - raise FileNotFoundError(f'No inventory file found at "{inventory_path}"') - - with open(inventory_path, "r") as inventory_file: - content = json.load(inventory_file) - self.cameras = content["cameras"] - self.models = content["models"] - self.camera_rules = content["camera_rules"] - self.item_rules = content["item_rules"] - - def get_cameras(self) -> List[str]: - return self.inventory["cameras"] - - def get_models(self) -> Dict[str, Dict[str, Union[str, int]]]: - return self.inventory["models"] - - def get_camera_rules(self) -> List[str]: - return self.inventory["camera_rules"] - - def get_item_rules(self) -> List[str]: - return self.inventory["item_rules"] diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/azure_container_metadata_storage.py b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/azure_container_metadata_storage.py index a755e947..012a31f9 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/azure_container_metadata_storage.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/azure_container_metadata_storage.py @@ -1,6 +1,6 @@ import json import os -from typing import Dict, List +from typing import Dict, List, Optional from azure.core.exceptions import ResourceExistsError from azure.storage.blob import BlobServiceClient @@ -11,12 +11,23 @@ class AzureContainerMetadataStorage(MetadataStorage): - def __init__(self): - self.azure_container_name = os.getenv("AZURE_CONTAINER_NAME") - az_storage_connection_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING") + def __init__( + self, + azure_container_name: Optional[str] = None, + azure_storage_connection_string: Optional[str] = None, + ): + if azure_container_name is None: + self.azure_container_name = os.getenv( + "AZURE_CONTAINER_NAME", "blob-storage-raspberry-1" + ) + if azure_storage_connection_string is None: + azure_storage_connection_string = os.getenv( + "AZURE_STORAGE_CONNECTION_STRING" + ) self._blob_service_client = BlobServiceClient.from_connection_string( - az_storage_connection_str + azure_storage_connection_string ) + try: self._blob_service_client.create_container(self.azure_container_name) except ResourceExistsError: diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/filesystem_metadata_storage.py b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/filesystem_metadata_storage.py index f052fd5e..1926371f 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/filesystem_metadata_storage.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/filesystem_metadata_storage.py @@ -1,14 +1,18 @@ import json from pathlib import Path -from typing import Dict, List +from typing import Dict, List, Optional from edge_orchestrator.domain.models.item import Item from edge_orchestrator.domain.ports.metadata_storage import MetadataStorage +from edge_orchestrator.infrastructure.filesystem_helpers import get_tmp_path class FileSystemMetadataStorage(MetadataStorage): - def __init__(self, src_directory_path: Path): - self.folder = src_directory_path + def __init__(self, src_directory: Optional[str] = None): + if src_directory is None: + self.folder = get_tmp_path() + else: + self.folder = Path(src_directory) def save_item_metadata(self, item: Item): (self.folder / item.id).mkdir(parents=True, exist_ok=True) diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/gcp_metadata_storage.py b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/gcp_bucket_metadata_storage.py similarity index 73% rename from edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/gcp_metadata_storage.py rename to edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/gcp_bucket_metadata_storage.py index aaa80559..6d76aabb 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/gcp_metadata_storage.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/gcp_bucket_metadata_storage.py @@ -1,6 +1,7 @@ import json import os -from typing import Dict, List +import secrets +from typing import Dict, List, Optional from google.cloud import storage @@ -8,11 +9,16 @@ from edge_orchestrator.domain.ports.metadata_storage import MetadataStorage -class GCPMetadataStorage(MetadataStorage): - def __init__(self): +class GCPBucketMetadataStorage(MetadataStorage): + def __init__(self, prefix: Optional[str] = None, bucket_name: Optional[str] = None): + if prefix is None: + prefix = os.environ.get("EDGE_NAME", f"edge#{secrets.token_hex(4)}") + if bucket_name is None: + bucket_name = os.getenv("GCP_BUCKET_NAME") + + self.prefix = prefix self.storage_client = storage.Client() - self.prefix = os.environ.get("EDGE_NAME", "") - self.bucket = self.storage_client.get_bucket(os.getenv("GCP_BUCKET_NAME")) + self.bucket = self.storage_client.get_bucket(bucket_name) def save_item_metadata(self, item: Item): item_metadata = json.dumps(item.get_metadata()) diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/memory_metadata_storage.py b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/in_memory_metadata_storage.py similarity index 94% rename from edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/memory_metadata_storage.py rename to edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/in_memory_metadata_storage.py index 2955a5d2..647edb68 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/memory_metadata_storage.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/in_memory_metadata_storage.py @@ -4,7 +4,7 @@ from edge_orchestrator.domain.ports.metadata_storage import MetadataStorage -class MemoryMetadataStorage(MetadataStorage): +class InMemoryMetadataStorage(MetadataStorage): def __init__(self): self.items_metadata = {} diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/metadata_storage_factory.py b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/metadata_storage_factory.py new file mode 100644 index 00000000..c2fae268 --- /dev/null +++ b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/metadata_storage_factory.py @@ -0,0 +1,48 @@ +from functools import lru_cache +from typing import Dict, Type, Optional, Any + +from edge_orchestrator.domain.ports.metadata_storage import MetadataStorage +from edge_orchestrator.infrastructure.metadata_storage.azure_container_metadata_storage import ( + AzureContainerMetadataStorage, +) +from edge_orchestrator.infrastructure.metadata_storage.filesystem_metadata_storage import ( + FileSystemMetadataStorage, +) +from edge_orchestrator.infrastructure.metadata_storage.gcp_bucket_metadata_storage import ( + GCPBucketMetadataStorage, +) +from edge_orchestrator.infrastructure.metadata_storage.in_memory_metadata_storage import ( + InMemoryMetadataStorage, +) +from edge_orchestrator.infrastructure.metadata_storage.mongo_db_metadata_storage import ( + MongoDbMetadataStorage, +) +from infrastructure.filesystem_helpers import get_tmp_path + +AVAILABLE_METADATA_STORAGES: Dict[str, Type[MetadataStorage]] = { + "azure_container": AzureContainerMetadataStorage, + "filesystem": FileSystemMetadataStorage, + "gcp_bucket": GCPBucketMetadataStorage, + "in_memory": InMemoryMetadataStorage, + "mongo_db": MongoDbMetadataStorage, +} + + +class MetadataStorageFactory: + @staticmethod + @lru_cache() + def get_metadata_storage( + metadata_storage_type: Optional[str] = "filesystem", + **metadata_storage_config: Optional[Dict[str, Any]], + ) -> MetadataStorage: + if not metadata_storage_type: + metadata_storage_type["src_directory"] = get_tmp_path() + try: + # return AVAILABLE_METADATA_STORAGES[metadata_storage_type]() + return AVAILABLE_METADATA_STORAGES[metadata_storage_type]( + **metadata_storage_config + ) + except KeyError as err: + raise ValueError( + f"Unknown metadata storage type: {metadata_storage_type}" + ) from err diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/mongodb_metadata_storage.py b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/mongo_db_metadata_storage.py similarity index 82% rename from edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/mongodb_metadata_storage.py rename to edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/mongo_db_metadata_storage.py index 28e27a21..e2f79e9b 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/mongodb_metadata_storage.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/mongo_db_metadata_storage.py @@ -1,4 +1,5 @@ -from typing import Dict, List +import os +from typing import Dict, List, Optional import pymongo @@ -7,7 +8,10 @@ class MongoDbMetadataStorage(MetadataStorage): - def __init__(self, mongodb_uri: str): + def __init__(self, mongodb_uri: Optional[str] = None): + if mongodb_uri is None: + mongodb_uri = os.environ.get("MONGO_DB_URI", "mongodb://edge_db:27017/") + self.client = pymongo.MongoClient(mongodb_uri) self.db = self.client["orchestratorDB"] self.items_metadata = self.db["items"] diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/model_forward/model_forward_factory.py b/edge_orchestrator/edge_orchestrator/infrastructure/model_forward/model_forward_factory.py new file mode 100644 index 00000000..b4a54b6c --- /dev/null +++ b/edge_orchestrator/edge_orchestrator/infrastructure/model_forward/model_forward_factory.py @@ -0,0 +1,30 @@ +from functools import lru_cache +from typing import Dict, Type, Optional, Any + +from edge_orchestrator.domain.ports.model_forward import ModelForward +from edge_orchestrator.infrastructure.model_forward.fake_model_forward import ( + FakeModelForward, +) +from edge_orchestrator.infrastructure.model_forward.tf_serving_wrapper import ( + TFServingWrapper, +) + +AVAILABLE_MODEL_FORWARD: Dict[str, Type[ModelForward]] = { + "fake": FakeModelForward, + "tf_serving": TFServingWrapper, +} + + +class ModelForwardFactory: + @staticmethod + @lru_cache() + def get_model_forward( + model_forward_type: Optional[str] = "fake", + **model_forward_params: Optional[Dict[str, Any]], + ) -> ModelForward: + try: + return AVAILABLE_MODEL_FORWARD[model_forward_type](**model_forward_params) + except KeyError as err: + raise ValueError( + f"Unknown model forward type: {model_forward_type}" + ) from err diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/model_forward/tf_serving_wrapper.py b/edge_orchestrator/edge_orchestrator/infrastructure/model_forward/tf_serving_wrapper.py index 962cbdc6..9317bd34 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/model_forward/tf_serving_wrapper.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/model_forward/tf_serving_wrapper.py @@ -1,3 +1,6 @@ +import os +from typing import Optional + from edge_orchestrator import logger from edge_orchestrator.domain.models.model_infos import ModelInfos, ModelTypes from edge_orchestrator.domain.ports.model_forward import ModelForward @@ -13,10 +16,13 @@ class TFServingWrapper(ModelForward): - def __init__(self, serving_model_url, inventory, station_config): + def __init__(self, serving_model_url: Optional[str] = None): + if serving_model_url is None: + serving_model_url = os.environ.get( + "SERVING_MODEL_URL", "http://edge_model_serving:8501" + ) + self.serving_model_url = serving_model_url - self.inventory = inventory - self.station_config = station_config async def perform_inference( self, model: ModelInfos, binary_data: bytes, binary_name: str diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/station_config/json_station_config.py b/edge_orchestrator/edge_orchestrator/infrastructure/station_config/json_station_config.py index b0201cd8..44e3c55b 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/station_config/json_station_config.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/station_config/json_station_config.py @@ -3,23 +3,26 @@ from pathlib import Path from typing import Dict, List, Type, Union +from application.dto.station_config import StationConfig +from application.no_active_configuration_exception import NoActiveConfigurationException from edge_orchestrator import logger from edge_orchestrator.domain.models.camera import Camera from edge_orchestrator.domain.models.model_infos import ModelInfos -from edge_orchestrator.domain.ports.inventory import Inventory -from edge_orchestrator.domain.ports.station_config import StationConfig from edge_orchestrator.infrastructure.camera.fake_camera import FakeCamera from edge_orchestrator.infrastructure.camera.raspberry_pi_camera import ( RaspberryPiCamera, ) from edge_orchestrator.infrastructure.camera.usb_camera import UsbCamera +AVAILABLE_CAMERA_TYPES = { + "fake": FakeCamera, + "pi_camera": RaspberryPiCamera, + "usb_camera": UsbCamera, +} + class JsonStationConfig(StationConfig): - def __init__( - self, station_configs_folder: Path, inventory: Inventory, data_folder: Path - ): - self.inventory = inventory + def __init__(self, station_configs_folder: Path, data_folder: Path): self.data_folder = data_folder if not station_configs_folder.exists(): @@ -29,14 +32,14 @@ def __init__( self.station_configs_folder = station_configs_folder self.all_configs = {} - self.load() + self._load() self.active_config = None - config_name = os.environ.get("ACTIVE_CONFIG_NAME", None) + config_name = os.environ.get("ACTIVE_CONFIG_NAME") if config_name is not None: self.set_station_config(config_name) - def load(self): + def _load(self): self.all_configs = {} for config in self.station_configs_folder.glob("*.json"): with open(config, "r") as station_config_file: @@ -46,13 +49,16 @@ def load(self): def set_station_config(self, config_name: str): try: + self.active_config = self.all_configs[config_name] self.active_config_name = config_name - self.active_config = self.all_configs[self.active_config_name] - logger.info(f"Activated the configuration {self.active_config_name}") except KeyError: - raise KeyError( - f"{config_name} is unknown. Valid configs are {list(self.all_configs.keys())}" + err_msg = ( + f"config_name '{config_name}' is unknown. " + f"Valid configs are: {list(self.all_configs.keys())}" ) + logger.error(err_msg) + raise NoActiveConfigurationException(err_msg) + logger.info(f"Activated the configuration {self.active_config_name}") def get_model_pipeline_for_camera(self, camera_id: str) -> List[ModelInfos]: model_pipeline = [] @@ -62,7 +68,7 @@ def get_model_pipeline_for_camera(self, camera_id: str) -> List[ModelInfos]: if model_pipeline_config: for model_id, model in model_pipeline_config.items(): model_infos = ModelInfos.from_model_graph_node( - camera_id, model_id, model, self.inventory, self.data_folder + camera_id, model_id, model, self.data_folder ) model_pipeline.append(model_infos) else: @@ -74,12 +80,9 @@ def get_cameras(self) -> List[str]: def get_camera_type(self, camera_id: str) -> Type[Camera]: camera_config = self.active_config["cameras"].get(camera_id) - if camera_config["type"] == "fake": - return FakeCamera - elif camera_config["type"] == "pi_camera": - return RaspberryPiCamera - elif camera_config["type"] == "usb_camera": - return UsbCamera + camera_type = AVAILABLE_CAMERA_TYPES.get(camera_config["type"]) + if camera_type is not None: + return camera_type else: raise ValueError(f"Camera type ({camera_config['type']}) is not supported.") diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/postgresql_telemetry_sink.py b/edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/postgres_telemetry_sink.py similarity index 92% rename from edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/postgresql_telemetry_sink.py rename to edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/postgres_telemetry_sink.py index 6f3b8c94..5582980e 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/postgresql_telemetry_sink.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/postgres_telemetry_sink.py @@ -1,7 +1,7 @@ import time from datetime import datetime from random import randrange -from typing import Dict +from typing import Dict, Optional from urllib.parse import urlparse from uuid import uuid4 @@ -12,7 +12,14 @@ class PostgresTelemetrySink(TelemetrySink): - def __init__(self, connection_url: str, timeout: int = 30, interval: int = 2): + def __init__( + self, + connection_url: Optional[ + str + ] = "postgresql://vio:vio@hub_monitoring_db:5432/vio", + timeout: int = 30, + interval: int = 2, + ): self.connection_url = connection_url self._connection = None self._timeout = timeout diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/telemetry_sink_factory.py b/edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/telemetry_sink_factory.py new file mode 100644 index 00000000..59b1d53f --- /dev/null +++ b/edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/telemetry_sink_factory.py @@ -0,0 +1,36 @@ +from functools import lru_cache +from typing import Dict, Type, Optional, Any + +from edge_orchestrator.domain.ports.telemetry_sink import TelemetrySink +from edge_orchestrator.infrastructure.telemetry_sink.azure_iot_hub_telemetry_sink import ( + AzureIotHubTelemetrySink, +) +from edge_orchestrator.infrastructure.telemetry_sink.fake_telemetry_sink import ( + FakeTelemetrySink, +) +from edge_orchestrator.infrastructure.telemetry_sink.postgres_telemetry_sink import ( + PostgresTelemetrySink, +) + +AVAILABLE_TELEMETRY_SINK: Dict[str, Type[TelemetrySink]] = { + "azure_iot_hub": AzureIotHubTelemetrySink, + "fake": FakeTelemetrySink, + "postgres": PostgresTelemetrySink, +} + + +class TelemetrySinkFactory: + @staticmethod + @lru_cache() + def get_telemetry_sink( + telemetry_sink_type: Optional[str] = "fake", + **telemetry_sink_config: Optional[Dict[str, Any]], + ) -> TelemetrySink: + try: + return AVAILABLE_TELEMETRY_SINK[telemetry_sink_type]( + **telemetry_sink_config + ) + except KeyError as err: + raise ValueError( + f"Unknown telemetry sink type: {telemetry_sink_type}" + ) from err diff --git a/edge_orchestrator/pyproject.toml b/edge_orchestrator/pyproject.toml index b0b819d2..c862915b 100644 --- a/edge_orchestrator/pyproject.toml +++ b/edge_orchestrator/pyproject.toml @@ -28,6 +28,7 @@ dependencies = [ "Pillow==9.3.0", "psycopg2-binary==2.9.5", "psycopg2==2.9.3", + "pydantic==2.1.1", "pymongo==4.3.3", "smart_open[azure]==6.3.0", "google-cloud-storage==2.2.1", diff --git a/edge_orchestrator/tests/conftest.py b/edge_orchestrator/tests/conftest.py index b57de306..580e24a6 100644 --- a/edge_orchestrator/tests/conftest.py +++ b/edge_orchestrator/tests/conftest.py @@ -9,15 +9,15 @@ TEST_STATION_CONFIG_2_PATH = ( TEST_STATION_CONFIGS_FOLDER_PATH / "station_config_TEST2.json" ) -TEST_INVENTORY_PATH = TEST_CONFIG_FOLDER_PATH / "inventory_TEST.json" ROOT_REPOSITORY_PATH = Path(__file__).parents[2] pytest_plugins = [ "tests.fixtures.binaries", "tests.fixtures.cameras_metadata", + "tests.fixtures.containers", "tests.fixtures.items", "tests.fixtures.metadata", - "tests.fixtures.containers", + "tests.fixtures.supervisor_and_collaborators", ] EDGE_DB_IMG = "mongo:5.0.2" diff --git a/edge_orchestrator/tests/fixtures/supervisor_and_collaborators.py b/edge_orchestrator/tests/fixtures/supervisor_and_collaborators.py new file mode 100644 index 00000000..f063f441 --- /dev/null +++ b/edge_orchestrator/tests/fixtures/supervisor_and_collaborators.py @@ -0,0 +1,89 @@ +import os +from pathlib import Path + +from _pytest.fixtures import fixture + +from edge_orchestrator.domain.models.edge_station import EdgeStation +from edge_orchestrator.domain.use_cases.supervisor import Supervisor +from edge_orchestrator.infrastructure.binary_storage.filesystem_binary_storage import ( + FileSystemBinaryStorage, +) +from edge_orchestrator.infrastructure.metadata_storage.mongo_db_metadata_storage import ( + MongoDbMetadataStorage, +) +from edge_orchestrator.infrastructure.model_forward.tf_serving_wrapper import ( + TFServingWrapper, +) +from edge_orchestrator.infrastructure.station_config.json_station_config import ( + JsonStationConfig, +) +from edge_orchestrator.infrastructure.telemetry_sink.postgres_telemetry_sink import ( + PostgresTelemetrySink, +) +from tests.conftest import ( + TEST_DATA_FOLDER_PATH, + TEST_STATION_CONFIGS_FOLDER_PATH, +) + +ROOT_PATH = Path("/tests") +MONGO_DB_URI = os.environ.get("MONGO_DB_URI", "mongodb://edge_db:27017/") +POSTGRES_DB_URI = os.environ.get( + "POSTGRES_DB_URI", "postgresql://vio:vio@hub_monitoring_db:5432/vio" +) +SERVING_MODEL_URL = os.environ.get( + "SERVING_MODEL_URL", "http://edge_model_serving:8501" +) + + +@fixture(scope="function") +def mongodb_metadata_storage(): + return MongoDbMetadataStorage(MONGO_DB_URI) + + +@fixture(scope="function") +def filesystem_binary_storage(): + return FileSystemBinaryStorage(TEST_DATA_FOLDER_PATH / "storage") + + +@fixture(scope="function") +def json_station_config(json_inventory): + return JsonStationConfig(TEST_STATION_CONFIGS_FOLDER_PATH, TEST_DATA_FOLDER_PATH) + + +@fixture(scope="function") +def edge_station(json_station_config): + return EdgeStation(json_station_config) + + +@fixture(scope="function") +def tf_model_forward(): + return TFServingWrapper(SERVING_MODEL_URL) + + +@fixture(scope="function") +def postgres_telemetry_sink(): + return PostgresTelemetrySink(POSTGRES_DB_URI) + + +@fixture(scope="function") +def postgres_telemetry_sink(): + return PostgresTelemetrySink(POSTGRES_DB_URI) + + +@fixture(scope="function") +def supervisor( + filesystem_binary_storage, + edge_station, + mongodb_metadata_storage, + tf_model_forward, + json_station_config, + postgres_telemetry_sink, +): + return Supervisor( + filesystem_binary_storage, + edge_station, + mongodb_metadata_storage, + tf_model_forward, + json_station_config, + postgres_telemetry_sink, + ) diff --git a/edge_orchestrator/tests/integration_tests/infrastructure/station_config/test_json_station_config.py b/edge_orchestrator/tests/integration_tests/infrastructure/station_config/test_json_station_config.py index b2e42e33..69bc1ac0 100644 --- a/edge_orchestrator/tests/integration_tests/infrastructure/station_config/test_json_station_config.py +++ b/edge_orchestrator/tests/integration_tests/infrastructure/station_config/test_json_station_config.py @@ -5,13 +5,11 @@ from freezegun import freeze_time from edge_orchestrator.domain.models.model_infos import ModelInfos -from edge_orchestrator.infrastructure.inventory.json_inventory import JsonInventory from edge_orchestrator.infrastructure.station_config.json_station_config import ( JsonStationConfig, ) from tests.conftest import ( TEST_DATA_FOLDER_PATH, - TEST_INVENTORY_PATH, TEST_STATION_CONFIGS_FOLDER_PATH, ) @@ -24,9 +22,8 @@ def test_get_models_for_camera_should_return_one_model_infos_when_camera_config_ self, ): # Given - inventory = JsonInventory(TEST_INVENTORY_PATH) json_station_config = JsonStationConfig( - TEST_STATION_CONFIGS_FOLDER_PATH, inventory, TEST_DATA_FOLDER_PATH + TEST_STATION_CONFIGS_FOLDER_PATH, TEST_DATA_FOLDER_PATH ) camera_id = "camera_id3" @@ -40,9 +37,8 @@ def test_get_models_for_camera_should_return_two_model_infos_when_camera_config_ self, ): # Given - inventory = JsonInventory(TEST_INVENTORY_PATH) json_station_config = JsonStationConfig( - TEST_STATION_CONFIGS_FOLDER_PATH, inventory, TEST_DATA_FOLDER_PATH + TEST_STATION_CONFIGS_FOLDER_PATH, TEST_DATA_FOLDER_PATH ) json_station_config.set_station_config("station_config_TEST2") camera_id = "camera_id3" diff --git a/edge_orchestrator/tests/integration_tests/infrastructure/telemetry_sink/test_postgresql_telemetry_sink.py b/edge_orchestrator/tests/integration_tests/infrastructure/telemetry_sink/test_postgresql_telemetry_sink.py index 57bb0e17..f236ab6c 100644 --- a/edge_orchestrator/tests/integration_tests/infrastructure/telemetry_sink/test_postgresql_telemetry_sink.py +++ b/edge_orchestrator/tests/integration_tests/infrastructure/telemetry_sink/test_postgresql_telemetry_sink.py @@ -4,7 +4,7 @@ import pytest from freezegun import freeze_time -from edge_orchestrator.infrastructure.telemetry_sink.postgresql_telemetry_sink import ( +from edge_orchestrator.infrastructure.telemetry_sink.postgres_telemetry_sink import ( PostgresTelemetrySink, ) diff --git a/edge_orchestrator/tests/unit_tests/domain/models/test_edge_station.py b/edge_orchestrator/tests/unit_tests/domain/models/test_edge_station.py index 7bac3b1c..bab7f085 100644 --- a/edge_orchestrator/tests/unit_tests/domain/models/test_edge_station.py +++ b/edge_orchestrator/tests/unit_tests/domain/models/test_edge_station.py @@ -4,9 +4,9 @@ import pytest from freezegun import freeze_time +from application.dto.station_config import StationConfig from edge_orchestrator.api_config import get_station_config from edge_orchestrator.domain.models.edge_station import EdgeStation -from edge_orchestrator.domain.ports.station_config import StationConfig from edge_orchestrator.infrastructure.camera.fake_camera import FakeCamera @@ -21,7 +21,7 @@ def test_register_cameras_raises_exception_when_no_active_configuration_is_set( # Then with pytest.raises(TypeError) as error: - edge_station.register_cameras(station_config) + edge_station.register_cameras() assert str(error.value) == "'NoneType' object is not subscriptable" def test_capture_should_raise_exception_when_cameras_are_not_registered(self): @@ -59,7 +59,7 @@ def test_capture_should_instantiate_item_with_1_binary( edge_station = EdgeStation(station_config) # When - edge_station.register_cameras(station_config) + edge_station.register_cameras() cameras_metadata, binaries = edge_station.capture() # Then diff --git a/edge_orchestrator/tests/unit_tests/domain/test_supervisor.py b/edge_orchestrator/tests/unit_tests/domain/test_supervisor.py index 9924c904..463848f4 100644 --- a/edge_orchestrator/tests/unit_tests/domain/test_supervisor.py +++ b/edge_orchestrator/tests/unit_tests/domain/test_supervisor.py @@ -5,15 +5,15 @@ import numpy as np import pytest +from edge_orchestrator.domain.models.edge_station import EdgeStation from edge_orchestrator.domain.models.item import Item from edge_orchestrator.domain.models.model_infos import ModelInfos -from edge_orchestrator.domain.use_cases.supervisor import Supervisor, crop_image -from edge_orchestrator.infrastructure.binary_storage.memory_binary_storage import ( - MemoryBinaryStorage, +from edge_orchestrator.domain.use_cases.supervisor import crop_image, Supervisor +from edge_orchestrator.infrastructure.binary_storage.in_memory_binary_storage import ( + InMemoryBinaryStorage, ) -from edge_orchestrator.infrastructure.inventory.json_inventory import JsonInventory -from edge_orchestrator.infrastructure.metadata_storage.memory_metadata_storage import ( - MemoryMetadataStorage, +from edge_orchestrator.infrastructure.metadata_storage.in_memory_metadata_storage import ( + InMemoryMetadataStorage, ) from edge_orchestrator.infrastructure.model_forward.fake_model_forward import ( FakeModelForward, @@ -24,24 +24,21 @@ from edge_orchestrator.infrastructure.telemetry_sink.fake_telemetry_sink import ( FakeTelemetrySink, ) -from edge_orchestrator.infrastructure.telemetry_sink.postgresql_telemetry_sink import ( +from edge_orchestrator.infrastructure.telemetry_sink.postgres_telemetry_sink import ( PostgresTelemetrySink, ) from tests.conftest import ( TEST_DATA_FOLDER_PATH, - TEST_INVENTORY_PATH, TEST_STATION_CONFIGS_FOLDER_PATH, ) @pytest.mark.asyncio class TestSupervisor: - async def test_2_models_in_parallel(self, my_item_1): + async def test_2_models_in_parallel(self, supervisor): random.seed(42) np.random.seed(42) - inventory = JsonInventory(TEST_INVENTORY_PATH) - models_graph = { "model_1": {"name": "inception", "depends_on": []}, "model_2": {"name": "inception", "depends_on": []}, @@ -49,13 +46,13 @@ async def test_2_models_in_parallel(self, my_item_1): model_pipeline = [ ModelInfos.from_model_graph_node( - "camera_id", model_id, model, inventory, TEST_DATA_FOLDER_PATH + "camera_id", model_id, model, TEST_DATA_FOLDER_PATH ) for model_id, model in models_graph.items() ] binary_data = b"fhfh" - supervisor = Supervisor(model_forward=FakeModelForward()) + supervisor.model_forward = FakeModelForward() inference_output = await supervisor.get_inference( {}, model_pipeline, binary_data, image_name="full_image" ) @@ -71,12 +68,10 @@ async def test_2_models_in_parallel(self, my_item_1): assert inference_output == inference_output_expected - async def test_2_models_in_serie(self): + async def test_2_models_in_serie(self, supervisor): random.seed(42) np.random.seed(42) - inventory = JsonInventory(TEST_INVENTORY_PATH) - models_graph = { "model_1": {"name": "inception", "depends_on": ["model_2"]}, "model_2": {"name": "mobilenet_v1_640x640", "depends_on": []}, @@ -84,7 +79,7 @@ async def test_2_models_in_serie(self): model_pipeline = [ ModelInfos.from_model_graph_node( - "camera_id", model_id, model, inventory, TEST_DATA_FOLDER_PATH + "camera_id", model_id, model, TEST_DATA_FOLDER_PATH ) for model_id, model in models_graph.items() ] @@ -92,7 +87,7 @@ async def test_2_models_in_serie(self): (TEST_DATA_FOLDER_PATH / "fake_item" / "image1.jpg").open("rb").read() ) - supervisor = Supervisor(model_forward=FakeModelForward()) + supervisor.model_forward = FakeModelForward() inference_output = await supervisor.get_inference( {}, model_pipeline, binary_data, image_name="full_image" ) @@ -579,16 +574,17 @@ async def test_set_decision_should_send_final_decision_to_telemetry_sink( # Given item = Item(serial_number="", category="", cameras_metadata={}, binaries={}) item.id = "item_id" - inventory = JsonInventory(TEST_INVENTORY_PATH) station_config = JsonStationConfig( - TEST_STATION_CONFIGS_FOLDER_PATH, inventory, TEST_DATA_FOLDER_PATH + TEST_STATION_CONFIGS_FOLDER_PATH, TEST_DATA_FOLDER_PATH ) station_config.set_station_config("station_config_TEST") supervisor = Supervisor( - station_config=station_config, - metadata_storage=MemoryMetadataStorage(), + binary_storage=InMemoryBinaryStorage(), + edge_station=EdgeStation(station_config), + metadata_storage=InMemoryMetadataStorage(), model_forward=FakeModelForward(), - binary_storage=MemoryBinaryStorage(), + station_config=station_config, + telemetry_sink=FakeTelemetrySink(), ) # When @@ -622,16 +618,16 @@ async def test_inspect_should_log_information_about_item_processing( "Entering try Decision", "End of Decision", ] - inventory = JsonInventory(TEST_INVENTORY_PATH) station_config = JsonStationConfig( - TEST_STATION_CONFIGS_FOLDER_PATH, inventory, TEST_DATA_FOLDER_PATH + TEST_STATION_CONFIGS_FOLDER_PATH, TEST_DATA_FOLDER_PATH ) station_config.set_station_config("station_config_TEST") supervisor = Supervisor( - station_config=station_config, - metadata_storage=MemoryMetadataStorage(), + binary_storage=InMemoryBinaryStorage(), + edge_station=EdgeStation(station_config), + metadata_storage=InMemoryMetadataStorage(), model_forward=FakeModelForward(), - binary_storage=MemoryBinaryStorage(), + station_config=station_config, telemetry_sink=FakeTelemetrySink(), ) diff --git a/edge_orchestrator/tests/unit_tests/infrastructure/binary_storage/test_gcp_binary_storage.py b/edge_orchestrator/tests/unit_tests/infrastructure/binary_storage/test_gcp_binary_storage.py index ad81fd48..b5f8488b 100644 --- a/edge_orchestrator/tests/unit_tests/infrastructure/binary_storage/test_gcp_binary_storage.py +++ b/edge_orchestrator/tests/unit_tests/infrastructure/binary_storage/test_gcp_binary_storage.py @@ -1,14 +1,16 @@ from unittest.mock import Mock, patch from edge_orchestrator.domain.models.item import Item -from edge_orchestrator.infrastructure.binary_storage.gcp_binary_storage import ( - GCPBinaryStorage, +from edge_orchestrator.infrastructure.binary_storage.gcp_bucket_binary_storage import ( + GCPBucketBinaryStorage, ) from tests.conftest import TEST_DATA_FOLDER_PATH class TestGCPBinaryStorage: - @patch("edge_orchestrator.infrastructure.binary_storage.gcp_binary_storage.storage") + @patch( + "edge_orchestrator.infrastructure.binary_storage.gcp_bucket_binary_storage.storage" + ) def test_save_item_binaries_should_write_image_in_gcp(self, mock_storage): # Given test_camera_id = "1" @@ -19,7 +21,7 @@ def test_save_item_binaries_should_write_image_in_gcp(self, mock_storage): mock_gcs_client = mock_storage.Client.return_value mock_bucket = Mock() mock_gcs_client.get_bucket.return_value = mock_bucket - gcs = GCPBinaryStorage() + gcs = GCPBucketBinaryStorage() # When gcs.save_item_binaries(item) @@ -42,7 +44,7 @@ def test_get_item_binary_should_return_image(self, mock_storage): mock_gcs_client = mock_storage.Client.return_value mock_bucket = Mock() mock_gcs_client.get_bucket.return_value = mock_bucket - gcs = GCPBinaryStorage() + gcs = GCPBucketBinaryStorage() gcs.save_item_binaries(item) # When diff --git a/edge_orchestrator/tests/unit_tests/infrastructure/binary_storage/test_memory_binary_storage.py b/edge_orchestrator/tests/unit_tests/infrastructure/binary_storage/test_memory_binary_storage.py index 4606d2db..a320a86b 100644 --- a/edge_orchestrator/tests/unit_tests/infrastructure/binary_storage/test_memory_binary_storage.py +++ b/edge_orchestrator/tests/unit_tests/infrastructure/binary_storage/test_memory_binary_storage.py @@ -1,8 +1,8 @@ from unittest.mock import patch from edge_orchestrator.domain.models.item import Item -from edge_orchestrator.infrastructure.binary_storage.memory_binary_storage import ( - MemoryBinaryStorage, +from edge_orchestrator.infrastructure.binary_storage.in_memory_binary_storage import ( + InMemoryBinaryStorage, ) @@ -11,7 +11,7 @@ class TestMemoryBinaryStorage: def test_save_item_binaries_should_write_image_in_memory(self, generate_id_mocked): # Given generate_id_mocked.return_value = "my_item_id" - binary_storage = MemoryBinaryStorage() + binary_storage = InMemoryBinaryStorage() expected_picture = bytes([0, 1, 2, 3, 4]) item = Item( serial_number="serial_number", @@ -30,7 +30,7 @@ def test_save_item_binaries_should_write_image_in_memory(self, generate_id_mocke def test_get_item_binary_should_return_requested_item_binary(self): # Given - binary_storage = MemoryBinaryStorage() + binary_storage = InMemoryBinaryStorage() expected_picture = bytes([0, 1, 2, 3, 4]) another_picture = bytes([5, 6, 7, 8, 9]) binary_storage.binaries = { @@ -48,7 +48,7 @@ def test_get_item_binary_should_return_requested_item_binary(self): def test_get_item_binaries_should_return_all_item_binaries_names(self): # Given - binary_storage = MemoryBinaryStorage() + binary_storage = InMemoryBinaryStorage() expected_picture_1 = bytes([0, 1, 2, 3, 4]) expected_picture_2 = bytes([5, 6, 7, 8, 9]) binary_storage.binaries = { diff --git a/edge_orchestrator/tests/unit_tests/infrastructure/metadata_storage/test_memory_item_storage.py b/edge_orchestrator/tests/unit_tests/infrastructure/metadata_storage/test_memory_item_storage.py index 39b5cd48..4e9fbac6 100644 --- a/edge_orchestrator/tests/unit_tests/infrastructure/metadata_storage/test_memory_item_storage.py +++ b/edge_orchestrator/tests/unit_tests/infrastructure/metadata_storage/test_memory_item_storage.py @@ -1,12 +1,12 @@ -from edge_orchestrator.infrastructure.metadata_storage.memory_metadata_storage import ( - MemoryMetadataStorage, +from edge_orchestrator.infrastructure.metadata_storage.in_memory_metadata_storage import ( + InMemoryMetadataStorage, ) class TestMemoryItemStorage: def test_save_item_metadata_should_write_item_in_memory(self, my_item_0): # Given - metadata_storage = MemoryMetadataStorage() + metadata_storage = InMemoryMetadataStorage() # When metadata_storage.save_item_metadata(my_item_0) @@ -40,7 +40,7 @@ def test_save_item_metadata_should_write_item_in_memory(self, my_item_0): def test_get_item_metadata_should_return_requested_item_metadata(self, my_item_0): # Given - metadata_storage = MemoryMetadataStorage() + metadata_storage = InMemoryMetadataStorage() metadata_storage.items_metadata[my_item_0.id] = my_item_0.get_metadata() # When @@ -65,7 +65,7 @@ def test_get_item_metadata_should_return_requested_item_metadata(self, my_item_0 def test_get_all_items_metadata_should_return_all_items(self, my_item_0, my_item_2): # Given - metadata_storage = MemoryMetadataStorage() + metadata_storage = InMemoryMetadataStorage() metadata_storage.items_metadata[my_item_0.id] = my_item_0.get_metadata() metadata_storage.items_metadata[my_item_2.id] = my_item_2.get_metadata() From fc74dcb5561544c169eab83672d876c43754d937 Mon Sep 17 00:00:00 2001 From: Baptiste O'Jeanson Date: Mon, 18 Sep 2023 13:41:18 +0200 Subject: [PATCH 2/2] WIP --- edge_orchestrator/edge_orchestrator/application/api_routes.py | 1 + .../edge_orchestrator/application/dto/model_config.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/edge_orchestrator/edge_orchestrator/application/api_routes.py b/edge_orchestrator/edge_orchestrator/application/api_routes.py index 8577d170..73563046 100644 --- a/edge_orchestrator/edge_orchestrator/application/api_routes.py +++ b/edge_orchestrator/edge_orchestrator/application/api_routes.py @@ -104,3 +104,4 @@ def set_config( station_config_dto: StationConfig, ): logger.info(f"set config {station_config_dto.to_model()} to") + return station_config_dto.to_model() diff --git a/edge_orchestrator/edge_orchestrator/application/dto/model_config.py b/edge_orchestrator/edge_orchestrator/application/dto/model_config.py index 899a5c25..ddcfb436 100644 --- a/edge_orchestrator/edge_orchestrator/application/dto/model_config.py +++ b/edge_orchestrator/edge_orchestrator/application/dto/model_config.py @@ -12,6 +12,8 @@ ) from typing_extensions import Annotated +from domain.models.decision import Decision + class ModelNameEnum(str, Enum): inception = "inception" @@ -45,7 +47,7 @@ class ModelConfig(BaseModel): name: ModelNameEnum category: ModelCategoryEnum version: Version - class_names: Optional[List[str]] = None + class_names: Optional[List[Decision]] = [Decision.OK, Decision.KO] class_names_path: Optional[FilePath] = None output: Optional[ModelOutput] = None image_resolution: List[PositiveInt] = Field(min_items=2, max_items=2)