diff --git a/edge_orchestrator/edge_orchestrator/application/api_routes.py b/edge_orchestrator/edge_orchestrator/application/api_routes.py index 5f5962ad..b4c8546c 100644 --- a/edge_orchestrator/edge_orchestrator/application/api_routes.py +++ b/edge_orchestrator/edge_orchestrator/application/api_routes.py @@ -32,9 +32,8 @@ def read_all(metadata_storage: MetadataStorage = Depends(get_metadata_storage)): def get_item( item_id: str, metadata_storage: MetadataStorage = Depends(get_metadata_storage), - station_config: StationConfig = Depends(get_station_config), ): - return metadata_storage.get_item_metadata(item_id, station_config.active_config["name"]) + return metadata_storage.get_item_metadata(item_id) @api_router.get("/items/{item_id}/binaries/{camera_id}") @@ -42,9 +41,8 @@ def get_item_binary( item_id: str, camera_id: str, binary_storage: BinaryStorage = Depends(get_binary_storage), - station_config: StationConfig = Depends(get_station_config), ): - content_binary = binary_storage.get_item_binary(item_id, camera_id, station_config.active_config["name"]) + content_binary = binary_storage.get_item_binary(item_id, camera_id) return Response(content=content_binary, status_code=HTTPStatus.OK, media_type="image/jpeg") @@ -52,18 +50,16 @@ def get_item_binary( def get_item_binaries( item_id: str, binary_storage: BinaryStorage = Depends(get_binary_storage), - station_config: StationConfig = Depends(get_station_config), ): - return binary_storage.get_item_binaries(item_id, station_config.active_config["name"]) + return binary_storage.get_item_binaries(item_id) @api_router.get("/items/{item_id}/state") def get_item_state( item_id: str, metadata_storage: MetadataStorage = Depends(get_metadata_storage), - station_config: StationConfig = Depends(get_station_config), ): - return metadata_storage.get_item_state(item_id, station_config.active_config["name"]) + return metadata_storage.get_item_state(item_id) @api_router.get("/inventory") diff --git a/edge_orchestrator/edge_orchestrator/domain/ports/binary_storage.py b/edge_orchestrator/edge_orchestrator/domain/ports/binary_storage.py index db6d7c07..d19c80bc 100644 --- a/edge_orchestrator/edge_orchestrator/domain/ports/binary_storage.py +++ b/edge_orchestrator/edge_orchestrator/domain/ports/binary_storage.py @@ -6,17 +6,17 @@ class BinaryStorage: @abstractmethod - def save_item_binaries(self, item: Item, active_config_name: str): + def save_item_binaries(self, item: Item): pass @abstractmethod - def get_item_binary(self, item_id: str, camera_id: str, active_config_name: str) -> bytes: + def get_item_binary(self, item_id: str, camera_id: str) -> bytes: pass @abstractmethod - def get_item_binaries(self, item_id: str, active_config_name: str) -> List[str]: + def get_item_binaries(self, item_id: str) -> List[str]: pass @abstractmethod - def get_filepath(self, item_id: str, camera_id: str, active_config_name: str) -> str: + def get_filepath(self, item_id: str, camera_id: str) -> str: pass diff --git a/edge_orchestrator/edge_orchestrator/domain/ports/metadata_storage.py b/edge_orchestrator/edge_orchestrator/domain/ports/metadata_storage.py index f72a4efe..f3ce0b4c 100644 --- a/edge_orchestrator/edge_orchestrator/domain/ports/metadata_storage.py +++ b/edge_orchestrator/edge_orchestrator/domain/ports/metadata_storage.py @@ -6,15 +6,15 @@ class MetadataStorage: @abstractmethod - def save_item_metadata(self, item: Item, active_config_name: str): + def save_item_metadata(self, item: Item): pass @abstractmethod - def get_item_metadata(self, item_id: str, active_config_name: str) -> Dict: + def get_item_metadata(self, item_id: str) -> Dict: pass @abstractmethod - def get_item_state(self, item_id: str, active_config_name: str) -> str: + def get_item_state(self, item_id: str) -> str: pass @abstractmethod diff --git a/edge_orchestrator/edge_orchestrator/domain/use_cases/supervisor.py b/edge_orchestrator/edge_orchestrator/domain/use_cases/supervisor.py index d8c94d43..6c2b5554 100644 --- a/edge_orchestrator/edge_orchestrator/domain/use_cases/supervisor.py +++ b/edge_orchestrator/edge_orchestrator/domain/use_cases/supervisor.py @@ -63,8 +63,7 @@ def save_item_metadata(self, fct): async def wrapper(item: Item, *args): item.state = args[0].value await fct(item) - active_config_name = self.station_config.active_config["name"] - self.metadata_storage.save_item_metadata(item, active_config_name) + self.metadata_storage.save_item_metadata(item) return wrapper @@ -124,8 +123,7 @@ async def set_error_state(item: Item, error_message: str): logger.info(f"End of {supervisor_state.value}") item.state = SupervisorState.DONE.value - active_config_name = self.station_config.active_config["name"] - self.metadata_storage.save_item_metadata(item, active_config_name) + self.metadata_storage.save_item_metadata(item) async def get_predictions(self, item: Item) -> Dict[str, Dict]: predictions = {} diff --git a/edge_orchestrator/edge_orchestrator/domain/use_cases/uploader.py b/edge_orchestrator/edge_orchestrator/domain/use_cases/uploader.py index 28122ffc..648fb620 100644 --- a/edge_orchestrator/edge_orchestrator/domain/use_cases/uploader.py +++ b/edge_orchestrator/edge_orchestrator/domain/use_cases/uploader.py @@ -29,17 +29,16 @@ def save_item_metadata(self, fct): async def wrapper(item: Item, *args): item.state = args[0].value await fct(item) - active_config_name = args[1] - self.metadata_storage.save_item_metadata(item, active_config_name) + self.metadata_storage.save_item_metadata(item) return wrapper - async def upload(self, item: Item, active_config_name: str): + async def upload(self, item: Item): tasks = OrderedDict() @self.save_item_metadata async def save_item_binaries(item: Item): - self.binary_storage.save_item_binaries(item, active_config_name) + self.binary_storage.save_item_binaries(item) async def set_error_state(item: Item, error_message: str): item.error = True @@ -51,7 +50,7 @@ async def set_error_state(item: Item, error_message: str): logger.info(f"Starting {uploader_state.value}") try: logger.info(f"Entering try {uploader_state.value}") - await task_fct(item, uploader_state, active_config_name) + await task_fct(item, uploader_state) except Exception as e: logger.error(f"Error during {uploader_state.value}: {e}") await set_error_state(item, str(e)) @@ -59,4 +58,4 @@ async def set_error_state(item: Item, error_message: str): logger.info(f"End of {uploader_state.value}") item.state = UploaderState.DONE.value - self.metadata_storage.save_item_metadata(item, active_config_name) + self.metadata_storage.save_item_metadata(item) diff --git a/edge_orchestrator/edge_orchestrator/environment/default.py b/edge_orchestrator/edge_orchestrator/environment/default.py index 05fad405..33b122c4 100644 --- a/edge_orchestrator/edge_orchestrator/environment/default.py +++ b/edge_orchestrator/edge_orchestrator/environment/default.py @@ -22,12 +22,14 @@ class Default(Config): def __init__(self): self.metadata_storage = MemoryMetadataStorage() self.model_forward = FakeModelForward() - self.binary_storage = FileSystemBinaryStorage(self.ROOT_PATH / "data" / "storage") self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json") self.station_config = JsonStationConfig( self.ROOT_PATH / "config" / "station_configs", self.inventory, self.ROOT_PATH / "data", ) + self.binary_storage = FileSystemBinaryStorage( + self.ROOT_PATH / "data" / "storage", self.station_config.active_config_name + ) self.edge_station = EdgeStation(self.station_config) self.telemetry_sink = FakeTelemetrySink() diff --git a/edge_orchestrator/edge_orchestrator/environment/docker.py b/edge_orchestrator/edge_orchestrator/environment/docker.py index f5f5ee8f..232e9570 100644 --- a/edge_orchestrator/edge_orchestrator/environment/docker.py +++ b/edge_orchestrator/edge_orchestrator/environment/docker.py @@ -27,13 +27,15 @@ class Docker(Config): def __init__(self): self.metadata_storage = MongoDbMetadataStorage(self.MONGO_DB_URI) - self.binary_storage = FileSystemBinaryStorage(self.ROOT_PATH / "data" / "storage") self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json") self.station_config = JsonStationConfig( station_configs_folder=self.ROOT_PATH / "config" / "station_configs", inventory=self.inventory, data_folder=self.ROOT_PATH / "data", ) + self.binary_storage = FileSystemBinaryStorage( + self.ROOT_PATH / "data" / "storage", self.station_config.active_config_name + ) self.edge_station = EdgeStation(self.station_config) self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL, self.inventory, self.station_config) self.telemetry_sink = PostgresTelemetrySink(self.POSTGRES_DB_URI) diff --git a/edge_orchestrator/edge_orchestrator/environment/edge_with_azure_container_storage.py b/edge_orchestrator/edge_orchestrator/environment/edge_with_azure_container_storage.py index 8928d2cb..f422854d 100644 --- a/edge_orchestrator/edge_orchestrator/environment/edge_with_azure_container_storage.py +++ b/edge_orchestrator/edge_orchestrator/environment/edge_with_azure_container_storage.py @@ -24,14 +24,14 @@ class EdgeWithAzureContainerStorage(Config): SERVING_MODEL_URL = os.environ.get("SERVING_MODEL_URL", "http://edge_model_serving:8501") def __init__(self): - self.metadata_storage = AzureContainerMetadataStorage() - self.binary_storage = AzureContainerBinaryStorage() self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json") self.station_config = JsonStationConfig( self.ROOT_PATH / "config" / "station_configs", self.inventory, self.ROOT_PATH / "data", ) + self.metadata_storage = AzureContainerMetadataStorage(self.station_config.active_config_name) + self.binary_storage = AzureContainerBinaryStorage(self.station_config.active_config_name) self.edge_station = EdgeStation(self.station_config) self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL, self.inventory, self.station_config) self.telemetry_sink = AzureIotHubTelemetrySink() diff --git a/edge_orchestrator/edge_orchestrator/environment/edge_with_filesystem_metadata_storage.py b/edge_orchestrator/edge_orchestrator/environment/edge_with_filesystem_metadata_storage.py index a54253e6..b3d89aeb 100644 --- a/edge_orchestrator/edge_orchestrator/environment/edge_with_filesystem_metadata_storage.py +++ b/edge_orchestrator/edge_orchestrator/environment/edge_with_filesystem_metadata_storage.py @@ -24,8 +24,12 @@ class EdgeWithFileSystemMetadataStorage(Config): SERVING_MODEL_URL = os.environ.get("SERVING_MODEL_URL", "http://edge_model_serving:8501") def __init__(self): - self.metadata_storage = FileSystemMetadataStorage(self.ROOT_PATH / "data" / "storage") - self.binary_storage = FileSystemBinaryStorage(self.ROOT_PATH / "data" / "storage") + self.metadata_storage = FileSystemMetadataStorage( + self.ROOT_PATH / "data" / "storage", self.station_config.active_config_name + ) + self.binary_storage = FileSystemBinaryStorage( + self.ROOT_PATH / "data" / "storage", self.station_config.active_config_name + ) self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json") self.station_config = JsonStationConfig( self.ROOT_PATH / "config" / "station_configs", diff --git a/edge_orchestrator/edge_orchestrator/environment/edge_with_mongo_db_metadata_storage.py b/edge_orchestrator/edge_orchestrator/environment/edge_with_mongo_db_metadata_storage.py index 9ac3514a..dcf7fec6 100644 --- a/edge_orchestrator/edge_orchestrator/environment/edge_with_mongo_db_metadata_storage.py +++ b/edge_orchestrator/edge_orchestrator/environment/edge_with_mongo_db_metadata_storage.py @@ -26,13 +26,15 @@ class EdgeWithMongoDbMetadataStorage(Config): def __init__(self): self.metadata_storage = MongoDbMetadataStorage(self.MONGO_DB_URI) - self.binary_storage = FileSystemBinaryStorage(self.ROOT_PATH / "data" / "storage") self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json") self.station_config = JsonStationConfig( self.ROOT_PATH / "config" / "station_configs", self.inventory, self.ROOT_PATH / "data", ) + self.binary_storage = FileSystemBinaryStorage( + self.ROOT_PATH / "data" / "storage", self.station_config.active_config_name + ) self.edge_station = EdgeStation(self.station_config) self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL, self.inventory, self.station_config) self.telemetry_sink = AzureIotHubTelemetrySink() diff --git a/edge_orchestrator/edge_orchestrator/environment/local.py b/edge_orchestrator/edge_orchestrator/environment/local.py index 8075a877..3fa0a794 100644 --- a/edge_orchestrator/edge_orchestrator/environment/local.py +++ b/edge_orchestrator/edge_orchestrator/environment/local.py @@ -26,12 +26,14 @@ class Local(Config): def __init__(self): self.metadata_storage = MemoryMetadataStorage() self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL, self.inventory, self.station_config) - self.binary_storage = FileSystemBinaryStorage(self.ROOT_PATH / "data" / "storage") self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json") self.station_config = JsonStationConfig( self.ROOT_PATH / "config" / "station_configs", self.inventory, self.ROOT_PATH / "data", ) + self.binary_storage = FileSystemBinaryStorage( + self.ROOT_PATH / "data" / "storage", self.station_config.active_config_name + ) self.edge_station = EdgeStation(self.station_config) self.telemetry_sink = FakeTelemetrySink() diff --git a/edge_orchestrator/edge_orchestrator/environment/upload_with_gcp_bucket.py b/edge_orchestrator/edge_orchestrator/environment/upload_with_gcp_bucket.py index 0118c1db..aa5120ab 100644 --- a/edge_orchestrator/edge_orchestrator/environment/upload_with_gcp_bucket.py +++ b/edge_orchestrator/edge_orchestrator/environment/upload_with_gcp_bucket.py @@ -28,14 +28,14 @@ def __init__(self): if prefix is None: raise Exception("EDGE_NAME environment variable should be set") - self.metadata_storage = GCPMetadataStorage(prefix) - self.binary_storage = GCPBinaryStorage(prefix) self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json") self.station_config = JsonStationConfig( self.ROOT_PATH / "config" / "station_configs", self.inventory, self.ROOT_PATH / "data", ) + self.metadata_storage = GCPMetadataStorage(prefix, self.station_config.active_config_name) + self.binary_storage = GCPBinaryStorage(prefix, self.station_config.active_config_name) self.edge_station = EdgeStation(self.station_config) self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL, self.inventory, self.station_config) self.telemetry_sink = FakeTelemetrySink() diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/azure_container_binary_storage.py b/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/azure_container_binary_storage.py index c2903741..31435b36 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/azure_container_binary_storage.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/azure_container_binary_storage.py @@ -10,7 +10,8 @@ class AzureContainerBinaryStorage(BinaryStorage): - def __init__(self): + def __init__(self, active_config_name: str): + self.active_config_name = active_config_name self.azure_container_name = os.getenv("AZURE_CONTAINER_NAME") az_storage_connection_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING") self._blob_service_client = BlobServiceClient.from_connection_string(az_storage_connection_str) @@ -21,18 +22,18 @@ def __init__(self): self._container_client = self._blob_service_client.get_container_client(self.azure_container_name) self._transport_params = {"client": self._blob_service_client} - def save_item_binaries(self, item: Item, active_config_name: str): + def save_item_binaries(self, item: Item): for camera_id, binary in item.binaries.items(): with open( - f"azure://{self.azure_container_name}/{active_config_name}/{item.id}_{camera_id}.jpg", + f"azure://{self.azure_container_name}/{self.active_config_name}/{item.id}_{camera_id}.jpg", "wb", transport_params=self._transport_params, ) as f: f.write(binary) - def get_item_binary(self, item_id: str, camera_id: str, active_config_name: str) -> bytes: + def get_item_binary(self, item_id: str, camera_id: str) -> bytes: with open( - f"azure://{self.azure_container_name}/{active_config_name}/{item_id}_{camera_id}.jpg", + f"azure://{self.azure_container_name}/{self.active_config_name}/{item_id}_{camera_id}.jpg", "rb", transport_params=self._transport_params, ) as f: @@ -50,5 +51,5 @@ def get_item_binaries(self, item_id: str) -> List[str]: binaries.append(f.read()) return binaries - def get_item_binary_filepath(self, item_id: str, camera_id: str, active_config_name: str) -> str: - return f"azure://{self.azure_container_name}/{active_config_name}/{item_id}_{camera_id}.jpg" + def get_item_binary_filepath(self, item_id: str, camera_id: str) -> str: + return f"azure://{self.azure_container_name}/{self.active_config_name}/{item_id}_{camera_id}.jpg" diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/filesystem_binary_storage.py b/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/filesystem_binary_storage.py index 3ad2a355..bb5dcf07 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/filesystem_binary_storage.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/filesystem_binary_storage.py @@ -6,28 +6,29 @@ class FileSystemBinaryStorage(BinaryStorage): - def __init__(self, src_directory_path: Path): + def __init__(self, src_directory_path: Path, active_config_name: str): + self.active_config_name = active_config_name self.folder = src_directory_path - def save_item_binaries(self, item: Item, active_config_name: str): - path = self.folder / active_config_name / item.id + def save_item_binaries(self, item: Item): + path = self.folder / self.active_config_name / item.id path.mkdir(parents=True, exist_ok=True) for camera_id, binary in item.binaries.items(): - filepath = _get_filepath(self.folder, item.id, camera_id, active_config_name) + filepath = _get_filepath(self.folder, item.id, camera_id, self.active_config_name) with filepath.open("wb") as f: f.write(binary) - def get_item_binary(self, item_id: str, camera_id: str, active_config_name: str) -> bytes: - filepath = _get_filepath(self.folder, item_id, camera_id, active_config_name) + def get_item_binary(self, item_id: str, camera_id: str) -> bytes: + filepath = _get_filepath(self.folder, item_id, camera_id, self.active_config_name) with filepath.open("rb") as f: return f.read() - def get_item_binaries(self, item_id: str, active_config_name: str) -> List[str]: - filepath = self.folder / active_config_name / item_id + def get_item_binaries(self, item_id: str) -> List[str]: + filepath = self.folder / self.active_config_name / item_id return [binary_path.name for binary_path in filepath.glob("*")] - def get_item_binary_filepath(self, item_id: str, camera_id: str, active_config_name: str) -> str: - return str(_get_filepath(self.folder, item_id, camera_id, active_config_name)) + def get_item_binary_filepath(self, item_id: str, camera_id: str) -> str: + return str(_get_filepath(self.folder, item_id, camera_id, self.active_config_name)) def _get_filepath(folder: Path, item_id: str, camera_id: str, active_config_name: str) -> Path: diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/gcp_binary_storage.py b/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/gcp_binary_storage.py index ac19e5df..72104ce7 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/gcp_binary_storage.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/gcp_binary_storage.py @@ -8,27 +8,28 @@ class GCPBinaryStorage(BinaryStorage): - def __init__(self, prefix: str): + def __init__(self, prefix: str, active_config_name: str): self.storage_client = storage.Client() self.prefix = prefix + self.active_config_name = active_config_name self.bucket = self.storage_client.get_bucket(os.getenv("GCP_BUCKET_NAME")) - def save_item_binaries(self, item: Item, active_config_name: str) -> None: + def save_item_binaries(self, item: Item) -> None: for camera_id, binary in item.binaries.items(): - blob = self.bucket.blob(os.path.join(self.prefix, active_config_name, item.id, f"{camera_id}.jpg")) + blob = self.bucket.blob(os.path.join(self.prefix, self.active_config_name, item.id, f"{camera_id}.jpg")) if blob is None: raise Exception("An image should be upload") blob.upload_from_string(binary, content_type="image/jpg") - def get_item_binary(self, item_id: str, camera_id: str, active_config_name: str) -> bytes: - filename = os.path.join(self.prefix, active_config_name, item_id, f"{camera_id}.jpg") + def get_item_binary(self, item_id: str, camera_id: str) -> bytes: + filename = os.path.join(self.prefix, self.active_config_name, item_id, f"{camera_id}.jpg") blob = self.bucket.get_blob(filename) if blob is None: return None return blob.download_as_bytes() - def get_item_binaries(self, item_id: str, active_config_name: str) -> List[str]: + def get_item_binaries(self, item_id: str) -> List[str]: binaries = [] for blob in self.bucket.list_blobs(): if item_id in blob.name: @@ -37,5 +38,5 @@ def get_item_binaries(self, item_id: str, active_config_name: str) -> List[str]: binaries.append(binary) return binaries - def get_item_binary_filepath(self, item_id: str, camera_id: str, active_config_name: str) -> str: - return os.path.join(self.prefix, active_config_name, item_id, f"{camera_id}.jpg") + def get_item_binary_filepath(self, item_id: str, camera_id: str) -> str: + return os.path.join(self.prefix, self.active_config_name, item_id, f"{camera_id}.jpg") diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/memory_binary_storage.py b/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/memory_binary_storage.py index 301b6514..c03a79ee 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/memory_binary_storage.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/binary_storage/memory_binary_storage.py @@ -8,17 +8,17 @@ class MemoryBinaryStorage(BinaryStorage): def __init__(self): self.binaries = {} - def save_item_binaries(self, item: Item, active_config_name: str) -> None: + def save_item_binaries(self, item: Item) -> None: binaries_dict = {} for camera_id, binary in item.binaries.items(): binaries_dict[camera_id] = binary self.binaries[item.id] = binaries_dict - def get_item_binary(self, item_id: str, camera_id: str, active_config_name: str) -> bytes: + def get_item_binary(self, item_id: str, camera_id: str) -> bytes: return self.binaries[item_id][camera_id] - def get_item_binaries(self, item_id: str, active_config_name: str) -> List[str]: + def get_item_binaries(self, item_id: str) -> List[str]: return list(self.binaries[item_id].keys()) - def get_filepath(self, item_id: str, camera_id: str, active_config_name: str) -> str: + def get_filepath(self, item_id: str, camera_id: str) -> str: return NotImplementedError diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/azure_container_metadata_storage.py b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/azure_container_metadata_storage.py index 11754913..5a70f24d 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/azure_container_metadata_storage.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/azure_container_metadata_storage.py @@ -11,7 +11,8 @@ class AzureContainerMetadataStorage(MetadataStorage): - def __init__(self): + def __init__(self, active_config_name: str): + self.active_config_name = active_config_name self.azure_container_name = os.getenv("AZURE_CONTAINER_NAME") az_storage_connection_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING") self._blob_service_client = BlobServiceClient.from_connection_string(az_storage_connection_str) @@ -22,24 +23,24 @@ def __init__(self): self._container_client = self._blob_service_client.get_container_client(self.azure_container_name) self._transport_params = {"client": self._blob_service_client} - def save_item_metadata(self, item: Item, active_config_name: str): + def save_item_metadata(self, item: Item): with open( - f"azure://{self.azure_container_name}/{active_config_name}/{item.id}/metadata.json", + f"azure://{self.azure_container_name}/{self.active_config_name}/{item.id}/metadata.json", "wb", transport_params=self._transport_params, ) as f: f.write(json.dumps(item.get_metadata()).encode("utf-8")) - def get_item_metadata(self, item_id: str, active_config_name: str) -> Dict: + def get_item_metadata(self, item_id: str) -> Dict: with open( - f"azure://{self.azure_container_name}/{active_config_name}/{item_id}/metadata.json", + f"azure://{self.azure_container_name}/{self.active_config_name}/{item_id}/metadata.json", "rb", transport_params=self._transport_params, ) as f: return json.loads(f.read()) - def get_item_state(self, item_id: str, active_config_name: str) -> str: - item_metadata = self.get_item_metadata(item_id, active_config_name) + def get_item_state(self, item_id: str) -> str: + item_metadata = self.get_item_metadata(item_id) return item_metadata["state"] def get_all_items_metadata(self) -> List[Dict]: diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/filesystem_metadata_storage.py b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/filesystem_metadata_storage.py index b5ffc60d..8e339417 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/filesystem_metadata_storage.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/filesystem_metadata_storage.py @@ -7,23 +7,24 @@ class FileSystemMetadataStorage(MetadataStorage): - def __init__(self, src_directory_path: Path): + def __init__(self, src_directory_path: Path, active_config_name: str): self.folder = src_directory_path + self.active_config_name = active_config_name - def save_item_metadata(self, item: Item, active_config_name: str): - (self.folder / active_config_name / item.id).mkdir(parents=True, exist_ok=True) - filepath = _get_filepath(self.folder, item.id, active_config_name) + def save_item_metadata(self, item: Item): + (self.folder / self.active_config_name / item.id).mkdir(parents=True, exist_ok=True) + filepath = _get_filepath(self.folder, item.id, self.active_config_name) with filepath.open("w") as f: json.dump(item.get_metadata(), f) - def get_item_metadata(self, item_id: str, active_config_name: str) -> Dict: - filepath = _get_filepath(self.folder, item_id, active_config_name) + def get_item_metadata(self, item_id: str) -> Dict: + filepath = _get_filepath(self.folder, item_id, self.active_config_name) with filepath.open("r") as f: item_metadata = json.load(f) return item_metadata - def get_item_state(self, item_id: str, active_config_name: str) -> str: - item_metadata = self.get_item_metadata(item_id, active_config_name) + def get_item_state(self, item_id: str) -> str: + item_metadata = self.get_item_metadata(item_id) return item_metadata["state"] def get_all_items_metadata(self) -> List[Dict]: diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/gcp_metadata_storage.py b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/gcp_metadata_storage.py index 0342cc82..182b2c38 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/gcp_metadata_storage.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/gcp_metadata_storage.py @@ -9,27 +9,28 @@ class GCPMetadataStorage(MetadataStorage): - def __init__(self, prefix: str): + def __init__(self, prefix: str, active_config_name: str): self.storage_client = storage.Client() self.prefix = prefix + self.active_config_name = active_config_name self.bucket = self.storage_client.get_bucket(os.getenv("GCP_BUCKET_NAME")) - def save_item_metadata(self, item: Item, active_config_name: str): + def save_item_metadata(self, item: Item): item_metadata = json.dumps(item.get_metadata()) - blob = self.bucket.blob(os.path.join(self.prefix, active_config_name, item.id, "metadata.json")) + blob = self.bucket.blob(os.path.join(self.prefix, self.active_config_name, item.id, "metadata.json")) blob.upload_from_string(item_metadata, content_type="application/json") - def get_item_metadata(self, item_id: str, active_config_name: str) -> Dict: - filename = os.path.join(self.prefix, active_config_name, item_id, "metadata.json") + def get_item_metadata(self, item_id: str) -> Dict: + filename = os.path.join(self.prefix, self.active_config_name, item_id, "metadata.json") blob = self.bucket.get_blob(filename) if blob is None: raise Exception("No file with this id exist") metadata = json.loads(blob.download_as_string()) return metadata - def get_item_state(self, item_id: str, active_config_name: str) -> str: - item_metadata = self.get_item_metadata(item_id, active_config_name) + def get_item_state(self, item_id: str) -> str: + item_metadata = self.get_item_metadata(item_id, self.active_config_name) return item_metadata["state"] def get_all_items_metadata(self) -> List[Dict]: diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/memory_metadata_storage.py b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/memory_metadata_storage.py index 393efd44..45b41116 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/memory_metadata_storage.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/memory_metadata_storage.py @@ -8,16 +8,16 @@ class MemoryMetadataStorage(MetadataStorage): def __init__(self): self.items_metadata = {} - def save_item_metadata(self, item: Item, active_config_name: str = None): + def save_item_metadata(self, item: Item): if item.id in self.items_metadata: self.items_metadata[item.id].update(item.get_metadata()) else: self.items_metadata[item.id] = item.get_metadata() - def get_item_metadata(self, item_id: str, active_config_name: str = None) -> Dict: + def get_item_metadata(self, item_id: str = None) -> Dict: return self.items_metadata[item_id] - def get_item_state(self, item_id: str, active_config_name: str = None) -> str: + def get_item_state(self, item_id: str = None) -> str: item = self.items_metadata[item_id] return item["state"] diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/mongodb_metadata_storage.py b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/mongodb_metadata_storage.py index bc3196eb..5f2e667f 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/mongodb_metadata_storage.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/mongodb_metadata_storage.py @@ -12,15 +12,15 @@ def __init__(self, mongodb_uri: str): self.db = self.client["orchestratorDB"] self.items_metadata = self.db["items"] - def save_item_metadata(self, item: Item, active_config_name: str = None): + def save_item_metadata(self, item: Item = None): self.items_metadata.update_one({"_id": item.id}, {"$set": item.get_metadata(False)}, upsert=True) - def get_item_metadata(self, item_id: str, active_config_name: str = None) -> Dict: + def get_item_metadata(self, item_id: str = None) -> Dict: mongo_output = self.items_metadata.find_one({"_id": item_id}) mongo_output["id"] = mongo_output.pop("_id") return mongo_output - def get_item_state(self, item_id: str, active_config_name: str = None) -> str: + def get_item_state(self, item_id: str = None) -> str: item = self.items_metadata.find_one({"_id": item_id}) return item["state"] diff --git a/edge_orchestrator/tests/unit_tests/infrastructure/binary_storage/test_filesystem_binary_storage.py b/edge_orchestrator/tests/unit_tests/infrastructure/binary_storage/test_filesystem_binary_storage.py index 4313286a..67793829 100644 --- a/edge_orchestrator/tests/unit_tests/infrastructure/binary_storage/test_filesystem_binary_storage.py +++ b/edge_orchestrator/tests/unit_tests/infrastructure/binary_storage/test_filesystem_binary_storage.py @@ -13,9 +13,9 @@ def test_save_item_binaries_should_write_image_on_filesystem(self, generate_id_m # Given generate_id_mocked.return_value = "my_item_id" src_directory_path = Path(tmpdir.mkdir("binaries")) - binary_storage = FileSystemBinaryStorage(src_directory_path) expected_picture = bytes([0, 1, 2, 3, 4]) active_config_name = "detection_model" + binary_storage = FileSystemBinaryStorage(src_directory_path, active_config_name) item = Item( serial_number="serial_number", @@ -26,7 +26,7 @@ def test_save_item_binaries_should_write_image_on_filesystem(self, generate_id_m ) # When - binary_storage.save_item_binaries(item, active_config_name) + binary_storage.save_item_binaries(item) # Then path_to_my_picture = src_directory_path / active_config_name / "my_item_id" / "camera_id.jpg" @@ -37,9 +37,9 @@ def test_save_item_binaries_should_write_image_on_filesystem(self, generate_id_m def test_get_item_binary_should_return_requested_item_binary(self, tmpdir): # Given src_directory_path = Path(tmpdir.mkdir("binaries")) - binary_storage = FileSystemBinaryStorage(src_directory_path) expected_picture = bytes([0, 1, 2, 3, 4]) active_config_name = "detection_model" + binary_storage = FileSystemBinaryStorage(src_directory_path, active_config_name) (src_directory_path / active_config_name / "my_item_id").mkdir(parents=True) with (src_directory_path / active_config_name / "my_item_id" / "camera_id.jpg").open("wb") as f: f.write(expected_picture) @@ -53,10 +53,10 @@ def test_get_item_binary_should_return_requested_item_binary(self, tmpdir): def test_get_item_binaries_should_return_all_item_binaries_names(self, tmpdir): # Given src_directory_path = Path(tmpdir.mkdir("binaries")) - binary_storage = FileSystemBinaryStorage(src_directory_path) expected_picture_1 = bytes([0, 1, 2, 3, 4]) expected_picture_2 = bytes([5, 6, 7, 8, 9]) active_config_name = "detection_model" + binary_storage = FileSystemBinaryStorage(src_directory_path, active_config_name) (src_directory_path / active_config_name / "my_item_id").mkdir(parents=True) diff --git a/edge_orchestrator/tests/unit_tests/infrastructure/metadata_storage/test_filesystem_metadata_storage.py b/edge_orchestrator/tests/unit_tests/infrastructure/metadata_storage/test_filesystem_metadata_storage.py index 9e735d1f..c7ce8c67 100644 --- a/edge_orchestrator/tests/unit_tests/infrastructure/metadata_storage/test_filesystem_metadata_storage.py +++ b/edge_orchestrator/tests/unit_tests/infrastructure/metadata_storage/test_filesystem_metadata_storage.py @@ -24,8 +24,8 @@ def test_save_item_metadata_should_write_metadata_on_filesystem( dimensions=[], ) src_directory_path = Path(tmpdir.mkdir("metadata")) - metadata_storage = FileSystemMetadataStorage(src_directory_path) active_config_name = "detection_model" + metadata_storage = FileSystemMetadataStorage(src_directory_path, active_config_name) expected_response = { "id": item.id, @@ -42,7 +42,7 @@ def test_save_item_metadata_should_write_metadata_on_filesystem( } # When - metadata_storage.save_item_metadata(item, active_config_name) + metadata_storage.save_item_metadata(item) # Then path_to_my_metadata = src_directory_path / "detection_model" / "my_item_id" / "metadata.json"