Skip to content

Commit

Permalink
rollback
Browse files Browse the repository at this point in the history
  • Loading branch information
gireg.roussel committed Dec 26, 2024
1 parent 304f98c commit 8413abc
Show file tree
Hide file tree
Showing 23 changed files with 109 additions and 98 deletions.
12 changes: 4 additions & 8 deletions edge_orchestrator/edge_orchestrator/application/api_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,38 +32,34 @@ def read_all(metadata_storage: MetadataStorage = Depends(get_metadata_storage)):
def get_item(
item_id: str,
metadata_storage: MetadataStorage = Depends(get_metadata_storage),
station_config: StationConfig = Depends(get_station_config),
):
return metadata_storage.get_item_metadata(item_id, station_config.active_config["name"])
return metadata_storage.get_item_metadata(item_id)


@api_router.get("/items/{item_id}/binaries/{camera_id}")
def get_item_binary(
item_id: str,
camera_id: str,
binary_storage: BinaryStorage = Depends(get_binary_storage),
station_config: StationConfig = Depends(get_station_config),
):
content_binary = binary_storage.get_item_binary(item_id, camera_id, station_config.active_config["name"])
content_binary = binary_storage.get_item_binary(item_id, camera_id)
return Response(content=content_binary, status_code=HTTPStatus.OK, media_type="image/jpeg")


@api_router.get("/items/{item_id}/binaries")
def get_item_binaries(
item_id: str,
binary_storage: BinaryStorage = Depends(get_binary_storage),
station_config: StationConfig = Depends(get_station_config),
):
return binary_storage.get_item_binaries(item_id, station_config.active_config["name"])
return binary_storage.get_item_binaries(item_id)


@api_router.get("/items/{item_id}/state")
def get_item_state(
item_id: str,
metadata_storage: MetadataStorage = Depends(get_metadata_storage),
station_config: StationConfig = Depends(get_station_config),
):
return metadata_storage.get_item_state(item_id, station_config.active_config["name"])
return metadata_storage.get_item_state(item_id)


@api_router.get("/inventory")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@

class BinaryStorage:
@abstractmethod
def save_item_binaries(self, item: Item, active_config_name: str):
def save_item_binaries(self, item: Item):
pass

@abstractmethod
def get_item_binary(self, item_id: str, camera_id: str, active_config_name: str) -> bytes:
def get_item_binary(self, item_id: str, camera_id: str) -> bytes:
pass

@abstractmethod
def get_item_binaries(self, item_id: str, active_config_name: str) -> List[str]:
def get_item_binaries(self, item_id: str) -> List[str]:
pass

@abstractmethod
def get_filepath(self, item_id: str, camera_id: str, active_config_name: str) -> str:
def get_filepath(self, item_id: str, camera_id: str) -> str:
pass
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@

class MetadataStorage:
@abstractmethod
def save_item_metadata(self, item: Item, active_config_name: str):
def save_item_metadata(self, item: Item):
pass

@abstractmethod
def get_item_metadata(self, item_id: str, active_config_name: str) -> Dict:
def get_item_metadata(self, item_id: str) -> Dict:
pass

@abstractmethod
def get_item_state(self, item_id: str, active_config_name: str) -> str:
def get_item_state(self, item_id: str) -> str:
pass

@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ def save_item_metadata(self, fct):
async def wrapper(item: Item, *args):
item.state = args[0].value
await fct(item)
active_config_name = self.station_config.active_config["name"]
self.metadata_storage.save_item_metadata(item, active_config_name)
self.metadata_storage.save_item_metadata(item)

return wrapper

Expand Down Expand Up @@ -124,8 +123,7 @@ async def set_error_state(item: Item, error_message: str):
logger.info(f"End of {supervisor_state.value}")

item.state = SupervisorState.DONE.value
active_config_name = self.station_config.active_config["name"]
self.metadata_storage.save_item_metadata(item, active_config_name)
self.metadata_storage.save_item_metadata(item)

async def get_predictions(self, item: Item) -> Dict[str, Dict]:
predictions = {}
Expand Down
11 changes: 5 additions & 6 deletions edge_orchestrator/edge_orchestrator/domain/use_cases/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,16 @@ def save_item_metadata(self, fct):
async def wrapper(item: Item, *args):
item.state = args[0].value
await fct(item)
active_config_name = args[1]
self.metadata_storage.save_item_metadata(item, active_config_name)
self.metadata_storage.save_item_metadata(item)

