Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding bandwith measurement to downloadCutout #9

Merged
merged 2 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 173 additions & 26 deletions src/fibad/download.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import contextlib
import datetime

Check warning on line 2 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L2

Added line #L2 was not covered by tests
import os
import urllib.request

Check warning on line 4 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L4

Added line #L4 was not covered by tests
from pathlib import Path
from typing import Union

Expand All @@ -20,8 +22,12 @@
def working_directory(path: Path):
"""
Context Manager to change our working directory.

Supports downloadCutouts which always writes to cwd.

Parameters
----------
path : Path
Path that we change `Path.cwd()` while we are active.
"""
old_cwd = Path.cwd()
os.chdir(path)
Expand All @@ -34,58 +40,85 @@
def run(args, config):
"""
Main entrypoint for downloading cutouts from HSC for use with fibad

Parameters
----------
args : list
Command line arguments (unused)
config : dict
Runtime configuration, which is only read by this function
"""

config = config.get("download", {})

print("Download command")
print("Download command Start")

Check warning on line 54 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L54

Added line #L54 was not covered by tests

fits_file = config.get("fits_file", "")
print(f"Reading in fits catalog: {fits_file}")

Check warning on line 57 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L56-L57

Added lines #L56 - L57 were not covered by tests
# Filter the fits file for the fields we want
column_names = ["object_id"] + variable_fields
locations = filterfits(config.get("fits_file"), column_names)
locations = filterfits(fits_file, column_names)

Check warning on line 60 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L60

Added line #L60 was not covered by tests

# Sort by tract, ra, dec to optimize speed that the cutout server can serve us
#
# TODO: See if this sort is performed by downloadCutouts
# It appears downloadCutouts is doing some sorting prior to download, but
# unclear if it is the same sort
locations.sort(variable_fields)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sorting turned out to be unnecessary.

The Rect class in downloadCutout.py has a very particular sort order imposed on it, and the download codepath calls Rect.explode() under the hood, which calls .sort() on a list of Rects particular to each request. This results in the server receiving lists sorted by Tract, RA, DEC and some other fields.

# TODO slice up the locations to multiplex across connections if necessary, but for now
# we simply mask off a few
offset = config.get("offset", 0)
end = offset + config.get("num_sources", 10)
locations = locations[offset:end]

Check warning on line 66 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L64-L66

Added lines #L64 - L66 were not covered by tests

# TODO slice up the locations
locations = locations[0:10]

# make a list of rects
# Make a list of rects to pass to downloadCutout
rects = create_rects(locations, offset=0, default=rect_from_config(config))

# Configure global parameters for the downloader
dC.set_max_connections(num=config.get("max_connections", 2))

print("Requesting cutouts")

Check warning on line 74 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L74

Added line #L74 was not covered by tests
# pass the rects to the cutout downloader
download_cutout_group(
rects=rects, cutout_dir=config.get("cutout_dir"), user=config["username"], password=config["password"]
)

print(locations)
# print(locations)
print("Done")

Check warning on line 81 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L81

Added line #L81 was not covered by tests


# TODO add error checking
def filterfits(filename: str, column_names: list[str]) -> Table:
"""
Read a fits file with the required column names for making cutouts
"""Read a fits file with the required column names for making cutouts

Returns an astropy table containing only the necessary fields

The easiest way to make one of these is to select from the main HSC catalog

The easiest way to make such a fits file is to select from the main HSC catalog

Parameters
----------
filename : str
The fits file to read in
column_names : list[str]
The columns that are filtered out

