From 069631ed6dd165f8a8f6304d68dbf84d57bc040f Mon Sep 17 00:00:00 2001
From: pkdash
Date: Thu, 23 Mar 2023 22:18:20 -0400
Subject: [PATCH] [#44] adding aggregation type classes for data object support

---
 hsclient/hydroshare.py | 480 +++++++++++++++++++++++------------------
 1 file changed, 270 insertions(+), 210 deletions(-)

diff --git a/hsclient/hydroshare.py b/hsclient/hydroshare.py
index 18a490e..e562400 100644
--- a/hsclient/hydroshare.py
+++ b/hsclient/hydroshare.py
@@ -6,13 +6,14 @@
 import sqlite3
 import tempfile
 import time
-from contextlib import closing
 import urllib.parse
+from concurrent.futures import ThreadPoolExecutor
+from contextlib import closing
 from datetime import datetime
 from functools import wraps
 from posixpath import basename, dirname, join as urljoin, splitext
 from pprint import pformat
-from typing import Dict, List, Union, TYPE_CHECKING, Callable
+from typing import Callable, Dict, List, TYPE_CHECKING, Union
 from urllib.parse import quote, unquote, urlparse
 from zipfile import ZipFile
@@ -134,7 +135,7 @@ def __init__(self, map_path, hs_session, checksums=None):
         self._parsed_files = None
         self._parsed_aggregations = None
         self._parsed_checksums = checksums
-        self._data_object = None
+        self._main_file_path = None
 
     def __str__(self):
         return self._map_path
@@ -177,11 +178,40 @@ def _files(self):
 
     @property
     def _aggregations(self):
+
+        def populate_files(_aggr):
+            _aggr._files
+
+        def populate_metadata(_aggr):
+            _aggr._metadata
+
         if not self._parsed_aggregations:
             self._parsed_aggregations = []
             for file in self._map.describes.files:
                 if is_aggregation(str(file)):
                     self._parsed_aggregations.append(Aggregation(unquote(file.path), self._hs_session, self._checksums))
+
+            # load files (instances of File) and metadata for all aggregations
+            with ThreadPoolExecutor() as executor:
+                executor.map(populate_files, self._parsed_aggregations)
+                executor.map(populate_metadata, self._parsed_aggregations)
+
+            # convert aggregations to aggregation type supporting data object
+            aggregations_copy = self._parsed_aggregations[:]
+            typed_aggregation_classes = {AggregationType.MultidimensionalAggregation: NetCDFAggregation,
+                                         AggregationType.TimeSeriesAggregation: TimeseriesAggregation,
+                                         AggregationType.GeographicRasterAggregation: GeoRasterAggregation,
+                                         AggregationType.GeographicFeatureAggregation: GeoFeatureAggregation,
+                                         }
+            for aggr in aggregations_copy:
+                typed_aggr = None
+                typed_aggr_cls = typed_aggregation_classes.get(aggr.metadata.type, None)
+                if typed_aggr_cls:
+                    typed_aggr = typed_aggr_cls.create(base_aggr=aggr)
+                if typed_aggr:
+                    self._parsed_aggregations.remove(aggr)
+                    self._parsed_aggregations.append(typed_aggr)
+
         return self._parsed_aggregations
 
     @property
@@ -232,59 +262,6 @@ def _download(self, save_path: str = "", unzip_to: str = None) -> str:
             return unzip_to
         return downloaded_zip
 
-    def _validate_aggregation_path(self, agg_path: str, for_save_data: bool = False) -> str:
-        main_file_ext = pathlib.Path(self.main_file_path).suffix
-        file_name = self.file(extension=main_file_ext).name
-        file_path = urljoin(agg_path, file_name)
-        if not os.path.exists(file_path) or not os.path.isfile(file_path):
-            file_path = urljoin(file_path, file_name)
-            if not os.path.exists(file_path):
-                raise Exception(f"Aggregation was not found at: {agg_path}")
-
-        if for_save_data:
-            if self.metadata.type == AggregationType.GeographicFeatureAggregation:
-                if file_path == self._data_object.path:
-                    raise Exception(f"Aggregation path '{agg_path}' is not a valid path. This should be a path where "
-                                    f"you have the updated shape files")
-                else:
-                    for aggr_file in self.files():
-                        aggr_file = basename(aggr_file)
-                        if aggr_file.endswith(".shp.xml") or aggr_file.endswith(".sbn") or aggr_file.endswith(".sbx"):
-                            # these are optional files for geo feature aggregation
-                            continue
-                        if not os.path.exists(os.path.join(agg_path, aggr_file)):
-                            raise Exception(f"Aggregation path '{agg_path}' is not a valid path. "
-                                            f"Missing file '{aggr_file}'")
-        return file_path
-
-    def _get_data_object(self, agg_path: str, func: Callable) -> \
-            Union['pandas.DataFrame', 'fiona.Collection', 'rasterio.DatasetReader', 'xarray.Dataset']:
-
-        if self._data_object is not None and self.metadata.type != AggregationType.TimeSeriesAggregation:
-            return self._data_object
-
-        file_path = self._validate_aggregation_path(agg_path)
-        data_object = func(file_path)
-        if self.metadata.type == AggregationType.MultidimensionalAggregation:
-            data_object.load()
-            data_object.close()
-
-        # cache the data object for the aggregation
-        self._data_object = data_object
-        return data_object
-
-    def _validate_aggregation_for_update(self, resource: 'Resource', agg_type: AggregationType) -> None:
-        if self.metadata.type != agg_type:
-            raise Exception(f"Not a {agg_type.value} aggregation")
-
-        if self._data_object is None:
-            raise Exception("No data object exists for this aggregation.")
-
-        # check this aggregation is part of the specified resource
-        aggr = resource.aggregation(file__path=self.main_file_path)
-        if aggr is None:
-            raise Exception("This aggregation is not part of the specified resource.")
-
     @property
     def metadata_file(self):
         """The path to the metadata file"""
@@ -303,19 +280,19 @@ def metadata_path(self) -> str:
     @property
     def main_file_path(self) -> str:
         """The path to the main file in the aggregation"""
+        if self._main_file_path is not None:
+            return self._main_file_path
         mft = main_file_type(self.metadata.type)
         if mft:
             for file in self.files():
                 if str(file).endswith(mft):
-                    return file.path
+                    self._main_file_path = file.path
+                    return self._main_file_path
         if self.metadata.type == AggregationType.FileSetAggregation:
-            return self.files()[0].folder
-        return self.files()[0].path
-
-    @property
-    def data_object(self) -> \
-            Union['pandas.DataFrame', 'fiona.Collection', 'rasterio.DatasetReader', 'xarray.Dataset', None]:
-        return self._data_object
+            self._main_file_path = self.files()[0].folder
+            return self._main_file_path
+        self._main_file_path = self.files()[0].path
+        return self._main_file_path
 
     @refresh
     def save(self) -> None:
@@ -364,6 +341,17 @@ def aggregations(self, **kwargs) -> List[BaseMetadata]:
         :return: a List of Aggregation objects matching the filter parameters
         """
         aggregations = self._aggregations
+
+        # when searching using 'file__path' or 'files__path' as the key, there can be only one matching aggregation
+        file_path = kwargs.get("file__path", "")
+        if not file_path:
+            file_path = kwargs.get("files__path", "")
+        if file_path:
+            for agg in aggregations:
+                if agg.files(path=file_path):
+                    return [agg]
+            return []
+
         for key, value in kwargs.items():
             if key.startswith('file__'):
                 file_args = {key[len('file__'):]: value}
@@ -399,7 +387,7 @@ def refresh(self) -> None:
         self._parsed_files = None
         self._parsed_aggregations = None
         self._parsed_checksums = None
-        self._data_object = None
+        self._main_file_path = None
 
     def delete(self) -> None:
         """Deletes this aggregation from HydroShare"""
@@ -413,121 +401,89 @@ def delete(self) -> None:
         self._hs_session.delete(path, status_code=200)
         self.refresh()
 
-    def as_series(self, series_id: str, agg_path: str) -> 'pandas.DataFrame':
-        """
-        Creates a pandas DataFrame object out of an aggregation of type TimeSeries.
-        :param series_id: The series_id of the timeseries result to be converted to a Dataframe object.
-        :param agg_path: The local path where this aggregation has been downloaded previously.
-        :return: A pandas.DataFrame object
-        """
-        # TODO: if we decide that the user will prefer to use `as_data_object` method rather than this method, then
-        #  make this method as a private method.
-        if pandas is None:
-            raise Exception("pandas package not found")
+class DataObjectSupportingAggregation(Aggregation):
+    """Base class for any aggregation supporting an aggregation-type-specific data analysis object (e.g. pandas)"""
 
-        def to_series(timeseries_file: str):
-            con = sqlite3.connect(timeseries_file)
-            return pandas.read_sql(
-                f'SELECT * FROM TimeSeriesResultValues WHERE ResultID IN '
-                f'(SELECT ResultID FROM Results WHERE ResultUUID = "{series_id}");',
-                con,
-            ).squeeze()
+    @staticmethod
+    def create(aggr_cls, base_aggr):
+        """Creates a type-specific aggregation object from an instance of Aggregation"""
+        aggr = aggr_cls(base_aggr._map_path, base_aggr._hs_session, base_aggr._parsed_checksums)
+        aggr._retrieved_map = base_aggr._retrieved_map
+        aggr._retrieved_metadata = base_aggr._retrieved_metadata
+        aggr._parsed_files = base_aggr._parsed_files
+        aggr._parsed_aggregations = base_aggr._parsed_aggregations
+        aggr._main_file_path = base_aggr._main_file_path
+        aggr._data_object = None
+        return aggr
 
-        return self._get_data_object(agg_path=agg_path, func=to_series)
+    def refresh(self) -> None:
+        super().refresh()
+        self._data_object = None
 
-    def as_multi_dimensional_dataset(self, agg_path: str) -> 'xarray.Dataset':
-        """
-        Creates a xarray Dataset object out of an aggregation of type NetCDF.
-        :param agg_path: The local path where this aggregation has been downloaded previously.
-        :return: A xarray.Dataset object
-        """
-        # TODO: if we decide that the user will prefer to use `as_data_object` method rather than this method, then
-        #  make this method as a private method.
+    @property
+    def data_object(self) -> \
+            Union['pandas.DataFrame', 'fiona.Collection', 'rasterio.DatasetReader', 'xarray.Dataset', None]:
+        return self._data_object
 
-        if self.metadata.type != AggregationType.MultidimensionalAggregation:
-            raise Exception("Aggregation is not of type NetCDF")
-        if xarray is None:
-            raise Exception("xarray package was not found")
+    def _get_file_path(self, agg_path):
+        main_file_ext = pathlib.Path(self.main_file_path).suffix
+        file_name = self.file(extension=main_file_ext).name
+        file_path = urljoin(agg_path, file_name)
+        if not os.path.exists(file_path) or not os.path.isfile(file_path):
+            file_path = urljoin(file_path, file_name)
+            if not os.path.exists(file_path):
+                raise Exception(f"Aggregation was not found at: {agg_path}")
+        return file_path
 
-        return self._get_data_object(agg_path=agg_path, func=xarray.open_dataset)
+    def _validate_aggregation_path(self, agg_path: str, for_save_data: bool = False) -> str:
+        return self._get_file_path(agg_path)
 
-    def as_feature_collection(self, agg_path: str) -> 'fiona.Collection':
-        """
-        Creates a fiona Collection object out of an aggregation of type GeoFeature.
-        :param agg_path: The local path where this aggregation has been downloaded previously.
-        :return: A fiona.Collection object
-        Note: The caller is responsible for closing the fiona.Collection object to free up aggregation files used to
-        create this object.
- """ - # TODO: if we decide that the user will prefer to use `as_data_object` method rather than this method, then - # make this method as a private method. + def _get_data_object(self, agg_path: str, func: Callable) -> \ + Union['pandas.DataFrame', 'fiona.Collection', 'rasterio.DatasetReader', 'xarray.Dataset']: - if self.metadata.type != AggregationType.GeographicFeatureAggregation: - raise Exception("Aggregation is not of type GeoFeature") - if fiona is None: - raise Exception("fiona package was not found") - return self._get_data_object(agg_path=agg_path, func=fiona.open) + if self._data_object is not None and self.metadata.type != AggregationType.TimeSeriesAggregation: + return self._data_object - def as_raster_dataset(self, agg_path: str) -> 'rasterio.DatasetReader': - """ - Creates a rasterio DatasetReader object out of an aggregation of type GeoRaster - :param agg_path: The local path where this aggregation has been downloaded previously. - :return: A rasterio.DatasetReader object - Note: The caller is responsible for closing the rasterio.DatasetReader object to free up aggregation files - used to create this object. - """ - # TODO: if we decide that the user will prefer to use `as_data_object` method rather than this method, then - # make this method as a private method. + file_path = self._validate_aggregation_path(agg_path) + data_object = func(file_path) + if self.metadata.type == AggregationType.MultidimensionalAggregation: + data_object.load() + data_object.close() - if self.metadata.type != AggregationType.GeographicRasterAggregation: - raise Exception("Aggregation is not of type GeoRaster") - if rasterio is None: - raise Exception("rasterio package was not found") + # cache the data object for the aggregation + self._data_object = data_object + return data_object - return self._get_data_object(agg_path=agg_path, func=rasterio.open) + def _validate_aggregation_for_update(self, resource: 'Resource', agg_type: AggregationType) -> None: + if self.metadata.type != agg_type: + raise Exception(f"Not a {agg_type.value} aggregation") - def as_data_object(self, agg_path: str, series_id: str = "") -> \ - Union['pandas.DataFrame', 'fiona.Collection', 'rasterio.DatasetReader', 'xarray.Dataset']: - """ - Loads aggregation data to a relevant data object type. Data for a timeseries aggregation is loaded as pandas - DataFrame, data for a geo feature aggregation os loaded as a fiona Collection object, data for a raster - aggregation is loaded as rasterio DatasetReader object, and data for a netcdf aggregation is loaded as xarray - Dataset object. - :param agg_path: The local path where this aggregation has been downloaded previously. - :param series_id: The series_id of the timeseries result to be converted to a Dataframe object. A value for this - parameter is required only for a timeseries aggregation. 
- """ + if self._data_object is None: + raise Exception("No data object exists for this aggregation.") - if self.metadata.type == AggregationType.TimeSeriesAggregation: - if not series_id: - raise Exception("Provide the series_id for which the timeseries data object is needed.") - return self.as_series(series_id=series_id, agg_path=agg_path) - if self.metadata.type == AggregationType.MultidimensionalAggregation: - return self.as_multi_dimensional_dataset(agg_path=agg_path) - if self.metadata.type == AggregationType.GeographicFeatureAggregation: - return self.as_feature_collection(agg_path=agg_path) - if self.metadata.type == AggregationType.GeographicRasterAggregation: - return self.as_raster_dataset(agg_path=agg_path) + # check this aggregation is part of the specified resource + aggr = resource.aggregation(file__path=self.main_file_path) + if aggr is None: + raise Exception("This aggregation is not part of the specified resource.") - raise Exception(f"Data object is not supported for '{self.metadata.type}' aggregation type") - def update_netcdf_data(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False, - destination_path: str = "") -> 'Aggregation': - """ - Updates the netcdf file associated with this aggregation. Then uploads the updated netcdf file - to create a new aggregation that replaces the original aggregation. - :param resource: The resource object to which this aggregation belongs. - :param agg_path: The local path where this aggregation has been downloaded previously. - :param as_new_aggr: If True a new aggregation will be created, otherwise this aggregation will be - updated/replaced. - :param destination_path: The destination folder path where the new aggregation will be created. This folder - path must already exist in resource. This parameter is used only when 'as_new_aggr' is True. - :return: The updated netcdf aggregation or a new netcdf aggregation (an instance of Aggregation) - """ +class NetCDFAggregation(DataObjectSupportingAggregation): + + @classmethod + def create(cls, base_aggr): + return super().create(aggr_cls=cls, base_aggr=base_aggr) + + def as_data_object(self, agg_path: str) -> 'xarray.Dataset': + if self.metadata.type != AggregationType.MultidimensionalAggregation: + raise Exception("Aggregation is not of type NetCDF") + if xarray is None: + raise Exception("xarray package was not found") - # TODO: if we decide that the user will prefer to use `save_data_object` rather than this method, then - # make this method as a private method. + return self._get_data_object(agg_path=agg_path, func=xarray.open_dataset) + + def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False, + destination_path: str = "") -> 'Aggregation': self._validate_aggregation_for_update(resource, AggregationType.MultidimensionalAggregation) file_path = self._validate_aggregation_path(agg_path, for_save_data=True) @@ -567,23 +523,29 @@ def update_netcdf_data(self, resource: 'Resource', agg_path: str, as_new_aggr: b aggr._data_object = data_object return aggr - def update_timeseries_data(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False, - destination_path: str = "") -> 'Aggregation': - """ - Updates the sqlite file associated with this aggregation. Then uploads the updated sqlite file - to create a new aggregation that either replaces the original aggregation or adds as a new - aggregation. - :param resource: The resource object to which this aggregation belongs. 
-        :param agg_path: The local path where this aggregation has been downloaded previously.
-        :param as_new_aggr: If True a new aggregation will be created, otherwise this aggregation will be
-        updated/replaced.
-        :param destination_path: The destination folder path where the new aggregation will be created. This folder
-        path must already exist in resource. This parameter is used only when 'as_new_aggr' is True.
-        :return: The updated timeseries aggregation or a new timeseries aggregation (an instance of Aggregation)
-        """
-        # TODO: if we decide that the user will prefer to use `save_data_object` rather than this method, then
-        #  make this method as a private method.
+class TimeseriesAggregation(DataObjectSupportingAggregation):
+
+    @classmethod
+    def create(cls, base_aggr):
+        return super().create(aggr_cls=cls, base_aggr=base_aggr)
+
+    def as_data_object(self, agg_path: str, series_id: str = "") -> 'pandas.DataFrame':
+        if pandas is None:
+            raise Exception("pandas package was not found")
+
+        def to_series(timeseries_file: str):
+            con = sqlite3.connect(timeseries_file)
+            return pandas.read_sql(
+                f'SELECT * FROM TimeSeriesResultValues WHERE ResultID IN '
+                f'(SELECT ResultID FROM Results WHERE ResultUUID = "{series_id}");',
+                con,
+            ).squeeze()
+
+        return self._get_data_object(agg_path=agg_path, func=to_series)
+
+    def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False,
+                         destination_path: str = "") -> 'Aggregation':
 
         self._validate_aggregation_for_update(resource, AggregationType.TimeSeriesAggregation)
         file_path = self._validate_aggregation_path(agg_path, for_save_data=True)
@@ -637,24 +599,33 @@ def update_timeseries_data(self, resource: 'Resource', agg_path: str, as_new_agg
         aggr._data_object = data_object
         return aggr
 
-    def update_geo_feature_data(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False,
-                                destination_path: str = "") -> 'Aggregation':
-        """
-        Updates the shape files associated with this aggregation. Then uploads all files associated with this
-        aggregation to create a new aggregation that either replaces the original aggregation or adds as a new
-        aggregation.
-        :param resource: The resource object to which this aggregation belongs.
-        :param agg_path: The local path where this aggregation has been downloaded previously.
-        :param as_new_aggr: If True a new aggregation will be created, otherwise this aggregation will be
-        updated/replaced.
-        :param destination_path: The destination folder path where the new aggregation will be created. This folder
-        path must already exist in resource. This parameter is used only when 'as_new_aggr' is True.
-        :return: The updated geo-feature aggregation or a new geo-feature aggregation (an instance of Aggregation)
-        """
-        # TODO: if we decide that the user will prefer to use `save_data_object` rather than this method, then
-        #  make this method as a private method.
+class GeoFeatureAggregation(DataObjectSupportingAggregation):
+
+    @classmethod
+    def create(cls, base_aggr):
+        return super().create(aggr_cls=cls, base_aggr=base_aggr)
+
+    def _validate_aggregation_path(self, agg_path: str, for_save_data: bool = False) -> str:
+        if for_save_data:
+            for aggr_file in self.files():
+                aggr_file = basename(aggr_file)
+                if aggr_file.endswith(".shp.xml") or aggr_file.endswith(".sbn") or aggr_file.endswith(".sbx"):
+                    # these are optional files for geo feature aggregation
+                    continue
+                if not os.path.exists(os.path.join(agg_path, aggr_file)):
+                    raise Exception(f"Aggregation path '{agg_path}' is not a valid path. "
" + f"Missing file '{aggr_file}'") + file_path = self._get_file_path(agg_path) + return file_path + + def as_data_object(self, agg_path: str) -> 'fiona.Collection': + if fiona is None: + raise Exception("fiona package was not found") + return self._get_data_object(agg_path=agg_path, func=fiona.open) + def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False, + destination_path: str = "") -> 'Aggregation': def upload_shape_files(main_file_path, dst_path=""): shp_file_dir_path = os.path.dirname(main_file_path) filename_starts_with = f"{pathlib.Path(main_file_path).stem}." @@ -726,24 +697,115 @@ def upload_shape_files(main_file_path, dst_path=""): aggr._data_object = data_object return aggr + +class GeoRasterAggregation(DataObjectSupportingAggregation): + + @classmethod + def create(cls, base_aggr): + return super().create(aggr_cls=cls, base_aggr=base_aggr) + + def _validate_aggregation_path(self, agg_path: str, for_save_data: bool = False) -> str: + if for_save_data: + tif_file_count = 0 + vrt_file_count = 0 + tif_file_path = "" + vrt_file_path = "" + for item in os.listdir(agg_path): + item_full_path = os.path.join(agg_path, item) + if os.path.isfile(item_full_path): + file_ext = pathlib.Path(item_full_path).suffix.lower() + if file_ext in (".tif", ".tiff"): + tif_file_count += 1 + tif_file_path = item_full_path + elif file_ext == '.vrt': + vrt_file_path = item_full_path + vrt_file_count += 1 + if vrt_file_count > 1: + raise Exception(f"Aggregation path '{agg_path}' is not a valid path. " + f"More than one vrt was file found") + else: + raise Exception(f"Aggregation path '{agg_path}' is not a valid path. " + f"There are files that are not of raster file types") + if tif_file_count == 0: + raise Exception(f"Aggregation path '{agg_path}' is not a valid path. " + f"No tif file was found") + if tif_file_count > 1 and vrt_file_count == 0: + raise Exception(f"Aggregation path '{agg_path}' is not a valid path. " + f"Missing a vrt file") + if vrt_file_path: + file_path = vrt_file_path + else: + file_path = tif_file_path + else: + file_path = self._get_file_path(agg_path) + + return file_path + + def as_data_object(self, agg_path: str) -> 'rasterio.DatasetReader': + if rasterio is None: + raise Exception("rasterio package was not found") + + return self._get_data_object(agg_path=agg_path, func=rasterio.open) + def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False, destination_path: str = "") -> 'Aggregation': - """ - Updates the data file(s) of this aggregation using the associated data processing object - and either updates this aggregation or creates a new aggregation using the updated data files. 
- """ - if self.metadata.type == AggregationType.MultidimensionalAggregation: - return self.update_netcdf_data(resource, agg_path, as_new_aggr, destination_path) + def upload_raster_files(dst_path=""): + raster_files = [] + for item in os.listdir(agg_path): + item_full_path = os.path.join(agg_path, item) + if os.path.isfile(item_full_path): + raster_files.append(item_full_path) + resource.file_upload(*raster_files, destination_path=dst_path) + + def get_main_file_path(): + main_file_name = os.path.basename(file_path) + if not main_file_name.lower().endswith('.vrt'): + main_file_name = pathlib.Path(main_file_name).stem + ".vrt" + if destination_path: + aggr_main_file_path = os.path.join(destination_path, main_file_name) + else: + aggr_main_file_path = main_file_name + return aggr_main_file_path + + self._validate_aggregation_for_update(resource, AggregationType.GeographicRasterAggregation) + file_path = self._validate_aggregation_path(agg_path, for_save_data=True) + # aggr_main_file_path = self.main_file_path + # data_object = self._data_object + if not as_new_aggr: + destination_path = dirname(self.main_file_path) + + # cache some of the metadata fields of the original aggregation to update the metadata of the + # updated aggregation + keywords = self.metadata.subjects + additional_meta = self.metadata.additional_metadata - if self.metadata.type == AggregationType.TimeSeriesAggregation: - return self.update_timeseries_data(resource, agg_path, as_new_aggr, destination_path) + # TODO: keep a local backup copy of the aggregation before deleting it + self.delete() + upload_raster_files(dst_path=destination_path) - if self.metadata.type == AggregationType.GeographicFeatureAggregation: - return self.update_geo_feature_data(resource, agg_path, as_new_aggr, destination_path) + # retrieve the updated aggregation + # compute the main file name + aggr_main_file_path = get_main_file_path() + aggr = resource.aggregation(file__path=aggr_main_file_path) + + # update metadata + for kw in keywords: + if kw not in aggr.metadata.subjects: + aggr.metadata.subjects.append(kw) + aggr.metadata.additional_metadata = additional_meta + aggr.save() + else: + # creating a new aggregation by uploading the updated data files + upload_raster_files(dst_path=destination_path) - # TODO: Implement this functionality for Raster aggregation + # retrieve the new aggregation + aggr_main_file_path = get_main_file_path() + agg_path = urljoin(destination_path, os.path.basename(aggr_main_file_path)) + aggr = resource.aggregation(file__path=agg_path) - raise Exception("Saving of data object is not supported for this aggregation type") + data_object = None + aggr._data_object = data_object + return aggr class Resource(Aggregation): @@ -1149,7 +1211,6 @@ def retrieve_string(self, path): def retrieve_file(self, path, save_path=""): file = self.get(path, status_code=200, allow_redirects=True) - cd = file.headers['content-disposition'] filename = urllib.parse.unquote(cd.split("filename=")[1].strip('"')) downloaded_file = os.path.join(save_path, filename) @@ -1173,7 +1234,6 @@ def retrieve_zip(self, path, save_path="", params=None): if params is None: params = {} file = self.get(path, status_code=200, allow_redirects=True, params=params) - json_response = file.json() task_id = json_response['task_id'] download_path = json_response['download_path']