return wrapper

async def upload(self, item: Item, active_config_name: str):
async def upload(self, item: Item):
tasks = OrderedDict()

@self.save_item_metadata
async def save_item_binaries(item: Item):
self.binary_storage.save_item_binaries(item, active_config_name)
self.binary_storage.save_item_binaries(item)

async def set_error_state(item: Item, error_message: str):
item.error = True
Expand All @@ -51,12 +50,12 @@ async def set_error_state(item: Item, error_message: str):
logger.info(f"Starting {uploader_state.value}")
try:
logger.info(f"Entering try {uploader_state.value}")
await task_fct(item, uploader_state, active_config_name)
await task_fct(item, uploader_state)
except Exception as e:
logger.error(f"Error during {uploader_state.value}: {e}")
await set_error_state(item, str(e))

logger.info(f"End of {uploader_state.value}")

item.state = UploaderState.DONE.value
self.metadata_storage.save_item_metadata(item, active_config_name)
self.metadata_storage.save_item_metadata(item)
4 changes: 3 additions & 1 deletion edge_orchestrator/edge_orchestrator/environment/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ class Default(Config):
def __init__(self):
self.metadata_storage = MemoryMetadataStorage()
self.model_forward = FakeModelForward()
self.binary_storage = FileSystemBinaryStorage(self.ROOT_PATH / "data" / "storage")
self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json")
self.station_config = JsonStationConfig(
self.ROOT_PATH / "config" / "station_configs",
self.inventory,
self.ROOT_PATH / "data",
)
self.binary_storage = FileSystemBinaryStorage(
self.ROOT_PATH / "data" / "storage", self.station_config.active_config_name
)
self.edge_station = EdgeStation(self.station_config)
self.telemetry_sink = FakeTelemetrySink()
4 changes: 3 additions & 1 deletion edge_orchestrator/edge_orchestrator/environment/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ class Docker(Config):

def __init__(self):
self.metadata_storage = MongoDbMetadataStorage(self.MONGO_DB_URI)
self.binary_storage = FileSystemBinaryStorage(self.ROOT_PATH / "data" / "storage")
self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json")
self.station_config = JsonStationConfig(
station_configs_folder=self.ROOT_PATH / "config" / "station_configs",
inventory=self.inventory,
data_folder=self.ROOT_PATH / "data",
)
self.binary_storage = FileSystemBinaryStorage(
self.ROOT_PATH / "data" / "storage", self.station_config.active_config_name
)
self.edge_station = EdgeStation(self.station_config)
self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL, self.inventory, self.station_config)
self.telemetry_sink = PostgresTelemetrySink(self.POSTGRES_DB_URI)
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ class EdgeWithAzureContainerStorage(Config):
SERVING_MODEL_URL = os.environ.get("SERVING_MODEL_URL", "http://edge_model_serving:8501")

def __init__(self):
self.metadata_storage = AzureContainerMetadataStorage()
self.binary_storage = AzureContainerBinaryStorage()
self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json")
self.station_config = JsonStationConfig(
self.ROOT_PATH / "config" / "station_configs",
self.inventory,
self.ROOT_PATH / "data",
)
self.metadata_storage = AzureContainerMetadataStorage(self.station_config.active_config_name)
self.binary_storage = AzureContainerBinaryStorage(self.station_config.active_config_name)
self.edge_station = EdgeStation(self.station_config)
self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL, self.inventory, self.station_config)
self.telemetry_sink = AzureIotHubTelemetrySink()
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ class EdgeWithFileSystemMetadataStorage(Config):
SERVING_MODEL_URL = os.environ.get("SERVING_MODEL_URL", "http://edge_model_serving:8501")

