Skip to content

Commit

Permalink
Multipart uploads (#45)
Browse files Browse the repository at this point in the history
* Allow examples to upload to other locations

* Add progress bar to uploads

* Initial core setup for multipart uploads

* make this 💩 work

* 🤨 tqdm this
  • Loading branch information
JBorrow authored Jan 24, 2025
1 parent e525db9 commit 1ab2c04
Show file tree
Hide file tree
Showing 20 changed files with 520 additions and 100 deletions.
10 changes: 8 additions & 2 deletions examples/aiola/simplepopulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Populates the simple example server with a bunch of ACT maps.
"""

import os
from pathlib import Path

import astropy.io.fits as fits
Expand All @@ -12,8 +13,13 @@
from hippoclient.product import create as create_product
from hippometa import MapSet, MapSetMap

API_KEY = "TEST_API_KEY"
SERVER_LOCATION = "http://127.0.0.1:8000"
API_KEY = os.getenv("HIPPO_API_KEY")
SERVER_LOCATION = os.getenv("HIPPO_HOST")

if API_KEY is None:
API_KEY = "TEST_API_KEY"
SERVER_LOCATION = "http://127.0.0.1:8000"

COLLECTION_NAME = (
"ACT DR4 Frequency Maps at 98 and 150 GHz presented in Aiola et al. 2020"
)
Expand Down
9 changes: 7 additions & 2 deletions examples/coulton/simplepopulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Populates the simple example server with a bunch of ACT maps.
"""

import os
from pathlib import Path

from hippoclient import Client
Expand All @@ -10,8 +11,12 @@
from hippoclient.product import create as create_product
from hippometa import BeamMetadata, MapSet, MapSetMap

API_KEY = "TEST_API_KEY"
SERVER_LOCATION = "http://127.0.0.1:8000"
API_KEY = os.getenv("HIPPO_API_KEY")
SERVER_LOCATION = os.getenv("HIPPO_HOST")

if API_KEY is None:
API_KEY = "TEST_API_KEY"
SERVER_LOCATION = "http://127.0.0.1:8000"
COLLECTION_NAME = "ACT (AdvACT) Compton-y"

COLLECTION_DESCRIPTION = """
Expand Down
10 changes: 8 additions & 2 deletions examples/hervias_caimapo/simplepopulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Populates the simple example server with a bunch of ACT maps.
"""

import os
from pathlib import Path

from hippoclient import Client
Expand All @@ -10,8 +11,13 @@
from hippoclient.product import create as create_product
from hippometa import CatalogMetadata

API_KEY = "TEST_API_KEY"
SERVER_LOCATION = "http://127.0.0.1:8000"
API_KEY = os.getenv("HIPPO_API_KEY")
SERVER_LOCATION = os.getenv("HIPPO_HOST")

if API_KEY is None:
API_KEY = "TEST_API_KEY"
SERVER_LOCATION = "http://127.0.0.1:8000"

COLLECTION_NAME = "ACT Targeted Transient Flux Constraints"

COLLECTION_DESCRIPTION = """
Expand Down
13 changes: 11 additions & 2 deletions examples/hilton/simplepopulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,25 @@
Populates the simple example server with a bunch of ACT maps.
"""

import os
from pathlib import Path

from hippoclient import Client
from hippoclient.collections import add as add_to_collection
from hippoclient.collections import create as create_collection
from hippoclient.core import ClientSettings
from hippoclient.product import create as create_product
from hippometa import CatalogMetadata, MapSet, MapSetMap

API_KEY = "TEST_API_KEY"
SERVER_LOCATION = "http://127.0.0.1:8000"
settings = ClientSettings()

API_KEY = os.getenv("HIPPO_API_KEY")
SERVER_LOCATION = os.getenv("HIPPO_HOST")

if API_KEY is None:
API_KEY = "TEST_API_KEY"
SERVER_LOCATION = "http://localhost:8000"

COLLECTION_NAME = "ACT DR5 SZ Cluster Catalog"

COLLECTION_DESCRIPTION = """
Expand Down
11 changes: 9 additions & 2 deletions examples/simpledelete.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@
to the example hippo server running at 127.0.0.1:8000.
"""

import os

from hippoclient import Client
from hippoclient.product import delete

API_KEY = "TEST_API_KEY"
SERVER_LOCATION = "http://127.0.0.1:8000"
API_KEY = os.getenv("HIPPO_API_KEY")
SERVER_LOCATION = os.getenv("HIPPO_HOST")

if API_KEY is None:
API_KEY = "TEST_API_KEY"
SERVER_LOCATION = "http://127.0.0.1:8000"


if __name__ == "__main__":
Expand All @@ -21,5 +27,6 @@
args = parser.parse_args()

client = Client(api_key=API_KEY, host=SERVER_LOCATION)
print(SERVER_LOCATION)

delete(client=client, id=args.id)
32 changes: 32 additions & 0 deletions examples/simpledeletecoll.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
A simple command-line utility for deleting collections
from a hippo server (by default the example server running
at 127.0.0.1:8000).

The API key and host are read from the HIPPO_API_KEY and HIPPO_HOST
environment variables.
"""

import os

from hippoclient import Client
from hippoclient.collections import delete

API_KEY = os.getenv("HIPPO_API_KEY")
SERVER_LOCATION = os.getenv("HIPPO_HOST")

# Fall back to the local example server unless BOTH variables are set.
# Checking only API_KEY would pass host=None to the Client whenever
# HIPPO_API_KEY is set but HIPPO_HOST is not; this mirrors the guard
# used in simpleupload.py.
if API_KEY is None or SERVER_LOCATION is None:
    API_KEY = "TEST_API_KEY"
    SERVER_LOCATION = "http://127.0.0.1:8000"


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="""Delete a collection from the example hippo server.""",
    )
    parser.add_argument("id", help="The id of the collection to delete")

    args = parser.parse_args()

    client = Client(api_key=API_KEY, host=SERVER_LOCATION)
    # Show which server the request is being sent to.
    print(SERVER_LOCATION)

    delete(client=client, id=args.id)
13 changes: 10 additions & 3 deletions examples/simpleupload.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@
to the example hippo server running at 127.0.0.1:8000.
"""

import os
from pathlib import Path

from hippoclient import Client
from hippoclient.product import create

API_KEY = "TEST_API_KEY"
SERVER_LOCATION = "http://127.0.0.1:8000"
API_KEY = os.getenv("HIPPO_API_KEY")
SERVER_LOCATION = os.getenv("HIPPO_HOST")

if API_KEY is None or SERVER_LOCATION is None:
API_KEY = "TEST_API_KEY"
SERVER_LOCATION = "http://127.0.0.1:8000"


if __name__ == "__main__":
Expand All @@ -26,7 +31,9 @@

args = parser.parse_args()

client = Client(api_key=API_KEY, host=SERVER_LOCATION)
client = Client(
api_key=API_KEY, host=SERVER_LOCATION, verbose=True, use_multipart_upload=False
)

create(
client=client,
Expand Down
98 changes: 62 additions & 36 deletions hippoclient/product.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pathlib import Path

import xxhash
from tqdm import tqdm

from hippometa import ALL_METADATA_TYPE
from hippometa.simple import SimpleMetadata
Expand All @@ -14,6 +15,8 @@

from .core import Client, MultiCache, console

MULTIPART_UPLOAD_SIZE = 50 * 1024 * 1024


def create(
client: Client,
Expand Down Expand Up @@ -88,6 +91,7 @@ def create(
"description": description,
"metadata": metadata.model_dump(),
"sources": source_metadata,
"multipart_batch_size": MULTIPART_UPLOAD_SIZE,
},
)

Expand All @@ -100,56 +104,78 @@ def create(
f"Successfully created product {this_product_id} in remote database."
)

responses = {}
sizes = {}

# Upload the sources to the presigned URLs.
for source in sources:
with source.open("rb") as file:
if client.verbose:
console.print("Uploading file:", source.name)

retry = True
upload_url = response.json()["upload_urls"][source.name]
upload_urls = response.json()["upload_urls"][source.name]
headers = []
size = []

# We need to handle our own redirects because otherwise the head of the file will be incorrect,
# and we will end up with Content-Length errors.
while retry:
if client.use_multipart_upload:
if client.verbose:
console.print("Using multipart upload")
individual_response = client.put(
upload_url,
files={"upload-file": (source.name, file)},
follow_redirects=False,
)
else:
if client.verbose:
console.print("Using regular upload")
individual_response = client.put(
upload_url.strip(),
data=file,
follow_redirects=True,
)

if client.verbose:
console.print(individual_response.content.decode("utf-8"))

if individual_response.status_code in [301, 302, 307, 308]:
if client.verbose:
console.print(
f"Redirected to {individual_response.headers['Location']} from {upload_url}"

with tqdm(
desc="Uploading",
total=source.stat().st_size,
unit="B",
unit_scale=True,
unit_divisor=1024,
) as t:
file_position = 0

for upload_url in upload_urls:
retry = True

while retry:
file.seek(file_position)

data = file.read(MULTIPART_UPLOAD_SIZE)

individual_response = client.put(
upload_url.strip(),
data=data,
follow_redirects=True,
)
upload_url = individual_response.headers["Location"]
file.seek(0)
continue
else:
retry = False
if client.verbose:
console.print("Retry set to false, file uploaded or failed")
individual_response.raise_for_status()
break

if individual_response.status_code in [301, 302, 307, 308]:
if client.verbose:
console.print(
f"Redirected to {individual_response.headers['Location']} from {upload_url}"
)
upload_url = individual_response.headers["Location"]

continue
else:
retry = False
individual_response.raise_for_status()
break

headers.append(dict(individual_response.headers))
size.append(len(data))

file_position += MULTIPART_UPLOAD_SIZE
t.update(len(data))

responses[source.name] = headers
sizes[source.name] = size

if client.verbose:
console.print("Successfully uploaded file:", source.name)

# Close out the upload.
response = client.post(
f"/product/{this_product_id}/complete",
json={"headers": responses, "sizes": sizes},
)

response.raise_for_status()

# Confirm the upload to hippo.
response = client.post(f"/product/{this_product_id}/confirm")

Expand Down
8 changes: 7 additions & 1 deletion hipposerve/api/models/product.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@ class CreateProductRequest(BaseModel):
description: str
metadata: ALL_METADATA_TYPE
sources: list[PreUploadFile]
multipart_batch_size: int = 50 * 1024 * 1024


class CreateProductResponse(BaseModel):
id: PydanticObjectId
upload_urls: dict[str, str]
upload_urls: dict[str, list[str]]


# Request body for closing out a product's uploads. The client in
# hippoclient/product.py posts a payload of this shape ("headers" and
# "sizes") to /product/{id}/complete after all parts have been PUT.
# NOTE(review): presumably this model is what that endpoint parses; confirm
# against the route definition.
class CompleteProductRequest(BaseModel):
# Keyed by source file name; each value holds one dict of HTTP response
# headers per uploaded part, in upload order.
headers: dict[str, list[dict[str, str]]]
# Keyed by source file name; each value holds the byte length of every
# uploaded part, in the same order as the headers list.
sizes: dict[str, list[int]]


class ReadProductResponse(BaseModel):
Expand Down
Loading

0 comments on commit 1ab2c04

Please sign in to comment.