Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding bandwith measurement to downloadCutout #9

Merged
merged 2 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 82 additions & 14 deletions src/fibad/download.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import contextlib
import datetime

Check warning on line 2 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L2

Added line #L2 was not covered by tests
import os
import urllib.request

Check warning on line 4 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L4

Added line #L4 was not covered by tests
from pathlib import Path
from typing import Union

Expand Down Expand Up @@ -38,34 +40,34 @@

config = config.get("download", {})

print("Download command")
print("Download command Start")

Check warning on line 43 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L43

Added line #L43 was not covered by tests

fits_file = config.get("fits_file", "")
print(f"Reading in fits catalog: {fits_file}")

Check warning on line 46 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L45-L46

Added lines #L45 - L46 were not covered by tests
# Filter the fits file for the fields we want
column_names = ["object_id"] + variable_fields
locations = filterfits(config.get("fits_file"), column_names)
locations = filterfits(fits_file, column_names)

Check warning on line 49 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L49

Added line #L49 was not covered by tests

# Sort by tract, ra, dec to optimize speed that the cutout server can serve us
#
# TODO: See if this sort is performed by downloadCutouts
# It appears downloadCutouts is doing some sorting prior to download, but
# unclear if it is the same sort
locations.sort(variable_fields)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sorting turned out to be unnecessary.

The Rect class in downloadCutout.py has a very particular sort order imposed on it, and the download codepath calls Rect.explode() under the hood, which calls .sort() on a list of Rects particular to each request. This results in the server receiving lists sorted by Tract, RA, DEC and some other fields.

# TODO slice up the locations to multiplex across connections if necessary, but for now
# we simply mask off a few
offset = config.get("offset", 0)
end = offset + config.get("num_sources", 10)
locations = locations[offset:end]

Check warning on line 55 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L53-L55

Added lines #L53 - L55 were not covered by tests

# TODO slice up the locations
locations = locations[0:10]

# make a list of rects
# Make a list of rects to pass to downloadCutout
rects = create_rects(locations, offset=0, default=rect_from_config(config))

# Configure global parameters for the downloader
dC.set_max_connections(num=config.get("max_connections", 2))

print("Requesting cutouts")

Check warning on line 63 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L63

Added line #L63 was not covered by tests
# pass the rects to the cutout downloader
download_cutout_group(
rects=rects, cutout_dir=config.get("cutout_dir"), user=config["username"], password=config["password"]
)

print(locations)
# print(locations)
print("Done")

Check warning on line 70 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L70

Added line #L70 was not covered by tests


# TODO add error checking
Expand Down Expand Up @@ -118,11 +120,77 @@
return rects


stats = {

Check warning on line 123 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L123

Added line #L123 was not covered by tests
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole stats subsystem is pretty hacky. I've tried to make it unobtrusive to the CLI, but if we end up using it for anything beyond spot checks, we likely want to go find ourselves a library to do this.

"request_duration": datetime.timedelta(), # Time from request sent to first byte from the server
"response_duration": datetime.timedelta(), # Total time spent recieving and processing a response
"request_size_bytes": 0, # Total size of all requests
"response_size_bytes": 0, # Total size of all responses
"snapshots": 0, # Number of fits snapshots downloaded
}


def _stat_accumulate(name: str, value: Union[int, datetime.timedelta]):

Check warning on line 132 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L132

Added line #L132 was not covered by tests
global stats
stats[name] += value

Check warning on line 134 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L134

Added line #L134 was not covered by tests


def _print_stats():

Check warning on line 137 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L137

Added line #L137 was not covered by tests
global stats

total_dur_s = (stats["request_duration"] + stats["response_duration"]).total_seconds()

Check warning on line 140 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L140

Added line #L140 was not covered by tests

resp_s = stats["response_duration"].total_seconds()
down_rate_mb_s = (stats["response_size_bytes"] / (1024**2)) / resp_s

Check warning on line 143 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L142-L143

Added lines #L142 - L143 were not covered by tests

req_s = stats["request_duration"].total_seconds()
up_rate_mb_s = (stats["request_size_bytes"] / (1024**2)) / req_s

Check warning on line 146 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L145-L146

Added lines #L145 - L146 were not covered by tests

snapshot_rate = stats["snapshots"] / total_dur_s

Check warning on line 148 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L148

Added line #L148 was not covered by tests

