diff --git a/src/fibad/download.py b/src/fibad/download.py index 2a3e83f..e62b043 100644 --- a/src/fibad/download.py +++ b/src/fibad/download.py @@ -1,5 +1,7 @@ import contextlib +import datetime import os +import urllib.request from pathlib import Path from typing import Union @@ -20,8 +22,12 @@ def working_directory(path: Path): """ Context Manager to change our working directory. - Supports downloadCutouts which always writes to cwd. + + Parameters + ---------- + path : Path + Path that we change `Path.cwd()` to while we are active. """ old_cwd = Path.cwd() os.chdir(path) @@ -34,48 +40,66 @@ def working_directory(path: Path): def run(args, config): """ Main entrypoint for downloading cutouts from HSC for use with fibad + + Parameters + ---------- + args : list + Command line arguments (unused) + config : dict + Runtime configuration, which is only read by this function """ config = config.get("download", {}) - print("Download command") + print("Download command Start") + fits_file = config.get("fits_file", "") + print(f"Reading in fits catalog: {fits_file}") # Filter the fits file for the fields we want column_names = ["object_id"] + variable_fields - locations = filterfits(config.get("fits_file"), column_names) + locations = filterfits(fits_file, column_names) - # Sort by tract, ra, dec to optimize speed that the cutout server can serve us - # - # TODO: See if this sort is performed by downloadCutouts - # It appears downloadCutouts is doing some sorting prior to download, but - # unclear if it is the same sort - locations.sort(variable_fields) + # TODO slice up the locations to multiplex across connections if necessary, but for now + # we simply mask off a few + offset = config.get("offset", 0) + end = offset + config.get("num_sources", 10) + locations = locations[offset:end] - # TODO slice up the locations - locations = locations[0:10] - - # make a list of rects + # Make a list of rects to pass to downloadCutout rects = create_rects(locations, offset=0, 
default=rect_from_config(config)) # Configure global parameters for the downloader dC.set_max_connections(num=config.get("max_connections", 2)) + print("Requesting cutouts") # pass the rects to the cutout downloader download_cutout_group( rects=rects, cutout_dir=config.get("cutout_dir"), user=config["username"], password=config["password"] ) - print(locations) + # print(locations) + print("Done") # TODO add error checking def filterfits(filename: str, column_names: list[str]) -> Table: - """ - Read a fits file with the required column names for making cutouts + """Read a fits file with the required column names for making cutouts - Returns an astropy table containing only the necessary fields - The easiest way to make one of these is to select from the main HSC catalog + + The easiest way to make such a fits file is to select from the main HSC catalog + + Parameters + ---------- + filename : str + The fits file to read in + column_names : list[str] + The columns that are filtered out + + Returns + ------- + Table + Returns an astropy table containing only the fields specified in column_names """ t = Table.read(filename) columns = [t[column] for column in column_names] @@ -83,9 +107,18 @@ def filterfits(filename: str, column_names: list[str]) -> Table: def rect_from_config(config: dict) -> dC.Rect: - """ - Takes our Download config and loads cutout config + """Takes our runtime config and loads cutout config common to all cutouts into a prototypical Rect for downloading + + Parameters + ---------- + config : dict + Runtime config, only the download section + + Returns + ------- + dC.Rect + A single rectangle with fields `sw`, `sh`, `filter`, `rerun`, and `type` populated from the config """ return dC.Rect.create( sw=config["sw"], @@ -97,8 +130,7 @@ def rect_from_config(config: dict) -> dC.Rect: def create_rects(locations: Table, offset: int = 0, default: dC.Rect = None) -> list[dC.Rect]: - """ - Create the rects we will need to pass to the downloader. 
+ """Create the rects we will need to pass to the downloader. One Rect per location in our list of sky locations. Rects are created with all fields in the default rect pre-filled @@ -106,6 +138,24 @@ def create_rects(locations: Table, offset: int = 0, default: dC.Rect = None) -> Offset here is to allow multiple downloads on different sections of the source list without file name clobbering during the download phase. The offset is intended to be the index of the start of the locations table within some larger fits file. + + Parameters + ---------- + locations : Table + Table containing ra, dec locations in the sky + offset : int, optional + Index to start the `lineno` field in the rects at, by default 0. The purpose of this is to allow + multiple downloads on different sections of a larger source list without file name clobbering during + the download phase. This is important because `lineno` in a rect can become a file name parameter + The offset is intended to be the index of the start of the locations table within some larger fits + file. 
+ default : dC.Rect, optional + The default Rect that contains properties common to all sky locations, by default None + + Returns + ------- + list[dC.Rect] + Rects populated with sky locations from the table """ rects = [] for index, location in enumerate(locations): @@ -118,11 +168,108 @@ def create_rects(locations: Table, offset: int = 0, default: dC.Rect = None) -> return rects -def download_cutout_group(rects: list[dC.Rect], cutout_dir: Union[str, Path], user, password): +stats = { + "request_duration": datetime.timedelta(), # Time from request sent to first byte from the server + "response_duration": datetime.timedelta(), # Total time spent receiving and processing a response + "request_size_bytes": 0, # Total size of all requests + "response_size_bytes": 0, # Total size of all responses + "snapshots": 0, # Number of fits snapshots downloaded +} + + +def _stat_accumulate(name: str, value: Union[int, datetime.timedelta]): + """Accumulate a sum into the global stats dict + + Parameters + ---------- + name : str + Name of the stat. Assumed to exist in the dict already. + value : Union[int, datetime.timedelta] + How much time or count to add to the stat + """ + global stats + stats[name] += value + + +def _print_stats(): + """Print the accumulated stats including bandwidth calculated from duration and sizes + + This prints out multiple lines with `\r` at the end in order to create a continuously updating + line of text during download if your terminal supports it. 
+ """ + global stats + + total_dur_s = (stats["request_duration"] + stats["response_duration"]).total_seconds() + + resp_s = stats["response_duration"].total_seconds() + down_rate_mb_s = (stats["response_size_bytes"] / (1024**2)) / resp_s + + req_s = stats["request_duration"].total_seconds() + up_rate_mb_s = (stats["request_size_bytes"] / (1024**2)) / req_s + + snapshot_rate = stats["snapshots"] / total_dur_s + + print( + f"Stats: Duration: {total_dur_s:.2f} s, Files: {stats['snapshots']}, \ +Upload: {up_rate_mb_s:.2f} MB/s, Download: {down_rate_mb_s:.2f} MB/s File rate: {snapshot_rate:.2f} files/s", + end="\r", + flush=True, + ) + + +def request_hook( + request: urllib.request.Request, + request_start: datetime.datetime, + response_start: datetime.datetime, + response_size: int, + chunk_size: int, +): + """This hook is called on each chunk of snapshots downloaded. + It is called immediately after the server has finished responding to the + request, so datetime.datetime.now() is the end moment of the request + + Parameters + ---------- + request : urllib.request.Request + The request object relevant to this call + request_start : datetime.datetime + The moment the request was handed off to urllib.request.urlopen() + response_start : datetime.datetime + The moment there were bytes from the server to process + response_size : int + The size of the response from the server in bytes + chunk_size : int + The number of cutout files received in this request """ - Download cutouts to the given directory - Calls downloadCutout.download, so supports long lists of rects and + now = datetime.datetime.now() + + _stat_accumulate("request_duration", response_start - request_start) + _stat_accumulate("response_duration", now - response_start) + _stat_accumulate("request_size_bytes", len(request.data)) + _stat_accumulate("response_size_bytes", response_size) + _stat_accumulate("snapshots", chunk_size) + + _print_stats() + + +def download_cutout_group(rects: list[dC.Rect], 
cutout_dir: Union[str, Path], user, password): + """Download cutouts to the given directory + + Calls downloadCutout.download, so supports long lists of rects beyond the limits of the HSC API + + Parameters + ---------- + rects : list[dC.Rect] + The rects we would like to download + cutout_dir : Union[str, Path] + The directory to put the files + user : str + Username for HSC's download service to use + password : str + Password for HSC's download service to use """ with working_directory(Path(cutout_dir)): - dC.download(rects, user=user, password=password, onmemory=False) + dC.download(rects, user=user, password=password, onmemory=True, request_hook=request_hook) + + print("") # Print a newline so the stats stay and look pretty. diff --git a/src/fibad/downloadCutout/downloadCutout.py b/src/fibad/downloadCutout/downloadCutout.py index 889c744..84f872a 100644 --- a/src/fibad/downloadCutout/downloadCutout.py +++ b/src/fibad/downloadCutout/downloadCutout.py @@ -4,6 +4,7 @@ import contextlib import csv import dataclasses +import datetime import errno import getpass import io @@ -15,6 +16,7 @@ import tempfile import time import urllib.request +import urllib.response from collections.abc import Generator from typing import IO, Any, Callable, Optional, Union, cast @@ -323,7 +325,7 @@ class Rect: rerun: str = default_rerun type: str = default_type - filter: str = ALLFILTERS + filter: Union[str, list[str]] = ALLFILTERS tract: int = ANYTRACT ra: float = math.nan dec: float = math.nan @@ -339,7 +341,7 @@ class Rect: def create( rerun: Union[str, None] = None, type: Union[str, None] = None, - filter: Union[str, None] = None, + filter: Union[Union[str, list[str]], None] = None, tract: Union[str, int, None] = None, ra: Union[str, float, None] = None, dec: Union[str, float, None] = None, @@ -460,7 +462,9 @@ def explode(self) -> list["Rect"]: rects List of `Rect` objects, each being more specific than `self`. 
""" - if self.filter == ALLFILTERS: + if isinstance(self.filter, list): + return [Rect.create(filter=f, default=self) for f in self.filter] + elif self.filter == ALLFILTERS: return [Rect.create(filter=f, default=self) for f in _all_filters] else: return [Rect.create(default=self)] @@ -927,7 +931,7 @@ def parse_filter(s: str) -> str: ) -def parse_filter_opt(s: Optional[str]) -> str: +def parse_filter_opt(s: Optional[Union[str, list]]) -> Union[str, list]: """ Interpret a filter name. The argument may be `ALLFILTERS`.or None @@ -950,7 +954,8 @@ def parse_filter_opt(s: Optional[str]) -> str: """ if s is None: return ALLFILTERS - + if isinstance(s, list): + return [parse_filter(filter) for filter in s] if s.lower() == ALLFILTERS: return ALLFILTERS return parse_filter(s) @@ -963,6 +968,7 @@ def download( password: Optional[str] = None, *, onmemory: bool = True, + **kwargs_request, ) -> Union[list, list[list], None]: """ Cut `rects` out of the sky. @@ -978,6 +984,8 @@ def download( onmemory Return `datalist` on memory. If `onmemory` is False, downloaded cut-outs are written to files. + kwargs_request + Additional keyword args are passed through to _download Returns ------- @@ -998,7 +1006,7 @@ def download( rects = [cast(Rect, rects)] rects = cast(list[Rect], rects) - ret = _download(rects, user, password, onmemory=onmemory) + ret = _download(rects, user, password, onmemory=onmemory, **kwargs_request) if isscalar and onmemory: ret = cast(list[list], ret) return ret[0] @@ -1007,7 +1015,13 @@ def download( def _download( - rects: list[Rect], user: Optional[str], password: Optional[str], *, onmemory: bool + rects: list[Rect], + user: Optional[str], + password: Optional[str], + *, + onmemory: bool, + chunksize: int = 990, + **kwargs_request, ) -> Optional[list[list]]: """ Cut `rects` out of the sky. @@ -1023,6 +1037,10 @@ def _download( onmemory Return `datalist` on memory. If `onmemory` is False, downloaded cut-outs are written to files. 
+ chunksize + Number of cutout lines to pack into a single request. Defaults to 990 if unspecified. + kwargs_request + Additional keyword args are passed through to _download_chunk Returns ------- @@ -1058,11 +1076,12 @@ def _download( if not password: raise RuntimeError("Password is empty.") - chunksize = 990 datalist: list[tuple[int, dict, bytes]] = [] for i in range(0, len(exploded_rects), chunksize): - ret = _download_chunk(exploded_rects[i : i + chunksize], user, password, onmemory=onmemory) + ret = _download_chunk( + exploded_rects[i : i + chunksize], user, password, onmemory=onmemory, **kwargs_request + ) if onmemory: datalist += cast(list, ret) @@ -1075,7 +1094,15 @@ def _download( def _download_chunk( - rects: list[tuple[Rect, Any]], user: str, password: str, *, onmemory: bool + rects: list[tuple[Rect, Any]], + user: str, + password: str, + *, + onmemory: bool, + request_hook: Optional[ + Callable[[urllib.request.Request, datetime.datetime, datetime.datetime, int, int], Any] + ], + **kwargs_request, ) -> Optional[list]: """ Cut `rects` out of the sky. @@ -1095,6 +1122,11 @@ def _download_chunk( onmemory Return `datalist` on memory. If `onmemory` is False, downloaded cut-outs are written to files. + request_hook + Function that is called with the response of all requests made + Intended to support bandwidth instrumentation. 
+ kwargs_request + Additional keyword args are passed through to urllib.request.urlopen Returns ------- @@ -1134,11 +1166,18 @@ def _download_chunk( returnedlist = [] + # Set timeout to 1 hour if no timeout was set higher up + kwargs_request.setdefault("timeout", 3600) + with get_connection_semaphore(): - with urllib.request.urlopen(req, timeout=3600) as fin: + request_started = datetime.datetime.now() + with urllib.request.urlopen(req, **kwargs_request) as fin: + response_started = datetime.datetime.now() + response_size = 0 with tarfile.open(fileobj=fin, mode="r|") as tar: for info in tar: fitem = tar.extractfile(info) + response_size += info.size if fitem is None: continue with fitem: @@ -1162,6 +1201,8 @@ def _download_chunk( os.makedirs(dirname, exist_ok=True) with open(filename, "wb") as fout: _splice(fitem, fout) + if request_hook: + request_hook(req, request_started, response_started, response_size, len(rects)) return returnedlist if onmemory else None diff --git a/src/fibad_cli/main.py b/src/fibad_cli/main.py index 5816003..1aa3136 100644 --- a/src/fibad_cli/main.py +++ b/src/fibad_cli/main.py @@ -8,7 +8,7 @@ download_config = { "sw": "22asec", "sh": "22asec", - "filter": "all", + "filter": ["HSC-G", "HSC-R", "HSC-I", "HSC-Z", "HSC-Y"], "type": "coadd", "rerun": "pdr3_wide", "username": "mtauraso@local", @@ -16,6 +16,8 @@ "max_connections": 2, "fits_file": "../hscplay/temp.fits", "cutout_dir": "../hscplay/cutouts/", + "offset": 0, + "num_sources": 10, } config = {"download": download_config}