def __init__(self):
self.metadata_storage = FileSystemMetadataStorage(self.ROOT_PATH / "data" / "storage")
self.binary_storage = FileSystemBinaryStorage(self.ROOT_PATH / "data" / "storage")
self.metadata_storage = FileSystemMetadataStorage(
self.ROOT_PATH / "data" / "storage", self.station_config.active_config_name
)
self.binary_storage = FileSystemBinaryStorage(
self.ROOT_PATH / "data" / "storage", self.station_config.active_config_name
)
self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json")
self.station_config = JsonStationConfig(
self.ROOT_PATH / "config" / "station_configs",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ class EdgeWithMongoDbMetadataStorage(Config):

def __init__(self):
self.metadata_storage = MongoDbMetadataStorage(self.MONGO_DB_URI)
self.binary_storage = FileSystemBinaryStorage(self.ROOT_PATH / "data" / "storage")
self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json")
self.station_config = JsonStationConfig(
self.ROOT_PATH / "config" / "station_configs",
self.inventory,
self.ROOT_PATH / "data",
)
self.binary_storage = FileSystemBinaryStorage(
self.ROOT_PATH / "data" / "storage", self.station_config.active_config_name
)
self.edge_station = EdgeStation(self.station_config)
self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL, self.inventory, self.station_config)
self.telemetry_sink = AzureIotHubTelemetrySink()
4 changes: 3 additions & 1 deletion edge_orchestrator/edge_orchestrator/environment/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ class Local(Config):
def __init__(self):
self.metadata_storage = MemoryMetadataStorage()
self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL, self.inventory, self.station_config)
self.binary_storage = FileSystemBinaryStorage(self.ROOT_PATH / "data" / "storage")
self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json")
self.station_config = JsonStationConfig(
self.ROOT_PATH / "config" / "station_configs",
self.inventory,
self.ROOT_PATH / "data",
)
self.binary_storage = FileSystemBinaryStorage(
self.ROOT_PATH / "data" / "storage", self.station_config.active_config_name
)
self.edge_station = EdgeStation(self.station_config)
self.telemetry_sink = FakeTelemetrySink()
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ def __init__(self):
if prefix is None:
raise Exception("EDGE_NAME environment variable should be set")

self.metadata_storage = GCPMetadataStorage(prefix)
self.binary_storage = GCPBinaryStorage(prefix)
self.inventory = JsonInventory(self.ROOT_PATH / "config" / "inventory.json")
self.station_config = JsonStationConfig(
self.ROOT_PATH / "config" / "station_configs",
self.inventory,
self.ROOT_PATH / "data",
)
self.metadata_storage = GCPMetadataStorage(prefix, self.station_config.active_config_name)
self.binary_storage = GCPBinaryStorage(prefix, self.station_config.active_config_name)
self.edge_station = EdgeStation(self.station_config)
self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL, self.inventory, self.station_config)
self.telemetry_sink = FakeTelemetrySink()
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@


class AzureContainerBinaryStorage(BinaryStorage):
def __init__(self):
def __init__(self, active_config_name: str):
self.active_config_name = active_config_name
self.azure_container_name = os.getenv("AZURE_CONTAINER_NAME")
az_storage_connection_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
self._blob_service_client = BlobServiceClient.from_connection_string(az_storage_connection_str)
Expand All @@ -21,18 +22,18 @@ def __init__(self):
self._container_client = self._blob_service_client.get_container_client(self.azure_container_name)
self._transport_params = {"client": self._blob_service_client}

def save_item_binaries(self, item: Item, active_config_name: str):
def save_item_binaries(self, item: Item):
for camera_id, binary in item.binaries.items():
with open(
f"azure://{self.azure_container_name}/{active_config_name}/{item.id}_{camera_id}.jpg",
f"azure://{self.azure_container_name}/{self.active_config_name}/{item.id}_{camera_id}.jpg",
"wb",
transport_params=self._transport_params,
) as f:
f.write(binary)

def get_item_binary(self, item_id: str, camera_id: str, active_config_name: str) -> bytes:
def get_item_binary(self, item_id: str, camera_id: str) -> bytes:
with open(
f"azure://{self.azure_container_name}/{active_config_name}/{item_id}_{camera_id}.jpg",
f"azure://{self.azure_container_name}/{self.active_config_name}/{item_id}_{camera_id}.jpg",
"rb",
transport_params=self._transport_params,
) as f:
Expand All @@ -50,5 +51,5 @@ def get_item_binaries(self, item_id: str) -> List[str]:
binaries.append(f.read())
return binaries

def get_item_binary_filepath(self, item_id: str, camera_id: str, active_config_name: str) -> str:
return f"azure://{self.azure_container_name}/{active_config_name}/{item_id}_{camera_id}.jpg"
def get_item_binary_filepath(self, item_id: str, camera_id: str) -> str:
return f"azure://{self.azure_container_name}/{self.active_config_name}/{item_id}_{camera_id}.jpg"
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,29 @@