Returns
-------
Table
Returns an astropy table containing only the fields specified in column_names
"""
t = Table.read(filename)
columns = [t[column] for column in column_names]
return hstack(columns, uniq_col_name="{table_name}", table_names=column_names)


def rect_from_config(config: dict) -> dC.Rect:
"""
Takes our Download config and loads cutout config
"""Takes our runtime config and loads cutout config
common to all cutouts into a prototypical Rect for downloading

Parameters
----------
config : dict
Runtime config, only the download section

Returns
-------
dC.Rect
A single rectangle with fields `sw`, `sh`, `filter`, `rerun`, and `type` populated from the config
"""
return dC.Rect.create(
sw=config["sw"],
Expand All @@ -97,15 +130,32 @@


def create_rects(locations: Table, offset: int = 0, default: dC.Rect = None) -> list[dC.Rect]:
"""
Create the rects we will need to pass to the downloader.
"""Create the rects we will need to pass to the downloader.
One Rect per location in our list of sky locations.

Rects are created with all fields in the default rect pre-filled

Offset here is to allow multiple downloads on different sections of the source list
without file name clobbering during the download phase. The offset is intended to be
the index of the start of the locations table within some larger fits file.

Parameters
----------
locations : Table
Table containing ra, dec locations in the sky
offset : int, optional
Index to start the `lineno` field in the rects at, by default 0. The purpose of this is to allow
multiple downloads on different sections of a larger source list without file name clobbering during
the download phase. This is important because `lineno` in a rect can becomes a file name parameter
The offset is intended to be the index of the start of the locations table within some larger fits
file.
default : dC.Rect, optional
The default Rect that contains properties common to all sky locations, by default None

Returns
-------
list[dC.Rect]
Rects populated with sky locations from the table
"""
rects = []
for index, location in enumerate(locations):
Expand All @@ -118,11 +168,108 @@
return rects


def download_cutout_group(rects: list[dC.Rect], cutout_dir: Union[str, Path], user, password):
stats = {

Check warning on line 171 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L171

Added line #L171 was not covered by tests
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole stats subsystem is pretty hacky. I've tried to make it unobtrusive to the CLI, but if we end up using it for anything beyond spot checks, we likely want to go find ourselves a library to do this.

"request_duration": datetime.timedelta(), # Time from request sent to first byte from the server
"response_duration": datetime.timedelta(), # Total time spent recieving and processing a response
"request_size_bytes": 0, # Total size of all requests
"response_size_bytes": 0, # Total size of all responses
"snapshots": 0, # Number of fits snapshots downloaded
}


def _stat_accumulate(name: str, value: Union[int, datetime.timedelta]):

Check warning on line 180 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L180

Added line #L180 was not covered by tests
"""Accumulate a sum into the global stats dict

Parameters
----------
name : str
Name of the stat. Assumed to exist in the dict already.
value : Union[int, datetime.timedelta]
How much time or count to add to the stat
"""
global stats
stats[name] += value

Check warning on line 191 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L191

Added line #L191 was not covered by tests


def _print_stats():

Check warning on line 194 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L194

Added line #L194 was not covered by tests
"""Print the accumulated stats including bandwidth calculated from duration and sizes

This prints out multiple lines with `\r` at the end in order to create a continuously updating
line of text during download if your terminal supports it.
"""
global stats

total_dur_s = (stats["request_duration"] + stats["response_duration"]).total_seconds()

Check warning on line 202 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L202

Added line #L202 was not covered by tests

resp_s = stats["response_duration"].total_seconds()
down_rate_mb_s = (stats["response_size_bytes"] / (1024**2)) / resp_s

Check warning on line 205 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L204-L205

Added lines #L204 - L205 were not covered by tests

req_s = stats["request_duration"].total_seconds()
up_rate_mb_s = (stats["request_size_bytes"] / (1024**2)) / req_s

Check warning on line 208 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L207-L208

Added lines #L207 - L208 were not covered by tests

snapshot_rate = stats["snapshots"] / total_dur_s

Check warning on line 210 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L210

Added line #L210 was not covered by tests

print(

Check warning on line 212 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L212

Added line #L212 was not covered by tests
f"Stats: Duration: {total_dur_s:.2f} s, Files: {stats['snapshots']}, \
Upload: {up_rate_mb_s:.2f} MB/s, Download: {down_rate_mb_s:.2f} MB/s File rate: {snapshot_rate:.2f} files/s",
end="\r",
flush=True,
)


def request_hook(

Check warning on line 220 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L220

Added line #L220 was not covered by tests
request: urllib.request.Request,
request_start: datetime.datetime,
response_start: datetime.datetime,
response_size: int,
chunk_size: int,
):
"""This hook is called on each chunk of snapshots downloaded.
It is called immediately after the server has finished responding to the
request, so datetime.datetime.now() is the end moment of the request

Parameters
----------
request : urllib.request.Request
The request object relevant to this call
request_start : datetime.datetime
The moment the request was handed off to urllib.request.urlopen()
response_start : datetime.datetime
The moment there were bytes from the server to process
response_size : int
The size of the response from the server in bytes
chunk_size : int
The number of cutout files recieved in this request
"""
Download cutouts to the given directory

Calls downloadCutout.download, so supports long lists of rects and
now = datetime.datetime.now()

Check warning on line 245 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L245

Added line #L245 was not covered by tests

_stat_accumulate("request_duration", response_start - request_start)
_stat_accumulate("response_duration", now - response_start)
_stat_accumulate("request_size_bytes", len(request.data))
_stat_accumulate("response_size_bytes", response_size)
_stat_accumulate("snapshots", chunk_size)

Check warning on line 251 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L247-L251

Added lines #L247 - L251 were not covered by tests

_print_stats()

Check warning on line 253 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L253

Added line #L253 was not covered by tests


def download_cutout_group(rects: list[dC.Rect], cutout_dir: Union[str, Path], user, password):

Check warning on line 256 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L256

Added line #L256 was not covered by tests
"""Download cutouts to the given directory

Calls downloadCutout.download, so supports long lists of rects beyond the limits of the HSC API

Parameters
----------
rects : list[dC.Rect]
The rects we would like to download
cutout_dir : Union[str, Path]
The directory to put the files
user : _type_
Username for HSC's download service to use
password : _type_
Password for HSC's download service to use
"""
with working_directory(Path(cutout_dir)):
dC.download(rects, user=user, password=password, onmemory=False)
dC.download(rects, user=user, password=password, onmemory=True, request_hook=request_hook)

Check warning on line 273 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L273

Added line #L273 was not covered by tests

print("") # Print a newline so the stats stay and look pretty.

Check warning on line 275 in src/fibad/download.py

View check run for this annotation

Codecov / codecov/patch

src/fibad/download.py#L275

Added line #L275 was not covered by tests
Loading