print(

Check warning on line 150 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L150

Added line #L150 was not covered by tests
f"Stats: Duration: {total_dur_s:.2f} s, Files: {stats['snapshots']}, \
Upload: {up_rate_mb_s:.2f} MB/s, Download: {down_rate_mb_s:.2f} MB/s File rate: {snapshot_rate:.2f} files/s",
end="\r",
flush=True,
)


def request_hook(

Check warning on line 158 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L158

Added line #L158 was not covered by tests
request: urllib.request.Request,
request_start: datetime.datetime,
response_start: datetime.datetime,
response_size: int,
chunk_size: int,
):
"""
Called on each chunk of snapshots downloaded.
Called immediately after the server has finished responding to the
request, so datetime.datetime.now() is the end moment of the request

request: Our request object
request_start: datetime when we started sending the request
response_start: when the server responded to the request
response_size: Size in bytes of the response from the server.
"""

now = datetime.datetime.now()

Check warning on line 176 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L176

Added line #L176 was not covered by tests

_stat_accumulate("request_duration", response_start - request_start)
_stat_accumulate("response_duration", now - response_start)
_stat_accumulate("request_size_bytes", len(request.data))
_stat_accumulate("response_size_bytes", response_size)
_stat_accumulate("snapshots", chunk_size)

Check warning on line 182 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L178-L182

Added lines #L178 - L182 were not covered by tests

_print_stats()

Check warning on line 184 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L184

Added line #L184 was not covered by tests


def download_cutout_group(rects: list[dC.Rect], cutout_dir: Union[str, Path], user, password):
"""
Download cutouts to the given directory

Calls downloadCutout.download, so supports long lists of rects and
"""
with working_directory(Path(cutout_dir)):
dC.download(rects, user=user, password=password, onmemory=False)
dC.download(rects, user=user, password=password, onmemory=True, request_hook=request_hook)

Check warning on line 194 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L194

Added line #L194 was not covered by tests

print("") # Print a newline so the stats stay and look pretty.

Check warning on line 196 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L196

Added line #L196 was not covered by tests
63 changes: 52 additions & 11 deletions src/fibad/downloadCutout/downloadCutout.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import contextlib
import csv
import dataclasses
import datetime
import errno
import getpass
import io
Expand All @@ -15,6 +16,7 @@
import tempfile
import time
import urllib.request
import urllib.response
from collections.abc import Generator
from typing import IO, Any, Callable, Optional, Union, cast

Expand Down Expand Up @@ -323,7 +325,7 @@ class Rect:

rerun: str = default_rerun
type: str = default_type
filter: str = ALLFILTERS
filter: Union[str, list[str]] = ALLFILTERS
tract: int = ANYTRACT
ra: float = math.nan
dec: float = math.nan
Expand All @@ -339,7 +341,7 @@ class Rect:
def create(
rerun: Union[str, None] = None,
type: Union[str, None] = None,
filter: Union[str, None] = None,
filter: Union[Union[str, list[str]], None] = None,
tract: Union[str, int, None] = None,
ra: Union[str, float, None] = None,
dec: Union[str, float, None] = None,
Expand Down Expand Up @@ -460,7 +462,9 @@ def explode(self) -> list["Rect"]:
rects
List of `Rect` objects, each being more specific than `self`.
"""
if self.filter == ALLFILTERS:
if isinstance(self.filter, list):
return [Rect.create(filter=f, default=self) for f in self.filter]
elif self.filter == ALLFILTERS:
return [Rect.create(filter=f, default=self) for f in _all_filters]
else:
return [Rect.create(default=self)]
Expand Down Expand Up @@ -927,7 +931,7 @@ def parse_filter(s: str) -> str:
)


def parse_filter_opt(s: Optional[str]) -> str:
def parse_filter_opt(s: Optional[Union[str, list]]) -> Union[str, list]:
"""
Interpret a filter name.
The argument may be `ALLFILTERS`.or None
Expand All @@ -950,7 +954,8 @@ def parse_filter_opt(s: Optional[str]) -> str:
"""
if s is None:
return ALLFILTERS

if isinstance(s, list):
return [parse_filter(filter) for filter in s]
if s.lower() == ALLFILTERS:
return ALLFILTERS
return parse_filter(s)
Expand All @@ -963,6 +968,7 @@ def download(
password: Optional[str] = None,
*,
onmemory: bool = True,
**kwargs_request,
) -> Union[list, list[list], None]:
"""
Cut `rects` out of the sky.
Expand All @@ -978,6 +984,8 @@ def download(
onmemory
Return `datalist` on memory.
If `onmemory` is False, downloaded cut-outs are written to files.
kwargs_request
Additional keyword args are passed through to _download

Returns
-------
Expand All @@ -998,7 +1006,7 @@ def download(
rects = [cast(Rect, rects)]
rects = cast(list[Rect], rects)

ret = _download(rects, user, password, onmemory=onmemory)
ret = _download(rects, user, password, onmemory=onmemory, **kwargs_request)
if isscalar and onmemory:
ret = cast(list[list], ret)
return ret[0]
Expand All @@ -1007,7 +1015,13 @@ def download(


def _download(
rects: list[Rect], user: Optional[str], password: Optional[str], *, onmemory: bool
rects: list[Rect],
user: Optional[str],
password: Optional[str],
*,
onmemory: bool,
chunksize: int = 990,
**kwargs_request,
) -> Optional[list[list]]:
"""
Cut `rects` out of the sky.
Expand All @@ -1023,6 +1037,10 @@ def _download(
onmemory
Return `datalist` on memory.
If `onmemory` is False, downloaded cut-outs are written to files.
chunksize
Number of cutout lines to pack into a single request. Defaults to 990 if unspecified.
kwargs_request
Additional keyword args are passed through to _download_chunk

Returns
-------
Expand Down Expand Up @@ -1058,11 +1076,12 @@ def _download(
if not password:
raise RuntimeError("Password is empty.")

chunksize = 990
datalist: list[tuple[int, dict, bytes]] = []

for i in range(0, len(exploded_rects), chunksize):
ret = _download_chunk(exploded_rects[i : i + chunksize], user, password, onmemory=onmemory)
ret = _download_chunk(
exploded_rects[i : i + chunksize], user, password, onmemory=onmemory, **kwargs_request
)
if onmemory:
datalist += cast(list, ret)

Expand All @@ -1075,7 +1094,15 @@ def _download(


def _download_chunk(
rects: list[tuple[Rect, Any]], user: str, password: str, *, onmemory: bool
rects: list[tuple[Rect, Any]],
user: str,
password: str,
*,
onmemory: bool,
request_hook: Optional[
Callable[[urllib.request.Request, datetime.datetime, datetime.datetime, int, int], Any]
],
**kwargs_request,
) -> Optional[list]:
"""
Cut `rects` out of the sky.
Expand All @@ -1095,6 +1122,11 @@ def _download_chunk(
onmemory
Return `datalist` on memory.
If `onmemory` is False, downloaded cut-outs are written to files.
request_hook
Function that is called with the response of all requests made
Intended to support bandwidth instrumentation.
kwargs_request
Additional keyword args are passed through to urllib.request.urlopen

Returns
-------
Expand Down Expand Up @@ -1134,11 +1166,18 @@ def _download_chunk(

returnedlist = []

# Set timeout to 1 hour if no timout was set higher up
kwargs_request.setdefault("timeout", 3600)

with get_connection_semaphore():
with urllib.request.urlopen(req, timeout=3600) as fin:
request_started = datetime.datetime.now()
with urllib.request.urlopen(req, **kwargs_request) as fin:
response_started = datetime.datetime.now()
response_size = 0
with tarfile.open(fileobj=fin, mode="r|") as tar:
for info in tar:
fitem = tar.extractfile(info)
response_size += info.size
if fitem is None:
continue
with fitem:
Expand All @@ -1162,6 +1201,8 @@ def _download_chunk(
os.makedirs(dirname, exist_ok=True)
with open(filename, "wb") as fout:
_splice(fitem, fout)
if request_hook:
request_hook(req, request_started, response_started, response_size, len(rects))

return returnedlist if onmemory else None

Expand Down
4 changes: 3 additions & 1 deletion src/fibad_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@
download_config = {
"sw": "22asec",
"sh": "22asec",
"filter": "all",
"filter": ["HSC-G", "HSC-R", "HSC-I", "HSC-Z", "HSC-Y"],
"type": "coadd",
"rerun": "pdr3_wide",
"username": "mtauraso@local",
"password": "cCw+nX53lmNLHMy+JbizpH/dl4t7sxljiNm6a7k1",
"max_connections": 2,
"fits_file": "../hscplay/temp.fits",
"cutout_dir": "../hscplay/cutouts/",
"offset": 0,
"num_sources": 10,
}

config = {"download": download_config}
Expand Down