class FileSystemBinaryStorage(BinaryStorage):
def __init__(self, src_directory_path: Path):
def __init__(self, src_directory_path: Path, active_config_name: str):
self.active_config_name = active_config_name
self.folder = src_directory_path

def save_item_binaries(self, item: Item, active_config_name: str):
path = self.folder / active_config_name / item.id
def save_item_binaries(self, item: Item):
path = self.folder / self.active_config_name / item.id
path.mkdir(parents=True, exist_ok=True)
for camera_id, binary in item.binaries.items():
filepath = _get_filepath(self.folder, item.id, camera_id, active_config_name)
filepath = _get_filepath(self.folder, item.id, camera_id, self.active_config_name)
with filepath.open("wb") as f:
f.write(binary)

def get_item_binary(self, item_id: str, camera_id: str, active_config_name: str) -> bytes:
filepath = _get_filepath(self.folder, item_id, camera_id, active_config_name)
def get_item_binary(self, item_id: str, camera_id: str) -> bytes:
filepath = _get_filepath(self.folder, item_id, camera_id, self.active_config_name)
with filepath.open("rb") as f:
return f.read()

def get_item_binaries(self, item_id: str, active_config_name: str) -> List[str]:
filepath = self.folder / active_config_name / item_id
def get_item_binaries(self, item_id: str) -> List[str]:
filepath = self.folder / self.active_config_name / item_id
return [binary_path.name for binary_path in filepath.glob("*")]

def get_item_binary_filepath(self, item_id: str, camera_id: str, active_config_name: str) -> str:
return str(_get_filepath(self.folder, item_id, camera_id, active_config_name))
def get_item_binary_filepath(self, item_id: str, camera_id: str) -> str:
return str(_get_filepath(self.folder, item_id, camera_id, self.active_config_name))


def _get_filepath(folder: Path, item_id: str, camera_id: str, active_config_name: str) -> Path:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,28 @@


class GCPBinaryStorage(BinaryStorage):
def __init__(self, prefix: str):
def __init__(self, prefix: str, active_config_name: str):
self.storage_client = storage.Client()

self.prefix = prefix
self.active_config_name = active_config_name
self.bucket = self.storage_client.get_bucket(os.getenv("GCP_BUCKET_NAME"))

def save_item_binaries(self, item: Item, active_config_name: str) -> None:
def save_item_binaries(self, item: Item) -> None:
for camera_id, binary in item.binaries.items():
blob = self.bucket.blob(os.path.join(self.prefix, active_config_name, item.id, f"{camera_id}.jpg"))
blob = self.bucket.blob(os.path.join(self.prefix, self.active_config_name, item.id, f"{camera_id}.jpg"))
if blob is None:
raise Exception("An image should be upload")
blob.upload_from_string(binary, content_type="image/jpg")

def get_item_binary(self, item_id: str, camera_id: str, active_config_name: str) -> bytes:
filename = os.path.join(self.prefix, active_config_name, item_id, f"{camera_id}.jpg")
def get_item_binary(self, item_id: str, camera_id: str) -> bytes:
filename = os.path.join(self.prefix, self.active_config_name, item_id, f"{camera_id}.jpg")
blob = self.bucket.get_blob(filename)
if blob is None:
return None
return blob.download_as_bytes()

def get_item_binaries(self, item_id: str, active_config_name: str) -> List[str]:
def get_item_binaries(self, item_id: str) -> List[str]:
binaries = []
for blob in self.bucket.list_blobs():
if item_id in blob.name:
Expand All @@ -37,5 +38,5 @@ def get_item_binaries(self, item_id: str, active_config_name: str) -> List[str]:
binaries.append(binary)
return binaries

def get_item_binary_filepath(self, item_id: str, camera_id: str, active_config_name: str) -> str:
return os.path.join(self.prefix, active_config_name, item_id, f"{camera_id}.jpg")
def get_item_binary_filepath(self, item_id: str, camera_id: str) -> str:
return os.path.join(self.prefix, self.active_config_name, item_id, f"{camera_id}.jpg")
Loading

0 comments on commit 8413abc

Please sign in to comment.