-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding bandwith measurement to downloadCutout #9
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,7 @@ | ||
import contextlib | ||
import datetime | ||
import os | ||
import urllib.request | ||
from pathlib import Path | ||
from typing import Union | ||
|
||
|
@@ -20,8 +22,12 @@ | |
def working_directory(path: Path): | ||
""" | ||
Context Manager to change our working directory. | ||
|
||
Supports downloadCutouts which always writes to cwd. | ||
|
||
Parameters | ||
---------- | ||
path : Path | ||
Path that we change `Path.cwd()` while we are active. | ||
""" | ||
old_cwd = Path.cwd() | ||
os.chdir(path) | ||
|
@@ -34,58 +40,85 @@ | |
def run(args, config): | ||
""" | ||
Main entrypoint for downloading cutouts from HSC for use with fibad | ||
|
||
Parameters | ||
---------- | ||
args : list | ||
Command line arguments (unused) | ||
config : dict | ||
Runtime configuration, which is only read by this function | ||
""" | ||
|
||
config = config.get("download", {}) | ||
|
||
print("Download command") | ||
print("Download command Start") | ||
|
||
fits_file = config.get("fits_file", "") | ||
print(f"Reading in fits catalog: {fits_file}") | ||
# Filter the fits file for the fields we want | ||
column_names = ["object_id"] + variable_fields | ||
locations = filterfits(config.get("fits_file"), column_names) | ||
locations = filterfits(fits_file, column_names) | ||
|
||
# Sort by tract, ra, dec to optimize speed that the cutout server can serve us | ||
# | ||
# TODO: See if this sort is performed by downloadCutouts | ||
# It appears downloadCutouts is doing some sorting prior to download, but | ||
# unclear if it is the same sort | ||
locations.sort(variable_fields) | ||
# TODO slice up the locations to multiplex across connections if necessary, but for now | ||
# we simply mask off a few | ||
offset = config.get("offset", 0) | ||
end = offset + config.get("num_sources", 10) | ||
locations = locations[offset:end] | ||
|
||
# TODO slice up the locations | ||
locations = locations[0:10] | ||
|
||
# make a list of rects | ||
# Make a list of rects to pass to downloadCutout | ||
rects = create_rects(locations, offset=0, default=rect_from_config(config)) | ||
|
||
# Configure global parameters for the downloader | ||
dC.set_max_connections(num=config.get("max_connections", 2)) | ||
|
||
print("Requesting cutouts") | ||
# pass the rects to the cutout downloader | ||
download_cutout_group( | ||
rects=rects, cutout_dir=config.get("cutout_dir"), user=config["username"], password=config["password"] | ||
) | ||
|
||
print(locations) | ||
# print(locations) | ||
print("Done") | ||
|
||
|
||
# TODO add error checking | ||
def filterfits(filename: str, column_names: list[str]) -> Table: | ||
""" | ||
Read a fits file with the required column names for making cutouts | ||
"""Read a fits file with the required column names for making cutouts | ||
|
||
Returns an astropy table containing only the necessary fields | ||
|
||
The easiest way to make one of these is to select from the main HSC catalog | ||
|
||
The easiest way to make such a fits file is to select from the main HSC catalog | ||
|
||
Parameters | ||
---------- | ||
filename : str | ||
The fits file to read in | ||
column_names : list[str] | ||
The columns that are filtered out | ||
|
||
Returns | ||
------- | ||
Table | ||
Returns an astropy table containing only the fields specified in column_names | ||
""" | ||
t = Table.read(filename) | ||
columns = [t[column] for column in column_names] | ||
return hstack(columns, uniq_col_name="{table_name}", table_names=column_names) | ||
|
||
|
||
def rect_from_config(config: dict) -> dC.Rect: | ||
""" | ||
Takes our Download config and loads cutout config | ||
"""Takes our runtime config and loads cutout config | ||
common to all cutouts into a prototypical Rect for downloading | ||
|
||
Parameters | ||
---------- | ||
config : dict | ||
Runtime config, only the download section | ||
|
||
Returns | ||
------- | ||
dC.Rect | ||
A single rectangle with fields `sw`, `sh`, `filter`, `rerun`, and `type` populated from the config | ||
""" | ||
return dC.Rect.create( | ||
sw=config["sw"], | ||
|
@@ -97,15 +130,32 @@ | |
|
||
|
||
def create_rects(locations: Table, offset: int = 0, default: dC.Rect = None) -> list[dC.Rect]: | ||
""" | ||
Create the rects we will need to pass to the downloader. | ||
"""Create the rects we will need to pass to the downloader. | ||
One Rect per location in our list of sky locations. | ||
|
||
Rects are created with all fields in the default rect pre-filled | ||
|
||
Offset here is to allow multiple downloads on different sections of the source list | ||
without file name clobbering during the download phase. The offset is intended to be | ||
the index of the start of the locations table within some larger fits file. | ||
|
||
Parameters | ||
---------- | ||
locations : Table | ||
Table containing ra, dec locations in the sky | ||
offset : int, optional | ||
Index to start the `lineno` field in the rects at, by default 0. The purpose of this is to allow | ||
multiple downloads on different sections of a larger source list without file name clobbering during | ||
the download phase. This is important because `lineno` in a rect can becomes a file name parameter | ||
The offset is intended to be the index of the start of the locations table within some larger fits | ||
file. | ||
default : dC.Rect, optional | ||
The default Rect that contains properties common to all sky locations, by default None | ||
|
||
Returns | ||
------- | ||
list[dC.Rect] | ||
Rects populated with sky locations from the table | ||
""" | ||
rects = [] | ||
for index, location in enumerate(locations): | ||
|
@@ -118,11 +168,108 @@ | |
return rects | ||
|
||
|
||
def download_cutout_group(rects: list[dC.Rect], cutout_dir: Union[str, Path], user, password): | ||
stats = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This whole stats subsystem is pretty hacky. I've tried to make it unobtrusive to the CLI, but if we end up using it for anything beyond spot checks, we likely want to go find ourselves a library to do this. |
||
"request_duration": datetime.timedelta(), # Time from request sent to first byte from the server | ||
"response_duration": datetime.timedelta(), # Total time spent recieving and processing a response | ||
"request_size_bytes": 0, # Total size of all requests | ||
"response_size_bytes": 0, # Total size of all responses | ||
"snapshots": 0, # Number of fits snapshots downloaded | ||
} | ||
|
||
|
||
def _stat_accumulate(name: str, value: Union[int, datetime.timedelta]): | ||
"""Accumulate a sum into the global stats dict | ||
|
||
Parameters | ||
---------- | ||
name : str | ||
Name of the stat. Assumed to exist in the dict already. | ||
value : Union[int, datetime.timedelta] | ||
How much time or count to add to the stat | ||
""" | ||
global stats | ||
stats[name] += value | ||
|
||
|
||
def _print_stats(): | ||
"""Print the accumulated stats including bandwidth calculated from duration and sizes | ||
|
||
This prints out multiple lines with `\r` at the end in order to create a continuously updating | ||
line of text during download if your terminal supports it. | ||
""" | ||
global stats | ||
|
||
total_dur_s = (stats["request_duration"] + stats["response_duration"]).total_seconds() | ||
|
||
resp_s = stats["response_duration"].total_seconds() | ||
down_rate_mb_s = (stats["response_size_bytes"] / (1024**2)) / resp_s | ||
|
||
req_s = stats["request_duration"].total_seconds() | ||
up_rate_mb_s = (stats["request_size_bytes"] / (1024**2)) / req_s | ||
|
||
snapshot_rate = stats["snapshots"] / total_dur_s | ||
|
||
print( | ||
f"Stats: Duration: {total_dur_s:.2f} s, Files: {stats['snapshots']}, \ | ||
Upload: {up_rate_mb_s:.2f} MB/s, Download: {down_rate_mb_s:.2f} MB/s File rate: {snapshot_rate:.2f} files/s", | ||
end="\r", | ||
flush=True, | ||
) | ||
|
||
|
||
def request_hook( | ||
request: urllib.request.Request, | ||
request_start: datetime.datetime, | ||
response_start: datetime.datetime, | ||
response_size: int, | ||
chunk_size: int, | ||
): | ||
"""This hook is called on each chunk of snapshots downloaded. | ||
It is called immediately after the server has finished responding to the | ||
request, so datetime.datetime.now() is the end moment of the request | ||
|
||
Parameters | ||
---------- | ||
request : urllib.request.Request | ||
The request object relevant to this call | ||
request_start : datetime.datetime | ||
The moment the request was handed off to urllib.request.urlopen() | ||
response_start : datetime.datetime | ||
The moment there were bytes from the server to process | ||
response_size : int | ||
The size of the response from the server in bytes | ||
chunk_size : int | ||
The number of cutout files recieved in this request | ||
""" | ||
Download cutouts to the given directory | ||
|
||
Calls downloadCutout.download, so supports long lists of rects and | ||
now = datetime.datetime.now() | ||
|
||
_stat_accumulate("request_duration", response_start - request_start) | ||
_stat_accumulate("response_duration", now - response_start) | ||
_stat_accumulate("request_size_bytes", len(request.data)) | ||
_stat_accumulate("response_size_bytes", response_size) | ||
_stat_accumulate("snapshots", chunk_size) | ||
|
||
_print_stats() | ||
|
||
|
||
def download_cutout_group(rects: list[dC.Rect], cutout_dir: Union[str, Path], user, password): | ||
"""Download cutouts to the given directory | ||
|
||
Calls downloadCutout.download, so supports long lists of rects beyond the limits of the HSC API | ||
|
||
Parameters | ||
---------- | ||
rects : list[dC.Rect] | ||
The rects we would like to download | ||
cutout_dir : Union[str, Path] | ||
The directory to put the files | ||
user : _type_ | ||
Username for HSC's download service to use | ||
password : _type_ | ||
Password for HSC's download service to use | ||
""" | ||
with working_directory(Path(cutout_dir)): | ||
dC.download(rects, user=user, password=password, onmemory=False) | ||
dC.download(rects, user=user, password=password, onmemory=True, request_hook=request_hook) | ||
|
||
print("") # Print a newline so the stats stay and look pretty. | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sorting turned out to be unnecessary.
The
Rect
class indownloadCutout.py
has a very particular sort order imposed on it, and the download codepath callsRect.explode()
under the hood, which calls.sort()
on a list ofRect
s particular to each request. This results in the server receiving lists sorted by Tract, RA, DEC and some other fields.